173: Use libp2p-async-await to improve API of execution setup phase r=D4nte a=D4nte



Co-authored-by: Franck Royer <franck@coblox.tech>
This commit is contained in:
bors[bot] 2021-02-07 22:17:11 +00:00 committed by GitHub
commit ec661178f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 697 additions and 1584 deletions

10
Cargo.lock generated
View File

@ -1637,6 +1637,15 @@ dependencies = [
"wasm-timer", "wasm-timer",
] ]
[[package]]
name = "libp2p-async-await"
version = "0.1.0"
source = "git+https://github.com/comit-network/rust-libp2p-async-await?rev=1429cd780204624b4d244e7d8179fe6ff77988c3#1429cd780204624b4d244e7d8179fe6ff77988c3"
dependencies = [
"libp2p",
"log",
]
[[package]] [[package]]
name = "libp2p-core" name = "libp2p-core"
version = "0.27.0" version = "0.27.0"
@ -3415,6 +3424,7 @@ dependencies = [
"get-port", "get-port",
"hyper", "hyper",
"libp2p", "libp2p",
"libp2p-async-await",
"log", "log",
"miniscript", "miniscript",
"monero", "monero",

View File

@ -20,22 +20,22 @@ end
group Execution Setup group Execution Setup
group Phase A [Messages can be exchanged in any order] group Phase A [Messages can be exchanged in any order]
Bob -> Alice: bob::Message0 Bob -> Alice: Message0
note left: Pubkeys\ndleq proof s_b\nxmr viewkey v_b\nbtc refund addr note left: Pubkeys\ndleq proof s_b\nxmr viewkey v_b\nbtc refund addr
Alice -> Bob: alice::Message0 Alice -> Bob: Message1
note right: Pubkeys\ndleq proof s_a\nxmr view key v_a\nbtc redeem addr\nbtc punish addr note right: Pubkeys\ndleq proof s_a\nxmr view key v_a\nbtc redeem addr\nbtc punish addr
end end
group Phase B [Messages must be exchanged in the given order] group Phase B [Messages must be exchanged in the given order]
Bob -> Alice: Message1 Bob -> Alice: Message2
note left: unsigned btc lock tx note left: unsigned btc lock tx
Alice -> Bob: Message2 Alice -> Bob: Message3
note right: btc cancel tx sig\nbtc refund tx enc sig S_b note right: btc cancel tx sig\nbtc refund tx enc sig S_b
Bob -> Alice: Message3 Bob -> Alice: Message4
note left: btc punish tx sig\nbtc cancel tx sig note left: btc punish tx sig\nbtc cancel tx sig
end end

View File

@ -1,7 +1,7 @@
edition = "2018" edition = "2018"
condense_wildcard_suffixes = true condense_wildcard_suffixes = true
format_macro_matchers = true format_macro_matchers = true
merge_imports = true imports_granularity = "Crate"
use_field_init_shorthand = true use_field_init_shorthand = true
format_code_in_doc_comments = true format_code_in_doc_comments = true
normalize_comments = true normalize_comments = true

View File

@ -303,14 +303,11 @@ impl IntoIterator for Args {
type IntoIter = ::std::vec::IntoIter<String>; type IntoIter = ::std::vec::IntoIter<String>;
fn into_iter(self) -> <Self as IntoIterator>::IntoIter { fn into_iter(self) -> <Self as IntoIterator>::IntoIter {
let mut args = Vec::new(); vec![
"/bin/bash".to_string(),
args.push("/bin/bash".into()); "-c".to_string(),
args.push("-c".into()); format!("{} ", self.image_args.args()),
]
let cmd = format!("{} ", self.image_args.args()); .into_iter()
args.push(cmd);
args.into_iter()
} }
} }

View File

@ -1 +1 @@
nightly-2020-08-13 nightly-2021-01-31

View File

@ -1,7 +1,7 @@
edition = "2018" edition = "2018"
condense_wildcard_suffixes = true condense_wildcard_suffixes = true
format_macro_matchers = true format_macro_matchers = true
merge_imports = true imports_granularity = "Crate"
use_field_init_shorthand = true use_field_init_shorthand = true
format_code_in_doc_comments = true format_code_in_doc_comments = true
normalize_comments = true normalize_comments = true

View File

@ -25,6 +25,7 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", rev = "cdfbc766045
ed25519-dalek = { version = "1.0.0-pre.4", features = ["serde"] }# Cannot be 1 because they depend on curve25519-dalek version 3 ed25519-dalek = { version = "1.0.0-pre.4", features = ["serde"] }# Cannot be 1 because they depend on curve25519-dalek version 3
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
libp2p = { version = "0.34", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] } libp2p = { version = "0.34", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] }
libp2p-async-await = { git = "https://github.com/comit-network/rust-libp2p-async-await", rev = "1429cd780204624b4d244e7d8179fe6ff77988c3" }
log = { version = "0.4", features = ["serde"] } log = { version = "0.4", features = ["serde"] }
miniscript = { version = "4", features = ["serde"] } miniscript = { version = "4", features = ["serde"] }
monero = { version = "0.9", features = ["serde_support"] } monero = { version = "0.9", features = ["serde_support"] }

View File

@ -157,8 +157,7 @@ impl TransactionBlockHeight for Wallet {
.await .await
.map_err(|_| backoff::Error::Transient(Error::Io))?; .map_err(|_| backoff::Error::Transient(Error::Io))?;
let block_height = let block_height = block_height.ok_or(backoff::Error::Transient(Error::NotYetMined))?;
block_height.ok_or_else(|| backoff::Error::Transient(Error::NotYetMined))?;
Result::<_, backoff::Error<Error>>::Ok(block_height) Result::<_, backoff::Error<Error>>::Ok(block_height)
}) })

View File

@ -17,7 +17,7 @@ impl Seed {
Ok(Seed(seed::Seed::random()?)) Ok(Seed(seed::Seed::random()?))
} }
pub fn from_file_or_generate(data_dir: &PathBuf) -> Result<Self, Error> { pub fn from_file_or_generate(data_dir: &Path) -> Result<Self, Error> {
let file_path_buf = data_dir.join("seed.pem"); let file_path_buf = data_dir.join("seed.pem");
let file_path = Path::new(&file_path_buf); let file_path = Path::new(&file_path_buf);

View File

@ -1,67 +1,21 @@
use crate::protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature};
use async_trait::async_trait; use async_trait::async_trait;
use futures::prelude::*; use futures::prelude::*;
use libp2p::{ use libp2p::{
core::{upgrade, upgrade::ReadOneError}, core::{upgrade, upgrade::ReadOneError},
request_response::{ProtocolName, RequestResponseCodec}, request_response::{ProtocolName, RequestResponseCodec},
}; };
use serde::{Deserialize, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use std::{fmt::Debug, io, marker::PhantomData}; use std::{fmt::Debug, io, marker::PhantomData};
/// Time to wait for a response back once we send a request. /// Time to wait for a response back once we send a request.
pub const TIMEOUT: u64 = 3600; // One hour. pub const TIMEOUT: u64 = 3600; // One hour.
/// Message receive buffer. /// Message receive buffer.
const BUF_SIZE: usize = 1024 * 1024; pub const BUF_SIZE: usize = 1024 * 1024;
// TODO: Think about whether there is a better way to do this, e.g., separate
// Codec for each Message and a macro that implements them.
/// Messages Bob sends to Alice.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BobToAlice {
SwapRequest(Box<bob::SwapRequest>),
Message0(Box<bob::Message0>),
Message1(Box<bob::Message1>),
Message2(Box<bob::Message2>),
}
/// Messages Alice sends to Bob.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AliceToBob {
SwapResponse(Box<alice::SwapResponse>),
Message0(Box<alice::Message0>),
Message1(Box<alice::Message1>),
Message2,
}
/// Messages sent from one party to the other.
/// All responses are empty
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Request {
TransferProof(Box<TransferProof>),
EncryptedSignature(Box<EncryptedSignature>),
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
/// Response are only used for acknowledgement purposes.
pub enum Response {
TransferProof,
EncryptedSignature,
}
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]
pub struct Swap; pub struct Swap;
#[derive(Debug, Clone, Copy, Default)]
pub struct Message0Protocol;
#[derive(Debug, Clone, Copy, Default)]
pub struct Message1Protocol;
#[derive(Debug, Clone, Copy, Default)]
pub struct Message2Protocol;
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]
pub struct TransferProofProtocol; pub struct TransferProofProtocol;
@ -70,139 +24,45 @@ pub struct EncryptedSignatureProtocol;
impl ProtocolName for Swap { impl ProtocolName for Swap {
fn protocol_name(&self) -> &[u8] { fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/swap/1.0.0" b"/comit/xmr/btc/swap/1.0.0"
}
}
impl ProtocolName for Message0Protocol {
fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/message0/1.0.0"
}
}
impl ProtocolName for Message1Protocol {
fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/message1/1.0.0"
}
}
impl ProtocolName for Message2Protocol {
fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/message2/1.0.0"
} }
} }
impl ProtocolName for TransferProofProtocol { impl ProtocolName for TransferProofProtocol {
fn protocol_name(&self) -> &[u8] { fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/transfer_proof/1.0.0" b"/comit/xmr/btc/transfer_proof/1.0.0"
} }
} }
impl ProtocolName for EncryptedSignatureProtocol { impl ProtocolName for EncryptedSignatureProtocol {
fn protocol_name(&self) -> &[u8] { fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/encrypted_signature/1.0.0" b"/comit/xmr/btc/encrypted_signature/1.0.0"
} }
} }
#[derive(Clone, Copy, Debug, Default)] #[derive(Clone, Copy, Debug)]
pub struct Codec<P> { pub struct CborCodec<P, Req, Res> {
phantom: PhantomData<P>, phantom: PhantomData<(P, Req, Res)>,
}
impl<P, Req, Res> Default for CborCodec<P, Req, Res> {
fn default() -> Self {
Self {
phantom: PhantomData::default(),
}
}
} }
#[async_trait] #[async_trait]
impl<P> RequestResponseCodec for Codec<P> impl<P, Req, Res> RequestResponseCodec for CborCodec<P, Req, Res>
where where
P: Send + Sync + Clone + ProtocolName, P: ProtocolName + Send + Sync + Clone,
Req: DeserializeOwned + Serialize + Send,
Res: DeserializeOwned + Serialize + Send,
{ {
type Protocol = P; type Protocol = P;
type Request = BobToAlice; type Request = Req;
type Response = AliceToBob; type Response = Res;
async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = BobToAlice::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_request error: {:?}", e);
io::Error::new(io::ErrorKind::Other, e)
})?;
Ok(msg)
}
async fn read_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = AliceToBob::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
Ok(msg)
}
async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let bytes =
serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
upgrade::write_one(io, &bytes).await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let bytes = serde_cbor::to_vec(&res).map_err(|e| {
tracing::debug!("serde write_reponse error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
upgrade::write_one(io, &bytes).await?;
Ok(())
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct OneShotCodec<P> {
phantom: PhantomData<P>,
}
#[async_trait]
impl<P> RequestResponseCodec for OneShotCodec<P>
where
P: Send + Sync + Clone + ProtocolName,
{
type Protocol = P;
type Request = Request;
type Response = Response;
async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request> async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
where where
@ -213,7 +73,7 @@ where
e => io::Error::new(io::ErrorKind::Other, e), e => io::Error::new(io::ErrorKind::Other, e),
})?; })?;
let mut de = serde_cbor::Deserializer::from_slice(&message); let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Request::deserialize(&mut de).map_err(|e| { let msg = Req::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_request error: {:?}", e); tracing::debug!("serde read_request error: {:?}", e);
io::Error::new(io::ErrorKind::Other, e) io::Error::new(io::ErrorKind::Other, e)
})?; })?;
@ -233,7 +93,7 @@ where
.await .await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_cbor::Deserializer::from_slice(&message); let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Response::deserialize(&mut de).map_err(|e| { let msg = Res::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_response error: {:?}", e); tracing::debug!("serde read_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e) io::Error::new(io::ErrorKind::InvalidData, e)
})?; })?;
@ -268,7 +128,7 @@ where
T: AsyncWrite + Unpin + Send, T: AsyncWrite + Unpin + Send,
{ {
let bytes = serde_cbor::to_vec(&res).map_err(|e| { let bytes = serde_cbor::to_vec(&res).map_err(|e| {
tracing::debug!("serde write_reponse error: {:?}", e); tracing::debug!("serde write_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e) io::Error::new(io::ErrorKind::InvalidData, e)
})?; })?;
upgrade::write_one(io, &bytes).await?; upgrade::write_one(io, &bytes).await?;

View File

@ -1,14 +1,5 @@
//! Run an XMR/BTC swap in the role of Alice. //! Run an XMR/BTC swap in the role of Alice.
//! Alice holds XMR and wishes receive BTC. //! Alice holds XMR and wishes receive BTC.
pub use self::{
event_loop::{EventLoop, EventLoopHandle},
message0::Message0,
message1::Message1,
state::*,
swap::{run, run_until},
swap_response::*,
transfer_proof::TransferProof,
};
use crate::{ use crate::{
bitcoin, database, bitcoin, database,
database::Database, database::Database,
@ -16,14 +7,13 @@ use crate::{
monero, monero,
network::{ network::{
peer_tracker::{self, PeerTracker}, peer_tracker::{self, PeerTracker},
request_response::AliceToBob,
transport::build, transport::build,
Seed as NetworkSeed, Seed as NetworkSeed,
}, },
protocol::{bob, bob::EncryptedSignature, SwapAmounts}, protocol::{bob::EncryptedSignature, SwapAmounts},
seed::Seed, seed::Seed,
}; };
use anyhow::{bail, Result}; use anyhow::{bail, Error, Result};
use libp2p::{ use libp2p::{
core::Multiaddr, identity::Keypair, request_response::ResponseChannel, NetworkBehaviour, PeerId, core::Multiaddr, identity::Keypair, request_response::ResponseChannel, NetworkBehaviour, PeerId,
}; };
@ -32,11 +22,20 @@ use std::{path::PathBuf, sync::Arc};
use tracing::{debug, info}; use tracing::{debug, info};
use uuid::Uuid; use uuid::Uuid;
pub use self::{
event_loop::{EventLoop, EventLoopHandle},
execution_setup::Message1,
state::*,
swap::{run, run_until},
swap_response::*,
transfer_proof::TransferProof,
};
use crate::protocol::bob::SwapRequest;
pub use execution_setup::Message3;
mod encrypted_signature; mod encrypted_signature;
pub mod event_loop; pub mod event_loop;
mod message0; mod execution_setup;
mod message1;
mod message2;
pub mod state; pub mod state;
mod steps; mod steps;
pub mod swap; pub mod swap;
@ -198,6 +197,7 @@ impl Builder {
self.execution_params.bitcoin_punish_timelock, self.execution_params.bitcoin_punish_timelock,
redeem_address, redeem_address,
punish_address, punish_address,
rng,
); );
Ok(AliceState::Started { amounts, state0 }) Ok(AliceState::Started { amounts, state0 })
@ -218,25 +218,18 @@ impl Builder {
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
// TODO (Franck): Change this to get both amounts so parties can verify the amounts are SwapRequest {
// expected early on. msg: SwapRequest,
Request(Box<swap_response::OutEvent>), /* Not-uniform with Bob on purpose, ready for adding channel: ResponseChannel<SwapResponse>,
* Xmr
* event. */
Message0 {
msg: Box<bob::Message0>,
channel: ResponseChannel<AliceToBob>,
},
Message1 {
msg: bob::Message1,
channel: ResponseChannel<AliceToBob>,
},
Message2 {
msg: Box<bob::Message2>,
bob_peer_id: PeerId,
}, },
ExecutionSetupDone(Result<Box<State3>>),
TransferProofAcknowledged, TransferProofAcknowledged,
EncryptedSignature(EncryptedSignature), EncryptedSignature {
msg: Box<EncryptedSignature>,
channel: ResponseChannel<()>,
},
ResponseSent, // Same variant is used for all messages as no processing is done
Failure(Error),
} }
impl From<peer_tracker::OutEvent> for OutEvent { impl From<peer_tracker::OutEvent> for OutEvent {
@ -251,52 +244,43 @@ impl From<peer_tracker::OutEvent> for OutEvent {
impl From<swap_response::OutEvent> for OutEvent { impl From<swap_response::OutEvent> for OutEvent {
fn from(event: swap_response::OutEvent) -> Self { fn from(event: swap_response::OutEvent) -> Self {
OutEvent::Request(Box::new(event)) use swap_response::OutEvent::*;
}
}
impl From<message0::OutEvent> for OutEvent {
fn from(event: message0::OutEvent) -> Self {
match event { match event {
message0::OutEvent::Msg { channel, msg } => OutEvent::Message0 { MsgReceived { msg, channel } => OutEvent::SwapRequest { msg, channel },
msg: Box::new(msg), ResponseSent => OutEvent::ResponseSent,
channel, Failure(err) => OutEvent::Failure(err.context("Swap Request/Response failure")),
},
} }
} }
} }
impl From<message1::OutEvent> for OutEvent { impl From<execution_setup::OutEvent> for OutEvent {
fn from(event: message1::OutEvent) -> Self { fn from(event: execution_setup::OutEvent) -> Self {
match event { match event {
message1::OutEvent::Msg { msg, channel } => OutEvent::Message1 { msg, channel }, execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)),
}
}
}
impl From<message2::OutEvent> for OutEvent {
fn from(event: message2::OutEvent) -> Self {
match event {
message2::OutEvent::Msg { msg, bob_peer_id } => OutEvent::Message2 {
msg: Box::new(msg),
bob_peer_id,
},
} }
} }
} }
impl From<transfer_proof::OutEvent> for OutEvent { impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self { fn from(event: transfer_proof::OutEvent) -> Self {
use transfer_proof::OutEvent::*;
match event { match event {
transfer_proof::OutEvent::Acknowledged => OutEvent::TransferProofAcknowledged, Acknowledged => OutEvent::TransferProofAcknowledged,
Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")),
} }
} }
} }
impl From<encrypted_signature::OutEvent> for OutEvent { impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self { fn from(event: encrypted_signature::OutEvent) -> Self {
use encrypted_signature::OutEvent::*;
match event { match event {
encrypted_signature::OutEvent::Msg(msg) => OutEvent::EncryptedSignature(msg), MsgReceived { msg, channel } => OutEvent::EncryptedSignature {
msg: Box::new(msg),
channel,
},
AckSent => OutEvent::ResponseSent,
Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")),
} }
} }
} }
@ -308,9 +292,7 @@ impl From<encrypted_signature::OutEvent> for OutEvent {
pub struct Behaviour { pub struct Behaviour {
pt: PeerTracker, pt: PeerTracker,
amounts: swap_response::Behaviour, amounts: swap_response::Behaviour,
message0: message0::Behaviour, execution_setup: execution_setup::Behaviour,
message1: message1::Behaviour,
message2: message2::Behaviour,
transfer_proof: transfer_proof::Behaviour, transfer_proof: transfer_proof::Behaviour,
encrypted_signature: encrypted_signature::Behaviour, encrypted_signature: encrypted_signature::Behaviour,
} }
@ -319,7 +301,7 @@ impl Behaviour {
/// Alice always sends her messages as a response to a request from Bob. /// Alice always sends her messages as a response to a request from Bob.
pub fn send_swap_response( pub fn send_swap_response(
&mut self, &mut self,
channel: ResponseChannel<AliceToBob>, channel: ResponseChannel<SwapResponse>,
swap_response: SwapResponse, swap_response: SwapResponse,
) -> Result<()> { ) -> Result<()> {
self.amounts.send(channel, swap_response)?; self.amounts.send(channel, swap_response)?;
@ -327,26 +309,9 @@ impl Behaviour {
Ok(()) Ok(())
} }
/// Send Message0 to Bob in response to receiving his Message0. pub fn start_execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) {
pub fn send_message0( self.execution_setup.run(bob_peer_id, state0);
&mut self, info!("Start execution setup with {}", bob_peer_id);
channel: ResponseChannel<AliceToBob>,
msg: Message0,
) -> Result<()> {
self.message0.send(channel, msg)?;
debug!("Sent Message0");
Ok(())
}
/// Send Message1 to Bob in response to receiving his Message1.
pub fn send_message1(
&mut self,
channel: ResponseChannel<AliceToBob>,
msg: Message1,
) -> Result<()> {
self.message1.send(channel, msg)?;
debug!("Sent Message1");
Ok(())
} }
/// Send Transfer Proof to Bob. /// Send Transfer Proof to Bob.

View File

@ -1,53 +1,42 @@
use crate::{ use crate::{
network::request_response::{ network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT},
EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT,
},
protocol::bob::EncryptedSignature, protocol::bob::EncryptedSignature,
}; };
use anyhow::{anyhow, Error, Result};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseEvent, RequestResponseMessage, RequestResponseMessage, ResponseChannel,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, NetworkBehaviour,
}; };
use std::{ use std::time::Duration;
collections::VecDeque, use tracing::debug;
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Msg(EncryptedSignature), MsgReceived {
msg: EncryptedSignature,
channel: ResponseChannel<()>,
},
AckSent,
Failure(Error),
} }
/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted /// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted
/// signature from Bob. /// signature from Bob.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<OneShotCodec<EncryptedSignatureProtocol>>, rr: RequestResponse<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
fn poll( pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> {
&mut self, self.rr
_: &mut Context<'_>, .send_response(channel, ())
_: &mut impl PollParameters, .map_err(|err| anyhow!("Failed to ack encrypted signature: {:?}", err))
) -> Poll<
NetworkBehaviourAction<RequestProtocol<OneShotCodec<EncryptedSignatureProtocol>>, OutEvent>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
} }
} }
@ -59,48 +48,42 @@ impl Default for Behaviour {
Self { Self {
rr: RequestResponse::new( rr: RequestResponse::new(
OneShotCodec::default(), CborCodec::default(),
vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)], vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)],
config, config,
), ),
events: Default::default(),
} }
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour { impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) { fn from(event: RequestResponseEvent<EncryptedSignature, ()>) -> Self {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
peer,
message: message:
RequestResponseMessage::Request { RequestResponseMessage::Request {
request, channel, .. request, channel, ..
}, },
.. ..
} => { } => {
if let Request::EncryptedSignature(msg) = request { debug!("Received encrypted signature from {}", peer);
debug!("Received encrypted signature"); OutEvent::MsgReceived {
self.events.push_back(OutEvent::Msg(*msg)); msg: request,
// Send back empty response so that the request/response protocol completes. channel,
if let Err(error) = self.rr.send_response(channel, Response::EncryptedSignature)
{
error!("Failed to send Encrypted Signature ack: {:?}", error);
}
} }
} }
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. }, message: RequestResponseMessage::Response { .. },
.. ..
} => panic!("Alice should not get a Response"), } => OutEvent::Failure(anyhow!("Alice should not get a Response")),
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error); OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
} }
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message3 response to Bob");
} }
RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent,
} }
} }
} }

View File

@ -1,10 +1,8 @@
use crate::{ use crate::{
network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, network::{transport::SwapTransport, TokioExecutor},
protocol::{ protocol::{
alice, alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof},
alice::{Behaviour, OutEvent, SwapResponse, TransferProof}, bob::{EncryptedSignature, SwapRequest},
bob,
bob::EncryptedSignature,
}, },
}; };
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
@ -35,15 +33,12 @@ impl<T> Default for Channels<T> {
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
recv_message0: Receiver<(bob::Message0, ResponseChannel<AliceToBob>)>, done_execution_setup: Receiver<Result<State3>>,
recv_message1: Receiver<(bob::Message1, ResponseChannel<AliceToBob>)>,
recv_message2: Receiver<bob::Message2>,
recv_encrypted_signature: Receiver<EncryptedSignature>, recv_encrypted_signature: Receiver<EncryptedSignature>,
request: Receiver<crate::protocol::alice::swap_response::OutEvent>, recv_swap_request: Receiver<(SwapRequest, ResponseChannel<SwapResponse>)>,
conn_established: Receiver<PeerId>, conn_established: Receiver<PeerId>,
send_swap_response: Sender<(ResponseChannel<AliceToBob>, SwapResponse)>, send_swap_response: Sender<(ResponseChannel<SwapResponse>, SwapResponse)>,
send_message0: Sender<(ResponseChannel<AliceToBob>, alice::Message0)>, start_execution_setup: Sender<(PeerId, State0)>,
send_message1: Sender<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_transfer_proof: Sender<(PeerId, TransferProof)>, send_transfer_proof: Sender<(PeerId, TransferProof)>,
recv_transfer_proof_ack: Receiver<()>, recv_transfer_proof_ack: Receiver<()>,
} }
@ -56,25 +51,16 @@ impl EventLoopHandle {
.ok_or_else(|| anyhow!("Failed to receive connection established from Bob")) .ok_or_else(|| anyhow!("Failed to receive connection established from Bob"))
} }
pub async fn recv_message0(&mut self) -> Result<(bob::Message0, ResponseChannel<AliceToBob>)> { pub async fn execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) -> Result<State3> {
self.recv_message0 let _ = self
.recv() .start_execution_setup
.await .send((bob_peer_id, state0))
.ok_or_else(|| anyhow!("Failed to receive message 0 from Bob")) .await?;
}
pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel<AliceToBob>)> { self.done_execution_setup
self.recv_message1
.recv() .recv()
.await .await
.ok_or_else(|| anyhow!("Failed to receive message 1 from Bob")) .ok_or_else(|| anyhow!("Failed to setup execution with Bob"))?
}
pub async fn recv_message2(&mut self) -> Result<bob::Message2> {
self.recv_message2
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive message 2 from Bob"))
} }
pub async fn recv_encrypted_signature(&mut self) -> Result<EncryptedSignature> { pub async fn recv_encrypted_signature(&mut self) -> Result<EncryptedSignature> {
@ -84,10 +70,10 @@ impl EventLoopHandle {
.ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob")) .ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob"))
} }
pub async fn recv_request( pub async fn recv_swap_request(
&mut self, &mut self,
) -> Result<crate::protocol::alice::swap_response::OutEvent> { ) -> Result<(SwapRequest, ResponseChannel<SwapResponse>)> {
self.request self.recv_swap_request
.recv() .recv()
.await .await
.ok_or_else(|| anyhow!("Failed to receive amounts request from Bob")) .ok_or_else(|| anyhow!("Failed to receive amounts request from Bob"))
@ -95,7 +81,7 @@ impl EventLoopHandle {
pub async fn send_swap_response( pub async fn send_swap_response(
&mut self, &mut self,
channel: ResponseChannel<AliceToBob>, channel: ResponseChannel<SwapResponse>,
swap_response: SwapResponse, swap_response: SwapResponse,
) -> Result<()> { ) -> Result<()> {
let _ = self let _ = self
@ -105,24 +91,6 @@ impl EventLoopHandle {
Ok(()) Ok(())
} }
pub async fn send_message0(
&mut self,
channel: ResponseChannel<AliceToBob>,
msg: alice::Message0,
) -> Result<()> {
let _ = self.send_message0.send((channel, msg)).await?;
Ok(())
}
pub async fn send_message1(
&mut self,
channel: ResponseChannel<AliceToBob>,
msg: alice::Message1,
) -> Result<()> {
let _ = self.send_message1.send((channel, msg)).await?;
Ok(())
}
pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> {
let _ = self.send_transfer_proof.send((bob, msg)).await?; let _ = self.send_transfer_proof.send((bob, msg)).await?;
@ -137,15 +105,12 @@ impl EventLoopHandle {
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop { pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
recv_message0: Sender<(bob::Message0, ResponseChannel<AliceToBob>)>, start_execution_setup: Receiver<(PeerId, State0)>,
recv_message1: Sender<(bob::Message1, ResponseChannel<AliceToBob>)>, done_execution_setup: Sender<Result<State3>>,
recv_message2: Sender<bob::Message2>,
recv_encrypted_signature: Sender<EncryptedSignature>, recv_encrypted_signature: Sender<EncryptedSignature>,
request: Sender<crate::protocol::alice::swap_response::OutEvent>, recv_swap_request: Sender<(SwapRequest, ResponseChannel<SwapResponse>)>,
conn_established: Sender<PeerId>, conn_established: Sender<PeerId>,
send_swap_response: Receiver<(ResponseChannel<AliceToBob>, SwapResponse)>, send_swap_response: Receiver<(ResponseChannel<SwapResponse>, SwapResponse)>,
send_message0: Receiver<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_message1: Receiver<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_transfer_proof: Receiver<(PeerId, TransferProof)>, send_transfer_proof: Receiver<(PeerId, TransferProof)>,
recv_transfer_proof_ack: Sender<()>, recv_transfer_proof_ack: Sender<()>,
} }
@ -166,43 +131,34 @@ impl EventLoop {
Swarm::listen_on(&mut swarm, listen.clone()) Swarm::listen_on(&mut swarm, listen.clone())
.with_context(|| format!("Address is not supported: {:#}", listen))?; .with_context(|| format!("Address is not supported: {:#}", listen))?;
let recv_message0 = Channels::new(); let start_execution_setup = Channels::new();
let recv_message1 = Channels::new(); let done_execution_setup = Channels::new();
let recv_message2 = Channels::new();
let recv_encrypted_signature = Channels::new(); let recv_encrypted_signature = Channels::new();
let request = Channels::new(); let request = Channels::new();
let conn_established = Channels::new(); let conn_established = Channels::new();
let send_swap_response = Channels::new(); let send_swap_response = Channels::new();
let send_message0 = Channels::new();
let send_message1 = Channels::new();
let send_transfer_proof = Channels::new(); let send_transfer_proof = Channels::new();
let recv_transfer_proof_ack = Channels::new(); let recv_transfer_proof_ack = Channels::new();
let driver = EventLoop { let driver = EventLoop {
swarm, swarm,
recv_message0: recv_message0.sender, start_execution_setup: start_execution_setup.receiver,
recv_message1: recv_message1.sender, done_execution_setup: done_execution_setup.sender,
recv_message2: recv_message2.sender,
recv_encrypted_signature: recv_encrypted_signature.sender, recv_encrypted_signature: recv_encrypted_signature.sender,
request: request.sender, recv_swap_request: request.sender,
conn_established: conn_established.sender, conn_established: conn_established.sender,
send_swap_response: send_swap_response.receiver, send_swap_response: send_swap_response.receiver,
send_message0: send_message0.receiver,
send_message1: send_message1.receiver,
send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof: send_transfer_proof.receiver,
recv_transfer_proof_ack: recv_transfer_proof_ack.sender, recv_transfer_proof_ack: recv_transfer_proof_ack.sender,
}; };
let handle = EventLoopHandle { let handle = EventLoopHandle {
recv_message0: recv_message0.receiver, start_execution_setup: start_execution_setup.sender,
recv_message1: recv_message1.receiver, done_execution_setup: done_execution_setup.receiver,
recv_message2: recv_message2.receiver,
recv_encrypted_signature: recv_encrypted_signature.receiver, recv_encrypted_signature: recv_encrypted_signature.receiver,
request: request.receiver, recv_swap_request: request.receiver,
conn_established: conn_established.receiver, conn_established: conn_established.receiver,
send_swap_response: send_swap_response.sender, send_swap_response: send_swap_response.sender,
send_message0: send_message0.sender,
send_message1: send_message1.sender,
send_transfer_proof: send_transfer_proof.sender, send_transfer_proof: send_transfer_proof.sender,
recv_transfer_proof_ack: recv_transfer_proof_ack.receiver, recv_transfer_proof_ack: recv_transfer_proof_ack.receiver,
}; };
@ -218,24 +174,26 @@ impl EventLoop {
OutEvent::ConnectionEstablished(alice) => { OutEvent::ConnectionEstablished(alice) => {
let _ = self.conn_established.send(alice).await; let _ = self.conn_established.send(alice).await;
} }
OutEvent::Message0 { msg, channel } => { OutEvent::SwapRequest { msg, channel } => {
let _ = self.recv_message0.send((*msg, channel)).await; let _ = self.recv_swap_request.send((msg, channel)).await;
} }
OutEvent::Message1 { msg, channel } => { OutEvent::ExecutionSetupDone(res) => {
let _ = self.recv_message1.send((msg, channel)).await; let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
}
OutEvent::Message2 { msg, bob_peer_id : _} => {
let _ = self.recv_message2.send(*msg).await;
} }
OutEvent::TransferProofAcknowledged => { OutEvent::TransferProofAcknowledged => {
trace!("Bob acknowledged transfer proof"); trace!("Bob acknowledged transfer proof");
let _ = self.recv_transfer_proof_ack.send(()).await; let _ = self.recv_transfer_proof_ack.send(()).await;
} }
OutEvent::EncryptedSignature(msg) => { OutEvent::EncryptedSignature{ msg, channel } => {
let _ = self.recv_encrypted_signature.send(msg).await; let _ = self.recv_encrypted_signature.send(*msg).await;
// Send back empty response so that the request/response protocol completes.
if let Err(error) = self.swarm.encrypted_signature.send_ack(channel) {
error!("Failed to send Encrypted Signature ack: {:?}", error);
}
} }
OutEvent::Request(event) => { OutEvent::ResponseSent => {}
let _ = self.request.send(*event).await; OutEvent::Failure(err) => {
error!("Communication error: {:#}", err);
} }
} }
}, },
@ -247,20 +205,11 @@ impl EventLoop {
.map_err(|err|error!("Failed to send swap response: {:#}", err)); .map_err(|err|error!("Failed to send swap response: {:#}", err));
} }
}, },
msg0 = self.send_message0.recv().fuse() => { option = self.start_execution_setup.recv().fuse() => {
if let Some((channel, msg)) = msg0 { if let Some((bob_peer_id, state0)) = option {
let _ = self let _ = self
.swarm .swarm
.send_message0(channel, msg) .start_execution_setup(bob_peer_id, state0);
.map_err(|err|error!("Failed to send message0: {:#}", err));
}
},
msg1 = self.send_message1.recv().fuse() => {
if let Some((channel, msg)) = msg1 {
let _ = self
.swarm
.send_message1(channel, msg)
.map_err(|err|error!("Failed to send message1: {:#}", err));
} }
}, },
transfer_proof = self.send_transfer_proof.recv().fuse() => { transfer_proof = self.send_transfer_proof.recv().fuse() => {

View File

@ -0,0 +1,98 @@
use crate::{
bitcoin,
bitcoin::{EncryptedSignature, Signature},
monero,
network::request_response::BUF_SIZE,
protocol::{
alice::{State0, State3},
bob::{Message0, Message2, Message4},
},
};
use anyhow::{Context, Error, Result};
use libp2p::PeerId;
use libp2p_async_await::BehaviourOutEvent;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message1 {
pub(crate) A: bitcoin::PublicKey,
pub(crate) S_a_monero: monero::PublicKey,
pub(crate) S_a_bitcoin: bitcoin::PublicKey,
pub(crate) dleq_proof_s_a: cross_curve_dleq::Proof,
pub(crate) v_a: monero::PrivateViewKey,
pub(crate) redeem_address: bitcoin::Address,
pub(crate) punish_address: bitcoin::Address,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message3 {
pub(crate) tx_cancel_sig: Signature,
pub(crate) tx_refund_encsig: EncryptedSignature,
}
#[derive(Debug)]
pub enum OutEvent {
Done(Result<State3>),
}
impl From<BehaviourOutEvent<State3, (), anyhow::Error>> for OutEvent {
fn from(event: BehaviourOutEvent<State3, (), Error>) -> Self {
match event {
BehaviourOutEvent::Inbound(_, Ok(State3)) => OutEvent::Done(Ok(State3)),
BehaviourOutEvent::Inbound(_, Err(e)) => OutEvent::Done(Err(e)),
BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"),
}
}
}
#[derive(libp2p::NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
pub struct Behaviour {
inner: libp2p_async_await::Behaviour<State3, (), anyhow::Error>,
}
impl Default for Behaviour {
fn default() -> Self {
Self {
inner: libp2p_async_await::Behaviour::new(b"/comit/xmr/btc/execution_setup/1.0.0"),
}
}
}
impl Behaviour {
pub fn run(&mut self, bob: PeerId, state0: State0) {
self.inner
.do_protocol_listener(bob, move |mut substream| async move {
let message0 =
serde_cbor::from_slice::<Message0>(&substream.read_message(BUF_SIZE).await?)
.context("failed to deserialize message0")?;
let state1 = state0.receive(message0)?;
substream
.write_message(
&serde_cbor::to_vec(&state1.next_message())
.context("failed to serialize message1")?,
)
.await?;
let message2 =
serde_cbor::from_slice::<Message2>(&substream.read_message(BUF_SIZE).await?)
.context("failed to deserialize message2")?;
let state2 = state1.receive(message2);
substream
.write_message(
&serde_cbor::to_vec(&state2.next_message())
.context("failed to serialize message3")?,
)
.await?;
let message4 =
serde_cbor::from_slice::<Message4>(&substream.read_message(BUF_SIZE).await?)
.context("failed to deserialize message4")?;
let state3 = state2.receive(message4)?;
Ok(state3)
})
}
}

View File

@ -1,119 +0,0 @@
use crate::{
bitcoin, monero,
network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT},
protocol::bob,
};
use anyhow::{anyhow, Result};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage, ResponseChannel,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour,
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)]
pub enum OutEvent {
Msg {
msg: bob::Message0,
channel: ResponseChannel<AliceToBob>,
},
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message0 {
pub(crate) A: bitcoin::PublicKey,
pub(crate) S_a_monero: monero::PublicKey,
pub(crate) S_a_bitcoin: bitcoin::PublicKey,
pub(crate) dleq_proof_s_a: cross_curve_dleq::Proof,
pub(crate) v_a: monero::PrivateViewKey,
pub(crate) redeem_address: bitcoin::Address,
pub(crate) punish_address: bitcoin::Address,
}
/// A `NetworkBehaviour` that represents send/recv of message 0.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<Codec<Message0Protocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message0) -> Result<()> {
let msg = AliceToBob::Message0(Box::new(msg));
self.rr
.send_response(channel, msg)
.map_err(|alice_to_bob| anyhow!("Could not send response {:?}", alice_to_bob))
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Message0Protocol>>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Message0Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
if let BobToAlice::Message0(msg) = request {
debug!("Received Message0");
self.events.push_back(OutEvent::Msg { msg: *msg, channel });
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => panic!("Alice should not get a Response"),
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent Message0 as response to Bob");
}
}
}
}

View File

@ -1,117 +0,0 @@
use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT},
protocol::bob,
};
use anyhow::{anyhow, Result};
use ecdsa_fun::{adaptor::EncryptedSignature, Signature};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage, ResponseChannel,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour,
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)]
pub enum OutEvent {
Msg {
/// Received message from Bob.
msg: bob::Message1,
/// Channel to send back Alice's message 1.
channel: ResponseChannel<AliceToBob>,
},
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message1 {
pub(crate) tx_cancel_sig: Signature,
pub(crate) tx_refund_encsig: EncryptedSignature,
}
/// A `NetworkBehaviour` that represents send/recv of message 1.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<Codec<Message1Protocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message1) -> Result<()> {
let msg = AliceToBob::Message1(Box::new(msg));
self.rr
.send_response(channel, msg)
.map_err(|alice_to_bob| anyhow!("Could not send response {:?}", alice_to_bob))
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Message1Protocol>>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Message1Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
if let BobToAlice::Message1(msg) = request {
debug!("Received Message1");
self.events.push_back(OutEvent::Msg { msg: *msg, channel });
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => panic!("Alice should not get a Response"),
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message1 response to Bob");
}
}
}
}

View File

@ -1,104 +0,0 @@
use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT},
protocol::bob,
};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)]
pub enum OutEvent {
Msg {
msg: bob::Message2,
bob_peer_id: PeerId,
},
}
/// A `NetworkBehaviour` that represents receiving of message 2 from Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<Codec<Message2Protocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Message2Protocol>>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Message2Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
peer,
message:
RequestResponseMessage::Request {
request, channel, ..
},
} => {
if let BobToAlice::Message2(msg) = request {
debug!("Received Message 2");
self.events.push_back(OutEvent::Msg {
msg: *msg,
bob_peer_id: peer,
});
// Send back empty response so that the request/response protocol completes.
let _ = self.rr.send_response(channel, AliceToBob::Message2);
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => panic!("Alice should not get a Response"),
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent a Message2 response to Bob");
}
}
}
}

View File

@ -7,7 +7,11 @@ use crate::{
TxRefund, WatchForRawTransaction, TxRefund, WatchForRawTransaction,
}, },
monero, monero,
protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature, SwapAmounts}, protocol::{
alice::{Message1, Message3, TransferProof},
bob::{EncryptedSignature, Message0, Message2, Message4},
SwapAmounts,
},
}; };
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic}; use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic};
@ -16,7 +20,6 @@ use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::Sha256; use sha2::Sha256;
use std::fmt; use std::fmt;
use tracing::info;
#[derive(Debug)] #[derive(Debug)]
pub enum AliceState { pub enum AliceState {
@ -87,6 +90,7 @@ pub struct State0 {
pub a: bitcoin::SecretKey, pub a: bitcoin::SecretKey,
pub s_a: cross_curve_dleq::Scalar, pub s_a: cross_curve_dleq::Scalar,
pub v_a: monero::PrivateViewKey, pub v_a: monero::PrivateViewKey,
pub dleq_proof_s_a: cross_curve_dleq::Proof,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")] #[serde(with = "::bitcoin::util::amount::serde::as_sat")]
pub btc: bitcoin::Amount, pub btc: bitcoin::Amount,
pub xmr: monero::Amount, pub xmr: monero::Amount,
@ -98,7 +102,7 @@ pub struct State0 {
impl State0 { impl State0 {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new<R>(
a: bitcoin::SecretKey, a: bitcoin::SecretKey,
s_a: cross_curve_dleq::Scalar, s_a: cross_curve_dleq::Scalar,
v_a: monero::PrivateViewKey, v_a: monero::PrivateViewKey,
@ -108,11 +112,18 @@ impl State0 {
punish_timelock: Timelock, punish_timelock: Timelock,
redeem_address: bitcoin::Address, redeem_address: bitcoin::Address,
punish_address: bitcoin::Address, punish_address: bitcoin::Address,
) -> Self { rng: &mut R,
) -> Self
where
R: RngCore + CryptoRng,
{
let dleq_proof_s_a = cross_curve_dleq::Proof::new(rng, &s_a);
Self { Self {
a, a,
s_a, s_a,
v_a, v_a,
dleq_proof_s_a,
redeem_address, redeem_address,
punish_address, punish_address,
btc, btc,
@ -122,24 +133,7 @@ impl State0 {
} }
} }
pub fn next_message<R: RngCore + CryptoRng>(&self, rng: &mut R) -> alice::Message0 { pub fn receive(self, msg: Message0) -> Result<State1> {
info!("Producing first message");
let dleq_proof_s_a = cross_curve_dleq::Proof::new(rng, &self.s_a);
alice::Message0 {
A: self.a.public(),
S_a_monero: monero::PublicKey::from_private_key(&monero::PrivateKey {
scalar: self.s_a.into_ed25519(),
}),
S_a_bitcoin: self.s_a.into_secp256k1().into(),
dleq_proof_s_a,
v_a: self.v_a,
redeem_address: self.redeem_address.clone(),
punish_address: self.punish_address.clone(),
}
}
pub fn receive(self, msg: bob::Message0) -> Result<State1> {
msg.dleq_proof_s_b.verify( msg.dleq_proof_s_b.verify(
msg.S_b_bitcoin.clone().into(), msg.S_b_bitcoin.clone().into(),
msg.S_b_monero msg.S_b_monero
@ -157,6 +151,8 @@ impl State0 {
S_b_monero: msg.S_b_monero, S_b_monero: msg.S_b_monero,
S_b_bitcoin: msg.S_b_bitcoin, S_b_bitcoin: msg.S_b_bitcoin,
v, v,
v_a: self.v_a,
dleq_proof_s_a: self.dleq_proof_s_a,
btc: self.btc, btc: self.btc,
xmr: self.xmr, xmr: self.xmr,
cancel_timelock: self.cancel_timelock, cancel_timelock: self.cancel_timelock,
@ -176,6 +172,8 @@ pub struct State1 {
S_b_monero: monero::PublicKey, S_b_monero: monero::PublicKey,
S_b_bitcoin: bitcoin::PublicKey, S_b_bitcoin: bitcoin::PublicKey,
v: monero::PrivateViewKey, v: monero::PrivateViewKey,
v_a: monero::PrivateViewKey,
dleq_proof_s_a: cross_curve_dleq::Proof,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")] #[serde(with = "::bitcoin::util::amount::serde::as_sat")]
btc: bitcoin::Amount, btc: bitcoin::Amount,
xmr: monero::Amount, xmr: monero::Amount,
@ -187,7 +185,21 @@ pub struct State1 {
} }
impl State1 { impl State1 {
pub fn receive(self, msg: bob::Message1) -> State2 { pub fn next_message(&self) -> Message1 {
Message1 {
A: self.a.public(),
S_a_monero: monero::PublicKey::from_private_key(&monero::PrivateKey {
scalar: self.s_a.into_ed25519(),
}),
S_a_bitcoin: self.s_a.into_secp256k1().into(),
dleq_proof_s_a: self.dleq_proof_s_a.clone(),
v_a: self.v_a,
redeem_address: self.redeem_address.clone(),
punish_address: self.punish_address.clone(),
}
}
pub fn receive(self, msg: Message2) -> State2 {
State2 { State2 {
a: self.a, a: self.a,
B: self.B, B: self.B,
@ -227,7 +239,7 @@ pub struct State2 {
} }
impl State2 { impl State2 {
pub fn next_message(&self) -> alice::Message1 { pub fn next_message(&self) -> Message3 {
let tx_cancel = let tx_cancel =
bitcoin::TxCancel::new(&self.tx_lock, self.cancel_timelock, self.a.public(), self.B); bitcoin::TxCancel::new(&self.tx_lock, self.cancel_timelock, self.a.public(), self.B);
@ -240,13 +252,13 @@ impl State2 {
let tx_refund_encsig = self.a.encsign(self.S_b_bitcoin, tx_refund.digest()); let tx_refund_encsig = self.a.encsign(self.S_b_bitcoin, tx_refund.digest());
let tx_cancel_sig = self.a.sign(tx_cancel.digest()); let tx_cancel_sig = self.a.sign(tx_cancel.digest());
alice::Message1 { Message3 {
tx_refund_encsig, tx_refund_encsig,
tx_cancel_sig, tx_cancel_sig,
} }
} }
pub fn receive(self, msg: bob::Message2) -> Result<State3> { pub fn receive(self, msg: Message4) -> Result<State3> {
let tx_cancel = let tx_cancel =
bitcoin::TxCancel::new(&self.tx_lock, self.cancel_timelock, self.a.public(), self.B); bitcoin::TxCancel::new(&self.tx_lock, self.cancel_timelock, self.a.public(), self.B);
bitcoin::verify_sig(&self.B, &tx_cancel.digest(), &msg.tx_cancel_sig) bitcoin::verify_sig(&self.B, &tx_cancel.digest(), &msg.tx_cancel_sig)

View File

@ -23,7 +23,6 @@ use futures::{
pin_mut, pin_mut,
}; };
use libp2p::PeerId; use libp2p::PeerId;
use rand::rngs::OsRng;
use sha2::Sha256; use sha2::Sha256;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::timeout; use tokio::time::timeout;
@ -47,48 +46,21 @@ pub async fn negotiate(
let event = timeout( let event = timeout(
execution_params.bob_time_to_act, execution_params.bob_time_to_act,
event_loop_handle.recv_request(), event_loop_handle.recv_swap_request(),
) )
.await .await
.context("Failed to receive swap request from Bob")??; .context("Failed to receive swap request from Bob")??;
event_loop_handle event_loop_handle
.send_swap_response(event.channel, SwapResponse { xmr_amount }) .send_swap_response(event.1, SwapResponse { xmr_amount })
.await?; .await?;
let (bob_message0, channel) = timeout( let state3 = timeout(
execution_params.bob_time_to_act, execution_params.bob_time_to_act,
event_loop_handle.recv_message0(), event_loop_handle.execution_setup(bob_peer_id, state0),
) )
.await??; .await??;
let alice_message0 = state0.next_message(&mut OsRng);
event_loop_handle
.send_message0(channel, alice_message0)
.await?;
let state1 = state0.receive(bob_message0)?;
let (bob_message1, channel) = timeout(
execution_params.bob_time_to_act,
event_loop_handle.recv_message1(),
)
.await??;
let state2 = state1.receive(bob_message1);
event_loop_handle
.send_message1(channel, state2.next_message())
.await?;
let bob_message2 = timeout(
execution_params.bob_time_to_act,
event_loop_handle.recv_message2(),
)
.await??;
let state3 = state2.receive(bob_message2)?;
Ok((bob_peer_id, state3)) Ok((bob_peer_id, state3))
} }

View File

@ -247,9 +247,7 @@ async fn run_until_internal(
.await; .await;
match publishded_redeem_tx { match publishded_redeem_tx {
Ok(_) => { Ok(_) => AliceState::BtcRedeemed,
AliceState::BtcRedeemed
}
Err(e) => { Err(e) => {
bail!("Waiting for Bitcoin transaction finality failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e) bail!("Waiting for Bitcoin transaction finality failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e)
} }

View File

@ -1,29 +1,28 @@
use crate::{ use crate::{
monero, monero,
network::request_response::{AliceToBob, BobToAlice, Codec, Swap, TIMEOUT}, network::request_response::{CborCodec, Swap, TIMEOUT},
protocol::bob, protocol::bob::SwapRequest,
}; };
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Error, Result};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseEvent, RequestResponseMessage, ResponseChannel, RequestResponseMessage, ResponseChannel,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, NetworkBehaviour,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::time::Duration;
collections::VecDeque, use tracing::debug;
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)] #[derive(Debug)]
pub struct OutEvent { pub enum OutEvent {
pub msg: bob::SwapRequest, MsgReceived {
pub channel: ResponseChannel<AliceToBob>, msg: SwapRequest,
channel: ResponseChannel<SwapResponse>,
},
ResponseSent,
Failure(Error),
} }
#[derive(Copy, Clone, Debug, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, Serialize, Deserialize)]
@ -31,37 +30,58 @@ pub struct SwapResponse {
pub xmr_amount: monero::Amount, pub xmr_amount: monero::Amount,
} }
impl From<RequestResponseEvent<SwapRequest, SwapResponse>> for OutEvent {
fn from(event: RequestResponseEvent<SwapRequest, SwapResponse>) -> Self {
match event {
RequestResponseEvent::Message {
peer,
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
debug!("Received swap request from {}", peer);
OutEvent::MsgReceived {
msg: request,
channel,
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Failure(anyhow!("Alice should not get a Response")),
RequestResponseEvent::InboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
}
RequestResponseEvent::OutboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
RequestResponseEvent::ResponseSent { .. } => OutEvent::ResponseSent,
}
}
}
/// A `NetworkBehaviour` that represents negotiate a swap using Swap /// A `NetworkBehaviour` that represents negotiate a swap using Swap
/// request/response. /// request/response.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<Codec<Swap>>, rr: RequestResponse<CborCodec<Swap, SwapRequest, SwapResponse>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
/// Alice always sends her messages as a response to a request from Bob. /// Alice always sends her messages as a response to a request from Bob.
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: SwapResponse) -> Result<()> { pub fn send(
let msg = AliceToBob::SwapResponse(Box::new(msg)); &mut self,
channel: ResponseChannel<SwapResponse>,
msg: SwapResponse,
) -> Result<()> {
self.rr self.rr
.send_response(channel, msg) .send_response(channel, msg)
.map_err(|_| anyhow!("Sending swap response failed")) .map_err(|_| anyhow!("Sending swap response failed"))
} }
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Swap>>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -73,43 +93,10 @@ impl Default for Behaviour {
Self { Self {
rr: RequestResponse::new( rr: RequestResponse::new(
Codec::default(), CborCodec::default(),
vec![(Swap, ProtocolSupport::Full)], vec![(Swap, ProtocolSupport::Inbound)],
config, config,
), ),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
if let BobToAlice::SwapRequest(msg) = request {
debug!("Received swap request");
self.events.push_back(OutEvent { msg: *msg, channel })
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => panic!("Alice should not get a Response"),
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent a swap response to Bob");
}
} }
} }
} }

View File

@ -1,62 +1,42 @@
use crate::{ use crate::{
monero, monero,
network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT}, network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT},
}; };
use anyhow::{anyhow, Error};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseEvent, RequestResponseMessage, RequestResponseMessage,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::time::Duration;
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::error;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TransferProof { pub struct TransferProof {
pub tx_lock_proof: monero::TransferProof, pub tx_lock_proof: monero::TransferProof,
} }
#[derive(Debug, Copy, Clone)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Acknowledged, Acknowledged,
Failure(Error),
} }
/// A `NetworkBehaviour` that represents sending the Monero transfer proof to /// A `NetworkBehaviour` that represents sending the Monero transfer proof to
/// Bob. /// Bob.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<OneShotCodec<TransferProofProtocol>>, rr: RequestResponse<CborCodec<TransferProofProtocol, TransferProof, ()>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
pub fn send(&mut self, bob: PeerId, msg: TransferProof) { pub fn send(&mut self, bob: PeerId, msg: TransferProof) {
let msg = Request::TransferProof(Box::new(msg));
let _id = self.rr.send_request(&bob, msg); let _id = self.rr.send_request(&bob, msg);
} }
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<OneShotCodec<TransferProofProtocol>>, OutEvent>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -67,37 +47,36 @@ impl Default for Behaviour {
Self { Self {
rr: RequestResponse::new( rr: RequestResponse::new(
OneShotCodec::default(), CborCodec::default(),
vec![(TransferProofProtocol, ProtocolSupport::Outbound)], vec![(TransferProofProtocol, ProtocolSupport::Outbound)],
config, config,
), ),
events: Default::default(),
} }
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour { impl From<RequestResponseEvent<TransferProof, ()>> for OutEvent {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) { fn from(event: RequestResponseEvent<TransferProof, ()>) -> Self {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. }, message: RequestResponseMessage::Request { .. },
.. ..
} => panic!("Alice should never get a transfer proof request from Bob"), } => OutEvent::Failure(anyhow!(
"Alice should never get a transfer proof request from Bob"
)),
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Response { response, .. }, message: RequestResponseMessage::Response { .. },
.. ..
} => { } => OutEvent::Acknowledged,
if let Response::TransferProof = response {
self.events.push_back(OutEvent::Acknowledged);
}
}
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error); OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
} }
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
RequestResponseEvent::ResponseSent { .. } => {
OutEvent::Failure(anyhow!("Alice should not send a response"))
} }
RequestResponseEvent::ResponseSent { .. } => {}
} }
} }
} }

View File

@ -3,15 +3,16 @@
use crate::{ use crate::{
bitcoin, database, bitcoin, database,
database::Database, database::Database,
execution_params::ExecutionParams,
monero, network, monero, network,
network::{ network::{
peer_tracker::{self, PeerTracker}, peer_tracker::{self, PeerTracker},
transport::build, transport::build,
}, },
protocol::{alice, bob, SwapAmounts}, protocol::{alice, alice::TransferProof, bob, SwapAmounts},
seed::Seed, seed::Seed,
}; };
use anyhow::{bail, Result}; use anyhow::{bail, Error, Result};
use libp2p::{core::Multiaddr, identity::Keypair, NetworkBehaviour, PeerId}; use libp2p::{core::Multiaddr, identity::Keypair, NetworkBehaviour, PeerId};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
@ -21,20 +22,16 @@ use uuid::Uuid;
pub use self::{ pub use self::{
encrypted_signature::EncryptedSignature, encrypted_signature::EncryptedSignature,
event_loop::{EventLoop, EventLoopHandle}, event_loop::{EventLoop, EventLoopHandle},
message0::Message0,
message1::Message1,
message2::Message2,
state::*, state::*,
swap::{run, run_until}, swap::{run, run_until},
swap_request::*, swap_request::*,
}; };
use crate::{execution_params::ExecutionParams, protocol::alice::TransferProof}; pub use execution_setup::{Message0, Message2, Message4};
use libp2p::request_response::ResponseChannel;
mod encrypted_signature; mod encrypted_signature;
pub mod event_loop; pub mod event_loop;
mod message0; mod execution_setup;
mod message1;
mod message2;
pub mod state; pub mod state;
pub mod swap; pub mod swap;
mod swap_request; mod swap_request;
@ -162,6 +159,7 @@ impl Builder {
} }
} }
} }
fn init_event_loop( fn init_event_loop(
&self, &self,
) -> Result<(bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle)> { ) -> Result<(bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle)> {
@ -174,6 +172,7 @@ impl Builder {
self.peer_id, self.peer_id,
self.alice_peer_id, self.alice_peer_id,
self.alice_address.clone(), self.alice_address.clone(),
self.bitcoin_wallet.clone(),
) )
} }
@ -203,15 +202,18 @@ impl Builder {
} }
} }
#[derive(Debug, Clone)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
SwapResponse(alice::SwapResponse), SwapResponse(alice::SwapResponse),
Message0(Box<alice::Message0>), ExecutionSetupDone(Result<Box<State2>>),
Message1(Box<alice::Message1>), TransferProof {
Message2, msg: Box<TransferProof>,
TransferProof(Box<TransferProof>), channel: ResponseChannel<()>,
},
EncryptedSignatureAcknowledged, EncryptedSignatureAcknowledged,
ResponseSent, // Same variant is used for all messages as no processing is done
Failure(Error),
} }
impl From<peer_tracker::OutEvent> for OutEvent { impl From<peer_tracker::OutEvent> for OutEvent {
@ -226,46 +228,42 @@ impl From<peer_tracker::OutEvent> for OutEvent {
impl From<swap_request::OutEvent> for OutEvent { impl From<swap_request::OutEvent> for OutEvent {
fn from(event: swap_request::OutEvent) -> Self { fn from(event: swap_request::OutEvent) -> Self {
OutEvent::SwapResponse(event.swap_response) use swap_request::OutEvent::*;
}
}
impl From<message0::OutEvent> for OutEvent {
fn from(event: message0::OutEvent) -> Self {
match event { match event {
message0::OutEvent::Msg(msg) => OutEvent::Message0(Box::new(msg)), MsgReceived(swap_response) => OutEvent::SwapResponse(swap_response),
Failure(err) => OutEvent::Failure(err.context("Failre with Swap Request")),
} }
} }
} }
impl From<message1::OutEvent> for OutEvent { impl From<execution_setup::OutEvent> for OutEvent {
fn from(event: message1::OutEvent) -> Self { fn from(event: execution_setup::OutEvent) -> Self {
match event { match event {
message1::OutEvent::Msg(msg) => OutEvent::Message1(Box::new(msg)), execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)),
}
}
}
impl From<message2::OutEvent> for OutEvent {
fn from(event: message2::OutEvent) -> Self {
match event {
message2::OutEvent::Msg => OutEvent::Message2,
} }
} }
} }
impl From<transfer_proof::OutEvent> for OutEvent { impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self { fn from(event: transfer_proof::OutEvent) -> Self {
use transfer_proof::OutEvent::*;
match event { match event {
transfer_proof::OutEvent::Msg(msg) => OutEvent::TransferProof(Box::new(msg)), MsgReceived { msg, channel } => OutEvent::TransferProof {
msg: Box::new(msg),
channel,
},
AckSent => OutEvent::ResponseSent,
Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")),
} }
} }
} }
impl From<encrypted_signature::OutEvent> for OutEvent { impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self { fn from(event: encrypted_signature::OutEvent) -> Self {
use encrypted_signature::OutEvent::*;
match event { match event {
encrypted_signature::OutEvent::Acknowledged => OutEvent::EncryptedSignatureAcknowledged, Acknowledged => OutEvent::EncryptedSignatureAcknowledged,
Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")),
} }
} }
} }
@ -277,9 +275,7 @@ impl From<encrypted_signature::OutEvent> for OutEvent {
pub struct Behaviour { pub struct Behaviour {
pt: PeerTracker, pt: PeerTracker,
swap_request: swap_request::Behaviour, swap_request: swap_request::Behaviour,
message0: message0::Behaviour, execution_setup: execution_setup::Behaviour,
message1: message1::Behaviour,
message2: message2::Behaviour,
transfer_proof: transfer_proof::Behaviour, transfer_proof: transfer_proof::Behaviour,
encrypted_signature: encrypted_signature::Behaviour, encrypted_signature: encrypted_signature::Behaviour,
} }
@ -291,22 +287,15 @@ impl Behaviour {
info!("Requesting swap from: {}", alice); info!("Requesting swap from: {}", alice);
} }
/// Sends Bob's first message to Alice. pub fn start_execution_setup(
pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) { &mut self,
self.message0.send(alice, msg); alice_peer_id: PeerId,
debug!("Message0 sent"); state0: State0,
} bitcoin_wallet: Arc<bitcoin::Wallet>,
) {
/// Sends Bob's second message to Alice. self.execution_setup
pub fn send_message1(&mut self, alice: PeerId, msg: bob::Message1) { .run(alice_peer_id, state0, bitcoin_wallet);
self.message1.send(alice, msg); info!("Start execution setup with {}", alice_peer_id);
debug!("Message1 sent");
}
/// Sends Bob's third message to Alice.
pub fn send_message2(&mut self, alice: PeerId, msg: bob::Message2) {
self.message2.send(alice, msg);
debug!("Message2 sent");
} }
/// Sends Bob's fourth message to Alice. /// Sends Bob's fourth message to Alice.

View File

@ -1,61 +1,38 @@
use crate::network::request_response::{ use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT};
EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT, use anyhow::{anyhow, Error};
};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseEvent, RequestResponseMessage, RequestResponseMessage,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::time::Duration;
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::error;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EncryptedSignature { pub struct EncryptedSignature {
pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature,
} }
#[derive(Debug, Copy, Clone)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Acknowledged, Acknowledged,
Failure(Error),
} }
/// A `NetworkBehaviour` that represents sending encrypted signature to Alice. /// A `NetworkBehaviour` that represents sending encrypted signature to Alice.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<OneShotCodec<EncryptedSignatureProtocol>>, rr: RequestResponse<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) {
let msg = Request::EncryptedSignature(Box::new(msg));
let _id = self.rr.send_request(&alice, msg); let _id = self.rr.send_request(&alice, msg);
} }
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<RequestProtocol<OneShotCodec<EncryptedSignatureProtocol>>, OutEvent>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -66,39 +43,34 @@ impl Default for Behaviour {
Self { Self {
rr: RequestResponse::new( rr: RequestResponse::new(
OneShotCodec::default(), CborCodec::default(),
vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)],
config, config,
), ),
events: Default::default(),
} }
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour { impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) { fn from(event: RequestResponseEvent<EncryptedSignature, ()>) -> Self {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. }, message: RequestResponseMessage::Request { .. },
.. ..
} => panic!("Bob should never get a request from Alice"), } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")),
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Response { response, .. }, message: RequestResponseMessage::Response { .. },
.. ..
} => { } => OutEvent::Acknowledged,
if let Response::EncryptedSignature = response {
self.events.push_back(OutEvent::Acknowledged);
}
}
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error); OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
} }
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob does not send the encrypted signature response to Alice");
} }
RequestResponseEvent::ResponseSent { .. } => OutEvent::Failure(anyhow!(
"Bob does not send the encrypted signature response to Alice"
)),
} }
} }
} }

View File

@ -1,15 +1,16 @@
use crate::{ use crate::{
bitcoin,
bitcoin::EncryptedSignature, bitcoin::EncryptedSignature,
network::{transport::SwapTransport, TokioExecutor}, network::{transport::SwapTransport, TokioExecutor},
protocol::{ protocol::{
alice,
alice::{SwapResponse, TransferProof}, alice::{SwapResponse, TransferProof},
bob::{self, Behaviour, OutEvent, SwapRequest}, bob::{Behaviour, OutEvent, State0, State2, SwapRequest},
}, },
}; };
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use futures::FutureExt; use futures::FutureExt;
use libp2p::{core::Multiaddr, PeerId}; use libp2p::{core::Multiaddr, PeerId};
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
@ -35,15 +36,12 @@ impl<T> Default for Channels<T> {
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
recv_swap_response: Receiver<SwapResponse>, recv_swap_response: Receiver<SwapResponse>,
recv_message0: Receiver<alice::Message0>, start_execution_setup: Sender<State0>,
recv_message1: Receiver<alice::Message1>, done_execution_setup: Receiver<Result<State2>>,
recv_transfer_proof: Receiver<TransferProof>, recv_transfer_proof: Receiver<TransferProof>,
conn_established: Receiver<PeerId>, conn_established: Receiver<PeerId>,
dial_alice: Sender<()>, dial_alice: Sender<()>,
send_swap_request: Sender<SwapRequest>, send_swap_request: Sender<SwapRequest>,
send_message0: Sender<bob::Message0>,
send_message1: Sender<bob::Message1>,
send_message2: Sender<bob::Message2>,
send_encrypted_signature: Sender<EncryptedSignature>, send_encrypted_signature: Sender<EncryptedSignature>,
recv_encrypted_signature_ack: Receiver<()>, recv_encrypted_signature_ack: Receiver<()>,
} }
@ -56,18 +54,13 @@ impl EventLoopHandle {
.ok_or_else(|| anyhow!("Failed to receive swap response from Alice")) .ok_or_else(|| anyhow!("Failed to receive swap response from Alice"))
} }
pub async fn recv_message0(&mut self) -> Result<alice::Message0> { pub async fn execution_setup(&mut self, state0: State0) -> Result<State2> {
self.recv_message0 let _ = self.start_execution_setup.send(state0).await?;
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive message 0 from Alice"))
}
pub async fn recv_message1(&mut self) -> Result<alice::Message1> { self.done_execution_setup
self.recv_message1
.recv() .recv()
.await .await
.ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))?
} }
pub async fn recv_transfer_proof(&mut self) -> Result<TransferProof> { pub async fn recv_transfer_proof(&mut self) -> Result<TransferProof> {
@ -96,21 +89,6 @@ impl EventLoopHandle {
Ok(()) Ok(())
} }
pub async fn send_message0(&mut self, msg: bob::Message0) -> Result<()> {
let _ = self.send_message0.send(msg).await?;
Ok(())
}
pub async fn send_message1(&mut self, msg: bob::Message1) -> Result<()> {
let _ = self.send_message1.send(msg).await?;
Ok(())
}
pub async fn send_message2(&mut self, msg: bob::Message2) -> Result<()> {
let _ = self.send_message2.send(msg).await?;
Ok(())
}
pub async fn send_encrypted_signature( pub async fn send_encrypted_signature(
&mut self, &mut self,
tx_redeem_encsig: EncryptedSignature, tx_redeem_encsig: EncryptedSignature,
@ -128,17 +106,15 @@ impl EventLoopHandle {
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop { pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_peer_id: PeerId, alice_peer_id: PeerId,
recv_swap_response: Sender<SwapResponse>, recv_swap_response: Sender<SwapResponse>,
recv_message0: Sender<alice::Message0>, start_execution_setup: Receiver<State0>,
recv_message1: Sender<alice::Message1>, done_execution_setup: Sender<Result<State2>>,
recv_transfer_proof: Sender<TransferProof>, recv_transfer_proof: Sender<TransferProof>,
dial_alice: Receiver<()>, dial_alice: Receiver<()>,
conn_established: Sender<PeerId>, conn_established: Sender<PeerId>,
send_swap_request: Receiver<SwapRequest>, send_swap_request: Receiver<SwapRequest>,
send_message0: Receiver<bob::Message0>,
send_message1: Receiver<bob::Message1>,
send_message2: Receiver<bob::Message2>,
send_encrypted_signature: Receiver<EncryptedSignature>, send_encrypted_signature: Receiver<EncryptedSignature>,
recv_encrypted_signature_ack: Sender<()>, recv_encrypted_signature_ack: Sender<()>,
} }
@ -150,6 +126,7 @@ impl EventLoop {
peer_id: PeerId, peer_id: PeerId,
alice_peer_id: PeerId, alice_peer_id: PeerId,
alice_addr: Multiaddr, alice_addr: Multiaddr,
bitcoin_wallet: Arc<bitcoin::Wallet>,
) -> Result<(Self, EventLoopHandle)> { ) -> Result<(Self, EventLoopHandle)> {
let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id) let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(TokioExecutor { .executor(Box::new(TokioExecutor {
@ -160,46 +137,38 @@ impl EventLoop {
swarm.add_address(alice_peer_id, alice_addr); swarm.add_address(alice_peer_id, alice_addr);
let swap_response = Channels::new(); let swap_response = Channels::new();
let recv_message0 = Channels::new(); let start_execution_setup = Channels::new();
let recv_message1 = Channels::new(); let done_execution_setup = Channels::new();
let recv_transfer_proof = Channels::new(); let recv_transfer_proof = Channels::new();
let dial_alice = Channels::new(); let dial_alice = Channels::new();
let conn_established = Channels::new(); let conn_established = Channels::new();
let send_swap_request = Channels::new(); let send_swap_request = Channels::new();
let send_message0 = Channels::new();
let send_message1 = Channels::new();
let send_message2 = Channels::new();
let send_encrypted_signature = Channels::new(); let send_encrypted_signature = Channels::new();
let recv_encrypted_signature_ack = Channels::new(); let recv_encrypted_signature_ack = Channels::new();
let event_loop = EventLoop { let event_loop = EventLoop {
swarm, swarm,
alice_peer_id, alice_peer_id,
bitcoin_wallet,
recv_swap_response: swap_response.sender, recv_swap_response: swap_response.sender,
recv_message0: recv_message0.sender, start_execution_setup: start_execution_setup.receiver,
recv_message1: recv_message1.sender, done_execution_setup: done_execution_setup.sender,
recv_transfer_proof: recv_transfer_proof.sender, recv_transfer_proof: recv_transfer_proof.sender,
conn_established: conn_established.sender, conn_established: conn_established.sender,
dial_alice: dial_alice.receiver, dial_alice: dial_alice.receiver,
send_swap_request: send_swap_request.receiver, send_swap_request: send_swap_request.receiver,
send_message0: send_message0.receiver,
send_message1: send_message1.receiver,
send_message2: send_message2.receiver,
send_encrypted_signature: send_encrypted_signature.receiver, send_encrypted_signature: send_encrypted_signature.receiver,
recv_encrypted_signature_ack: recv_encrypted_signature_ack.sender, recv_encrypted_signature_ack: recv_encrypted_signature_ack.sender,
}; };
let handle = EventLoopHandle { let handle = EventLoopHandle {
recv_swap_response: swap_response.receiver, recv_swap_response: swap_response.receiver,
recv_message0: recv_message0.receiver, start_execution_setup: start_execution_setup.sender,
recv_message1: recv_message1.receiver, done_execution_setup: done_execution_setup.receiver,
recv_transfer_proof: recv_transfer_proof.receiver, recv_transfer_proof: recv_transfer_proof.receiver,
conn_established: conn_established.receiver, conn_established: conn_established.receiver,
dial_alice: dial_alice.sender, dial_alice: dial_alice.sender,
send_swap_request: send_swap_request.sender, send_swap_request: send_swap_request.sender,
send_message0: send_message0.sender,
send_message1: send_message1.sender,
send_message2: send_message2.sender,
send_encrypted_signature: send_encrypted_signature.sender, send_encrypted_signature: send_encrypted_signature.sender,
recv_encrypted_signature_ack: recv_encrypted_signature_ack.receiver, recv_encrypted_signature_ack: recv_encrypted_signature_ack.receiver,
}; };
@ -218,20 +187,24 @@ impl EventLoop {
OutEvent::SwapResponse(msg) => { OutEvent::SwapResponse(msg) => {
let _ = self.recv_swap_response.send(msg).await; let _ = self.recv_swap_response.send(msg).await;
}, },
OutEvent::Message0(msg) => { OutEvent::ExecutionSetupDone(res) => {
let _ = self.recv_message0.send(*msg).await; let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
} }
OutEvent::Message1(msg) => { OutEvent::TransferProof{ msg, channel }=> {
let _ = self.recv_message1.send(*msg).await;
}
OutEvent::Message2 => info!("Alice acknowledged message 2 received"),
OutEvent::TransferProof(msg) => {
let _ = self.recv_transfer_proof.send(*msg).await; let _ = self.recv_transfer_proof.send(*msg).await;
// Send back empty response so that the request/response protocol completes.
if let Err(error) = self.swarm.transfer_proof.send_ack(channel) {
error!("Failed to send Transfer Proof ack: {:?}", error);
}
} }
OutEvent::EncryptedSignatureAcknowledged => { OutEvent::EncryptedSignatureAcknowledged => {
debug!("Alice acknowledged encrypted signature"); debug!("Alice acknowledged encrypted signature");
let _ = self.recv_encrypted_signature_ack.send(()).await; let _ = self.recv_encrypted_signature_ack.send(()).await;
} }
OutEvent::ResponseSent => {}
OutEvent::Failure(err) => {
error!("Communication error: {:#}", err)
}
} }
}, },
option = self.dial_alice.recv().fuse() => { option = self.dial_alice.recv().fuse() => {
@ -255,21 +228,11 @@ impl EventLoop {
self.swarm.send_swap_request(self.alice_peer_id, swap_request); self.swarm.send_swap_request(self.alice_peer_id, swap_request);
} }
}, },
option = self.start_execution_setup.recv().fuse() => {
msg0 = self.send_message0.recv().fuse() => { if let Some(state0) = option {
if let Some(msg) = msg0 { let _ = self
self.swarm.send_message0(self.alice_peer_id, msg); .swarm
} .start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone());
}
msg1 = self.send_message1.recv().fuse() => {
if let Some(msg) = msg1 {
self.swarm.send_message1(self.alice_peer_id, msg);
}
},
msg2 = self.send_message2.recv().fuse() => {
if let Some(msg) = msg2 {
self.swarm.send_message2(self.alice_peer_id, msg);
} }
}, },
encrypted_signature = self.send_encrypted_signature.recv().fuse() => { encrypted_signature = self.send_encrypted_signature.recv().fuse() => {

View File

@ -0,0 +1,108 @@
use crate::{
bitcoin::Signature,
network::request_response::BUF_SIZE,
protocol::{
alice::{Message1, Message3},
bob::{State0, State2},
},
};
use anyhow::{Context, Error, Result};
use libp2p::PeerId;
use libp2p_async_await::BehaviourOutEvent;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message0 {
pub(crate) B: crate::bitcoin::PublicKey,
pub(crate) S_b_monero: monero::PublicKey,
pub(crate) S_b_bitcoin: crate::bitcoin::PublicKey,
pub(crate) dleq_proof_s_b: cross_curve_dleq::Proof,
pub(crate) v_b: crate::monero::PrivateViewKey,
pub(crate) refund_address: bitcoin::Address,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message2 {
pub(crate) tx_lock: crate::bitcoin::TxLock,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message4 {
pub(crate) tx_punish_sig: Signature,
pub(crate) tx_cancel_sig: Signature,
}
#[derive(Debug)]
pub enum OutEvent {
Done(Result<State2>),
}
impl From<BehaviourOutEvent<(), State2, anyhow::Error>> for OutEvent {
fn from(event: BehaviourOutEvent<(), State2, Error>) -> Self {
match event {
BehaviourOutEvent::Outbound(_, Ok(State2)) => OutEvent::Done(Ok(State2)),
BehaviourOutEvent::Outbound(_, Err(e)) => OutEvent::Done(Err(e)),
BehaviourOutEvent::Inbound(..) => unreachable!("Bob only supports outbound"),
}
}
}
#[derive(libp2p::NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
pub struct Behaviour {
inner: libp2p_async_await::Behaviour<(), State2, anyhow::Error>,
}
impl Default for Behaviour {
fn default() -> Self {
Self {
inner: libp2p_async_await::Behaviour::new(b"/comit/xmr/btc/execution_setup/1.0.0"),
}
}
}
impl Behaviour {
pub fn run(
&mut self,
alice: PeerId,
state0: State0,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
) {
self.inner
.do_protocol_dialer(alice, move |mut substream| async move {
substream
.write_message(
&serde_cbor::to_vec(&state0.next_message())
.context("failed to serialize message0")?,
)
.await?;
let message1 =
serde_cbor::from_slice::<Message1>(&substream.read_message(BUF_SIZE).await?)
.context("failed to deserialize message1")?;
let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?;
substream
.write_message(
&serde_cbor::to_vec(&state1.next_message())
.context("failed to serialize message2")?,
)
.await?;
let message3 =
serde_cbor::from_slice::<Message3>(&substream.read_message(BUF_SIZE).await?)
.context("failed to deserialize message3")?;
let state2 = state1.receive(message3)?;
substream
.write_message(
&serde_cbor::to_vec(&state2.next_message())
.context("failed to serialize message4")?,
)
.await?;
Ok(state2)
})
}
}

View File

@ -1,110 +0,0 @@
use crate::{
bitcoin, monero,
network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT},
protocol::{alice, bob},
};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message0 {
pub(crate) B: bitcoin::PublicKey,
pub(crate) S_b_monero: monero::PublicKey,
pub(crate) S_b_bitcoin: bitcoin::PublicKey,
pub(crate) dleq_proof_s_b: cross_curve_dleq::Proof,
pub(crate) v_b: monero::PrivateViewKey,
pub(crate) refund_address: bitcoin::Address,
}
#[derive(Debug)]
pub enum OutEvent {
Msg(alice::Message0),
}
/// A `NetworkBehaviour` that represents send/recv of message 0.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<Codec<Message0Protocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: bob::Message0) {
let msg = BobToAlice::Message0(Box::new(msg));
let _id = self.rr.send_request(&alice, msg);
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Message0Protocol>>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Message0Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => panic!("Bob should never get a request from Alice"),
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { response, .. },
..
} => {
if let AliceToBob::Message0(msg) = response {
debug!("Received Message0");
self.events.push_back(OutEvent::Msg(*msg));
}
}
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob does not send a message0 response to Alice");
}
}
}
}

View File

@ -1,105 +0,0 @@
use crate::{
bitcoin,
network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT},
protocol::alice,
};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message1 {
pub(crate) tx_lock: bitcoin::TxLock,
}
#[derive(Debug)]
pub enum OutEvent {
Msg(alice::Message1),
}
/// A `NetworkBehaviour` that represents send/recv of message 1.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<Codec<Message1Protocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: Message1) {
let msg = BobToAlice::Message1(Box::new(msg));
let _id = self.rr.send_request(&alice, msg);
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Message1Protocol>>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Message1Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => panic!("Bob should never get a request from Alice"),
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { response, .. },
..
} => {
if let AliceToBob::Message1(msg) = response {
debug!("Received Message1");
self.events.push_back(OutEvent::Msg(*msg));
}
}
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob does not send a message 1 response to Alice");
}
}
}
}

View File

@ -1,103 +0,0 @@
use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT};
use ecdsa_fun::Signature;
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message2 {
pub(crate) tx_punish_sig: Signature,
pub(crate) tx_cancel_sig: Signature,
}
#[derive(Clone, Copy, Debug)]
pub enum OutEvent {
Msg,
}
/// A `NetworkBehaviour` that represents sending message 2 to Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<Codec<Message2Protocol>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: Message2) {
let msg = BobToAlice::Message2(Box::new(msg));
let _id = self.rr.send_request(&alice, msg);
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Message2Protocol>>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Message2Protocol, ProtocolSupport::Full)],
config,
),
events: VecDeque::default(),
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => panic!("Bob should never get a request from Alice"),
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { response, .. },
..
} => {
if let AliceToBob::Message2 = response {
debug!("Received Message 2 acknowledgement");
self.events.push_back(OutEvent::Msg);
}
}
RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob does not send a Message2 response to Alice");
}
}
}
}

View File

@ -9,7 +9,11 @@ use crate::{
execution_params::ExecutionParams, execution_params::ExecutionParams,
monero, monero,
monero::{monero_private_key, TransferProof}, monero::{monero_private_key, TransferProof},
protocol::{alice, bob, bob::EncryptedSignature, SwapAmounts}, protocol::{
alice::{Message1, Message3},
bob::{EncryptedSignature, Message0, Message2, Message4},
SwapAmounts,
},
}; };
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic, Signature}; use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic, Signature};
@ -74,6 +78,7 @@ pub struct State0 {
b: bitcoin::SecretKey, b: bitcoin::SecretKey,
s_b: cross_curve_dleq::Scalar, s_b: cross_curve_dleq::Scalar,
v_b: monero::PrivateViewKey, v_b: monero::PrivateViewKey,
dleq_proof_s_b: cross_curve_dleq::Proof,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")] #[serde(with = "::bitcoin::util::amount::serde::as_sat")]
btc: bitcoin::Amount, btc: bitcoin::Amount,
xmr: monero::Amount, xmr: monero::Amount,
@ -97,6 +102,7 @@ impl State0 {
let s_b = cross_curve_dleq::Scalar::random(rng); let s_b = cross_curve_dleq::Scalar::random(rng);
let v_b = monero::PrivateViewKey::new_random(rng); let v_b = monero::PrivateViewKey::new_random(rng);
let dleq_proof_s_b = cross_curve_dleq::Proof::new(rng, &s_b);
Self { Self {
b, b,
@ -104,6 +110,7 @@ impl State0 {
v_b, v_b,
btc, btc,
xmr, xmr,
dleq_proof_s_b,
cancel_timelock, cancel_timelock,
punish_timelock, punish_timelock,
refund_address, refund_address,
@ -111,22 +118,20 @@ impl State0 {
} }
} }
pub fn next_message<R: RngCore + CryptoRng>(&self, rng: &mut R) -> bob::Message0 { pub fn next_message(&self) -> Message0 {
let dleq_proof_s_b = cross_curve_dleq::Proof::new(rng, &self.s_b); Message0 {
bob::Message0 {
B: self.b.public(), B: self.b.public(),
S_b_monero: monero::PublicKey::from_private_key(&monero::PrivateKey { S_b_monero: monero::PublicKey::from_private_key(&monero::PrivateKey {
scalar: self.s_b.into_ed25519(), scalar: self.s_b.into_ed25519(),
}), }),
S_b_bitcoin: self.s_b.into_secp256k1().into(), S_b_bitcoin: self.s_b.into_secp256k1().into(),
dleq_proof_s_b, dleq_proof_s_b: self.dleq_proof_s_b.clone(),
v_b: self.v_b, v_b: self.v_b,
refund_address: self.refund_address.clone(), refund_address: self.refund_address.clone(),
} }
} }
pub async fn receive<W>(self, wallet: &W, msg: alice::Message0) -> anyhow::Result<State1> pub async fn receive<W>(self, wallet: &W, msg: Message1) -> anyhow::Result<State1>
where where
W: BuildTxLockPsbt + GetNetwork, W: BuildTxLockPsbt + GetNetwork,
{ {
@ -182,13 +187,13 @@ pub struct State1 {
} }
impl State1 { impl State1 {
pub fn next_message(&self) -> bob::Message1 { pub fn next_message(&self) -> Message2 {
bob::Message1 { Message2 {
tx_lock: self.tx_lock.clone(), tx_lock: self.tx_lock.clone(),
} }
} }
pub fn receive(self, msg: alice::Message1) -> Result<State2> { pub fn receive(self, msg: Message3) -> Result<State2> {
let tx_cancel = TxCancel::new(&self.tx_lock, self.cancel_timelock, self.A, self.b.public()); let tx_cancel = TxCancel::new(&self.tx_lock, self.cancel_timelock, self.A, self.b.public());
let tx_refund = bitcoin::TxRefund::new(&tx_cancel, &self.refund_address); let tx_refund = bitcoin::TxRefund::new(&tx_cancel, &self.refund_address);
@ -245,14 +250,14 @@ pub struct State2 {
} }
impl State2 { impl State2 {
pub fn next_message(&self) -> bob::Message2 { pub fn next_message(&self) -> Message4 {
let tx_cancel = TxCancel::new(&self.tx_lock, self.cancel_timelock, self.A, self.b.public()); let tx_cancel = TxCancel::new(&self.tx_lock, self.cancel_timelock, self.A, self.b.public());
let tx_cancel_sig = self.b.sign(tx_cancel.digest()); let tx_cancel_sig = self.b.sign(tx_cancel.digest());
let tx_punish = let tx_punish =
bitcoin::TxPunish::new(&tx_cancel, &self.punish_address, self.punish_timelock); bitcoin::TxPunish::new(&tx_cancel, &self.punish_address, self.punish_timelock);
let tx_punish_sig = self.b.sign(tx_punish.digest()); let tx_punish_sig = self.b.sign(tx_punish.digest());
bob::Message2 { Message4 {
tx_punish_sig, tx_punish_sig,
tx_cancel_sig, tx_cancel_sig,
} }

View File

@ -11,7 +11,6 @@ use crate::{
}; };
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use async_recursion::async_recursion; use async_recursion::async_recursion;
use rand::{rngs::OsRng, CryptoRng, RngCore};
use std::sync::Arc; use std::sync::Arc;
use tokio::select; use tokio::select;
use tracing::info; use tracing::info;
@ -44,7 +43,6 @@ pub async fn run_until(
swap.db, swap.db,
swap.bitcoin_wallet, swap.bitcoin_wallet,
swap.monero_wallet, swap.monero_wallet,
OsRng,
swap.swap_id, swap.swap_id,
swap.execution_params, swap.execution_params,
) )
@ -54,20 +52,16 @@ pub async fn run_until(
// State machine driver for swap execution // State machine driver for swap execution
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[async_recursion] #[async_recursion]
async fn run_until_internal<R>( async fn run_until_internal(
state: BobState, state: BobState,
is_target_state: fn(&BobState) -> bool, is_target_state: fn(&BobState) -> bool,
mut event_loop_handle: EventLoopHandle, mut event_loop_handle: EventLoopHandle,
db: Database, db: Database,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
mut rng: R,
swap_id: Uuid, swap_id: Uuid,
execution_params: ExecutionParams, execution_params: ExecutionParams,
) -> Result<BobState> ) -> Result<BobState> {
where
R: RngCore + CryptoRng + Send,
{
info!("Current state: {}", state); info!("Current state: {}", state);
if is_target_state(&state) { if is_target_state(&state) {
Ok(state) Ok(state)
@ -76,14 +70,7 @@ where
BobState::Started { state0, amounts } => { BobState::Started { state0, amounts } => {
event_loop_handle.dial().await?; event_loop_handle.dial().await?;
let state2 = negotiate( let state2 = negotiate(state0, amounts, &mut event_loop_handle).await?;
state0,
amounts,
&mut event_loop_handle,
&mut rng,
bitcoin_wallet.clone(),
)
.await?;
let state = BobState::Negotiated(state2); let state = BobState::Negotiated(state2);
let db_state = state.clone().into(); let db_state = state.clone().into();
@ -95,7 +82,6 @@ where
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -117,7 +103,6 @@ where
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -170,7 +155,6 @@ where
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -217,7 +201,6 @@ where
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -260,7 +243,6 @@ where
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -296,7 +278,6 @@ where
db, db,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -318,7 +299,6 @@ where
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -344,7 +324,6 @@ where
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -376,7 +355,6 @@ where
db, db,
bitcoin_wallet, bitcoin_wallet,
monero_wallet, monero_wallet,
rng,
swap_id, swap_id,
execution_params, execution_params,
) )
@ -390,18 +368,13 @@ where
} }
} }
pub async fn negotiate<R>( pub async fn negotiate(
state0: crate::protocol::bob::state::State0, state0: crate::protocol::bob::state::State0,
amounts: SwapAmounts, amounts: SwapAmounts,
swarm: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
mut rng: R, ) -> Result<bob::state::State2> {
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
) -> Result<bob::state::State2>
where
R: RngCore + CryptoRng + Send,
{
tracing::trace!("Starting negotiate"); tracing::trace!("Starting negotiate");
swarm event_loop_handle
.send_swap_request(SwapRequest { .send_swap_request(SwapRequest {
btc_amount: amounts.btc, btc_amount: amounts.btc,
}) })
@ -409,17 +382,9 @@ where
// TODO: Use this once Bob's CLI is modified to only pass xmr amount in // TODO: Use this once Bob's CLI is modified to only pass xmr amount in
// argument. // argument.
let _swap_response = swarm.recv_swap_response().await?; let _swap_response = event_loop_handle.recv_swap_response().await?;
swarm.send_message0(state0.next_message(&mut rng)).await?; let state2 = event_loop_handle.execution_setup(state0).await?;
let msg0 = swarm.recv_message0().await?;
let state1 = state0.receive(bitcoin_wallet.as_ref(), msg0).await?;
swarm.send_message1(state1.next_message()).await?;
let msg1 = swarm.recv_message1().await?;
let state2 = state1.receive(msg1)?;
swarm.send_message2(state2.next_message()).await?;
Ok(state2) Ok(state2)
} }

View File

@ -1,23 +1,18 @@
use crate::{ use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Swap, TIMEOUT}, network::request_response::{CborCodec, Swap, TIMEOUT},
protocol::alice::SwapResponse, protocol::alice::SwapResponse,
}; };
use anyhow::Result; use anyhow::{anyhow, Error, Result};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse, ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, RequestResponseMessage,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::time::Duration;
collections::VecDeque, use tracing::debug;
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Clone, Copy, Debug, Serialize, Deserialize)] #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct SwapRequest { pub struct SwapRequest {
@ -25,40 +20,26 @@ pub struct SwapRequest {
pub btc_amount: bitcoin::Amount, pub btc_amount: bitcoin::Amount,
} }
#[derive(Copy, Clone, Debug)] #[derive(Debug)]
pub struct OutEvent { pub enum OutEvent {
pub swap_response: SwapResponse, MsgReceived(SwapResponse),
Failure(Error),
} }
/// A `NetworkBehaviour` that represents doing the negotiation of a swap. /// A `NetworkBehaviour` that represents doing the negotiation of a swap.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<Codec<Swap>>, rr: RequestResponse<CborCodec<Swap, SwapRequest, SwapResponse>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
pub fn send(&mut self, alice: PeerId, swap_request: SwapRequest) -> Result<RequestId> { pub fn send(&mut self, alice: PeerId, swap_request: SwapRequest) -> Result<RequestId> {
let msg = BobToAlice::SwapRequest(Box::new(swap_request)); let id = self.rr.send_request(&alice, swap_request);
let id = self.rr.send_request(&alice, msg);
Ok(id) Ok(id)
} }
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec<Swap>>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -70,41 +51,37 @@ impl Default for Behaviour {
Self { Self {
rr: RequestResponse::new( rr: RequestResponse::new(
Codec::default(), CborCodec::default(),
vec![(Swap, ProtocolSupport::Outbound)], vec![(Swap, ProtocolSupport::Outbound)],
config, config,
), ),
events: Default::default(),
} }
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Behaviour { impl From<RequestResponseEvent<SwapRequest, SwapResponse>> for OutEvent {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) { fn from(event: RequestResponseEvent<SwapRequest, SwapResponse>) -> Self {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. }, message: RequestResponseMessage::Request { .. },
.. ..
} => panic!("Bob should never get a request from Alice"), } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")),
RequestResponseEvent::Message { RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Response { response, .. }, message: RequestResponseMessage::Response { response, .. },
.. ..
} => { } => {
if let AliceToBob::SwapResponse(swap_response) = response { debug!("Received swap response from {}", peer);
debug!("Received swap response"); OutEvent::MsgReceived(response)
self.events.push_back(OutEvent {
swap_response: *swap_response,
});
}
} }
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error); OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
} }
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
} }
RequestResponseEvent::ResponseSent { .. } => { RequestResponseEvent::ResponseSent { .. } => {
error!("Bob does not send a swap response to Alice"); OutEvent::Failure(anyhow!("Bob does not send a swap response to Alice"))
} }
} }
} }

View File

@ -1,50 +1,42 @@
use crate::{ use crate::{
network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT}, network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT},
protocol::alice::TransferProof, protocol::alice::TransferProof,
}; };
use anyhow::{anyhow, Error, Result};
use libp2p::{ use libp2p::{
request_response::{ request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseEvent, RequestResponseMessage, RequestResponseMessage, ResponseChannel,
}, },
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, NetworkBehaviour,
}; };
use std::{ use std::time::Duration;
collections::VecDeque, use tracing::debug;
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Msg(TransferProof), MsgReceived {
msg: TransferProof,
channel: ResponseChannel<()>,
},
AckSent,
Failure(Error),
} }
/// A `NetworkBehaviour` that represents receiving the transfer proof from /// A `NetworkBehaviour` that represents receiving the transfer proof from
/// Alice. /// Alice.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
rr: RequestResponse<OneShotCodec<TransferProofProtocol>>, rr: RequestResponse<CborCodec<TransferProofProtocol, TransferProof, ()>>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
} }
impl Behaviour { impl Behaviour {
fn poll( pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> {
&mut self, self.rr
_: &mut Context<'_>, .send_response(channel, ())
_: &mut impl PollParameters, .map_err(|err| anyhow!("Failed to ack transfer proof: {:?}", err))
) -> Poll<NetworkBehaviourAction<RequestProtocol<OneShotCodec<TransferProofProtocol>>, OutEvent>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
} }
} }
@ -56,46 +48,42 @@ impl Default for Behaviour {
Self { Self {
rr: RequestResponse::new( rr: RequestResponse::new(
OneShotCodec::default(), CborCodec::default(),
vec![(TransferProofProtocol, ProtocolSupport::Inbound)], vec![(TransferProofProtocol, ProtocolSupport::Inbound)],
config, config,
), ),
events: Default::default(),
} }
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for Behaviour { impl From<RequestResponseEvent<TransferProof, ()>> for OutEvent {
fn inject_event(&mut self, event: RequestResponseEvent<Request, Response>) { fn from(event: RequestResponseEvent<TransferProof, ()>) -> Self {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
peer,
message: message:
RequestResponseMessage::Request { RequestResponseMessage::Request {
request, channel, .. request, channel, ..
}, },
.. ..
} => { } => {
if let Request::TransferProof(msg) = request { debug!("Received Transfer Proof from {}", peer);
debug!("Received Transfer Proof"); OutEvent::MsgReceived {
self.events.push_back(OutEvent::Msg(*msg)); msg: request,
// Send back empty response so that the request/response protocol completes. channel,
let _ = self
.rr
.send_response(channel, Response::TransferProof)
.map_err(|err| error!("Failed to send message 3: {:?}", err));
} }
} }
RequestResponseEvent::Message { RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. }, message: RequestResponseMessage::Response { .. },
.. ..
} => panic!("Bob should not get a Response"), } => OutEvent::Failure(anyhow!("Bob should not get a Response")),
RequestResponseEvent::InboundFailure { error, .. } => { RequestResponseEvent::InboundFailure { error, .. } => {
error!("Inbound failure: {:?}", error); OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
} }
RequestResponseEvent::OutboundFailure { error, .. } => { RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error); OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
} }
RequestResponseEvent::ResponseSent { .. } => debug!("Bob ack'd transfer proof message"), RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent,
} }
} }
} }

View File

@ -15,10 +15,10 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
let alice_state = alice::run_until(alice_swap, is_encsig_learned) let alice_state = alice::run_until(alice_swap, is_encsig_learned)
.await .await
.unwrap(); .unwrap();
assert!(matches!(alice_state, AliceState::EncSigLearned {..})); assert!(matches!(alice_state, AliceState::EncSigLearned { .. }));
let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await; let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await;
assert!(matches!(alice_swap.state, AliceState::EncSigLearned {..})); assert!(matches!(alice_swap.state, AliceState::EncSigLearned { .. }));
let alice_state = alice::run(alice_swap).await.unwrap(); let alice_state = alice::run(alice_swap).await.unwrap();

View File

@ -14,10 +14,10 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
let bob_state = bob::run_until(bob_swap, is_encsig_sent).await.unwrap(); let bob_state = bob::run_until(bob_swap, is_encsig_sent).await.unwrap();
assert!(matches!(bob_state, BobState::EncSigSent {..})); assert!(matches!(bob_state, BobState::EncSigSent { .. }));
let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::EncSigSent {..})); assert!(matches!(bob_swap.state, BobState::EncSigSent { .. }));
let bob_state = bob::run(bob_swap).await.unwrap(); let bob_state = bob::run(bob_swap).await.unwrap();

View File

@ -16,11 +16,13 @@ async fn given_bob_restarts_after_lock_proof_received_resume_swap() {
.await .await
.unwrap(); .unwrap();
assert!(matches!(bob_state, BobState::XmrLockProofReceived {..})); assert!(matches!(bob_state, BobState::XmrLockProofReceived { .. }));
let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::XmrLockProofReceived assert!(matches!(
{..})); bob_swap.state,
BobState::XmrLockProofReceived { .. }
));
let bob_state = bob::run(bob_swap).await.unwrap(); let bob_state = bob::run(bob_swap).await.unwrap();

View File

@ -14,10 +14,10 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
let bob_state = bob::run_until(bob_swap, is_xmr_locked).await.unwrap(); let bob_state = bob::run_until(bob_swap, is_xmr_locked).await.unwrap();
assert!(matches!(bob_state, BobState::XmrLocked {..})); assert!(matches!(bob_state, BobState::XmrLocked { .. }));
let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::XmrLocked {..})); assert!(matches!(bob_swap.state, BobState::XmrLocked { .. }));
let bob_state = bob::run(bob_swap).await.unwrap(); let bob_state = bob::run(bob_swap).await.unwrap();

View File

@ -16,7 +16,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap();
assert!(matches!(bob_state, BobState::BtcLocked {..})); assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let alice_state = alice_handle.await.unwrap(); let alice_state = alice_handle.await.unwrap();
ctx.assert_alice_punished(alice_state.unwrap()).await; ctx.assert_alice_punished(alice_state.unwrap()).await;
@ -24,7 +24,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
// Restart Bob after Alice punished to ensure Bob transitions to // Restart Bob after Alice punished to ensure Bob transitions to
// punished and does not run indefinitely // punished and does not run indefinitely
let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::BtcLocked {..})); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
let bob_state = bob::run(bob_swap).await.unwrap(); let bob_state = bob::run(bob_swap).await.unwrap();

View File

@ -15,8 +15,7 @@ async fn given_alice_restarts_after_xmr_is_locked_refund_swap() {
let bob_handle = tokio::spawn(bob); let bob_handle = tokio::spawn(bob);
let alice_state = alice::run_until(alice_swap, is_xmr_locked).await.unwrap(); let alice_state = alice::run_until(alice_swap, is_xmr_locked).await.unwrap();
assert!(matches!(alice_state, assert!(matches!(alice_state, AliceState::XmrLocked { .. }));
AliceState::XmrLocked {..}));
// Alice does not act, Bob refunds // Alice does not act, Bob refunds
let bob_state = bob_handle.await.unwrap(); let bob_state = bob_handle.await.unwrap();
@ -24,7 +23,7 @@ async fn given_alice_restarts_after_xmr_is_locked_refund_swap() {
// Once bob has finished Alice is restarted and refunds as well // Once bob has finished Alice is restarted and refunds as well
let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await; let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await;
assert!(matches!(alice_swap.state, AliceState::XmrLocked {..})); assert!(matches!(alice_swap.state, AliceState::XmrLocked { .. }));
let alice_state = alice::run(alice_swap).await.unwrap(); let alice_state = alice::run(alice_swap).await.unwrap();

View File

@ -22,7 +22,7 @@ async fn given_alice_restarts_after_enc_sig_learned_and_bob_already_cancelled_re
.await .await
.unwrap(); .unwrap();
assert!( assert!(
matches!(alice_state, AliceState::EncSigLearned {..}), matches!(alice_state, AliceState::EncSigLearned { .. }),
"Alice state is not EncSigLearned: {:?}", "Alice state is not EncSigLearned: {:?}",
alice_state alice_state
); );
@ -34,8 +34,7 @@ async fn given_alice_restarts_after_enc_sig_learned_and_bob_already_cancelled_re
// Once bob has finished Alice is restarted and refunds as well // Once bob has finished Alice is restarted and refunds as well
let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await; let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await;
assert!( assert!(
matches!(alice_swap.state, AliceState::EncSigLearned matches!(alice_swap.state, AliceState::EncSigLearned { .. }),
{..}),
"Alice state is not EncSigLearned: {:?}", "Alice state is not EncSigLearned: {:?}",
alice_state alice_state
); );

View File

@ -478,17 +478,11 @@ pub mod alice_run_until {
use swap::protocol::alice::AliceState; use swap::protocol::alice::AliceState;
pub fn is_xmr_locked(state: &AliceState) -> bool { pub fn is_xmr_locked(state: &AliceState) -> bool {
matches!( matches!(state, AliceState::XmrLocked { .. })
state,
AliceState::XmrLocked{..}
)
} }
pub fn is_encsig_learned(state: &AliceState) -> bool { pub fn is_encsig_learned(state: &AliceState) -> bool {
matches!( matches!(state, AliceState::EncSigLearned { .. })
state,
AliceState::EncSigLearned{..}
)
} }
} }