340: Remove connection handling from swap execution and cleanup network protocol implementations r=thomaseizinger a=thomaseizinger

TODO:

- [x] Re-establish connection on Bob's side if it is dropped.
- [x] Make sure Alice buffers messages if there isn't currently a connection
- [x] Do a testnet swap to make sure it works

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
bors[bot] 2021-03-24 04:30:45 +00:00 committed by GitHub
commit c0398f7245
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 653 additions and 1009 deletions

View File

@ -114,7 +114,8 @@ jobs:
matrix:
test_name: [
happy_path,
happy_path_restart_bob_before_comm,
happy_path_restart_bob_after_xmr_locked,
happy_path_restart_bob_before_xmr_locked,
bob_refunds_using_cancel_and_refund_command,
bob_refunds_using_cancel_and_refund_command_timelock_not_expired,
bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force,

View File

@ -7,7 +7,8 @@ status = [
"test (x86_64-unknown-linux-gnu, ubuntu-latest)",
"test (x86_64-apple-darwin, macos-latest)",
"docker_tests (happy_path)",
"docker_tests (happy_path_restart_bob_before_comm)",
"docker_tests (happy_path_restart_bob_after_xmr_locked)",
"docker_tests (happy_path_restart_bob_before_xmr_locked)",
"docker_tests (bob_refunds_using_cancel_and_refund_command)",
"docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force)",
"docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)",

View File

@ -15,6 +15,7 @@
use anyhow::{Context, Result};
use bdk::descriptor::Segwitv0;
use bdk::keys::DerivableKey;
use libp2p::Swarm;
use prettytable::{row, Table};
use std::path::Path;
use std::sync::Arc;
@ -27,7 +28,8 @@ use swap::database::Database;
use swap::env::GetConfig;
use swap::fs::default_config_path;
use swap::monero::Amount;
use swap::protocol::alice::{run, EventLoop};
use swap::network::swarm;
use swap::protocol::alice::{run, Behaviour, EventLoop};
use swap::seed::Seed;
use swap::trace::init_tracing;
use swap::{bitcoin, env, kraken, monero};
@ -93,9 +95,12 @@ async fn main() -> Result<()> {
let kraken_rate_updates = kraken::connect()?;
let mut swarm = swarm::new::<Behaviour>(&seed)?;
Swarm::listen_on(&mut swarm, config.network.listen)
.context("Failed to listen network interface")?;
let (event_loop, mut swap_receiver) = EventLoop::new(
config.network.listen,
seed,
swarm,
env_config,
Arc::new(bitcoin_wallet),
Arc::new(monero_wallet),

View File

@ -25,8 +25,9 @@ use swap::cli::command::{AliceConnectParams, Arguments, Command, Data, MoneroPar
use swap::database::Database;
use swap::env::{Config, GetConfig};
use swap::network::quote::BidQuote;
use swap::network::swarm;
use swap::protocol::bob;
use swap::protocol::bob::{Builder, EventLoop};
use swap::protocol::bob::{Behaviour, Builder, EventLoop};
use swap::seed::Seed;
use swap::{bitcoin, env, monero};
use tracing::{debug, error, info, warn, Level};
@ -101,17 +102,17 @@ async fn main() -> Result<()> {
}
let bitcoin_wallet =
init_bitcoin_wallet(electrum_rpc_url, seed, data_dir.clone(), env_config).await?;
init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir.clone(), env_config).await?;
let (monero_wallet, _process) =
init_monero_wallet(data_dir, monero_daemon_host, env_config).await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let (event_loop, mut event_loop_handle) = EventLoop::new(
&seed.derive_libp2p_identity(),
alice_peer_id,
alice_addr,
bitcoin_wallet.clone(),
)?;
let handle = tokio::spawn(event_loop.run());
let mut swarm = swarm::new::<Behaviour>(&seed)?;
swarm.add_address(alice_peer_id, alice_addr);
let (event_loop, mut event_loop_handle) =
EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?;
let event_loop = tokio::spawn(event_loop.run());
let send_bitcoin = determine_btc_to_swap(
event_loop_handle.request_quote(),
@ -142,13 +143,13 @@ async fn main() -> Result<()> {
.with_init_params(send_bitcoin)
.build()?;
let swap = bob::run(swap);
tokio::select! {
event_loop_result = handle => {
event_loop_result??;
result = event_loop => {
result
.context("EventLoop panicked")?;
},
swap_result = swap => {
swap_result?;
result = bob::run(swap) => {
result.context("Failed to complete swap")?;
}
}
}
@ -183,17 +184,16 @@ async fn main() -> Result<()> {
}
let bitcoin_wallet =
init_bitcoin_wallet(electrum_rpc_url, seed, data_dir.clone(), env_config).await?;
init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir.clone(), env_config).await?;
let (monero_wallet, _process) =
init_monero_wallet(data_dir, monero_daemon_host, env_config).await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let (event_loop, event_loop_handle) = EventLoop::new(
&seed.derive_libp2p_identity(),
alice_peer_id,
alice_addr,
bitcoin_wallet.clone(),
)?;
let mut swarm = swarm::new::<Behaviour>(&seed)?;
swarm.add_address(alice_peer_id, alice_addr);
let (event_loop, event_loop_handle) =
EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?;
let handle = tokio::spawn(event_loop.run());
let swap = Builder::new(
@ -207,12 +207,11 @@ async fn main() -> Result<()> {
)
.build()?;
let swap = bob::run(swap);
tokio::select! {
event_loop_result = handle => {
event_loop_result??;
event_loop_result?;
},
swap_result = swap => {
swap_result = bob::run(swap) => {
swap_result?;
}
}
@ -223,7 +222,7 @@ async fn main() -> Result<()> {
electrum_rpc_url,
} => {
let bitcoin_wallet =
init_bitcoin_wallet(electrum_rpc_url, seed, data_dir, env_config).await?;
init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir, env_config).await?;
let resume_state = db.get_state(swap_id)?.try_into_bob()?.into();
let cancel =
@ -248,7 +247,7 @@ async fn main() -> Result<()> {
electrum_rpc_url,
} => {
let bitcoin_wallet =
init_bitcoin_wallet(electrum_rpc_url, seed, data_dir, env_config).await?;
init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir, env_config).await?;
let resume_state = db.get_state(swap_id)?.try_into_bob()?.into();
@ -260,7 +259,7 @@ async fn main() -> Result<()> {
async fn init_bitcoin_wallet(
electrum_rpc_url: Url,
seed: Seed,
seed: &Seed,
data_dir: PathBuf,
env_config: Config,
) -> Result<bitcoin::Wallet> {
@ -314,7 +313,7 @@ async fn determine_btc_to_swap(
) -> Result<bitcoin::Amount> {
debug!("Requesting quote");
let bid_quote = request_quote.await.context("Failed to request quote")?;
let bid_quote = request_quote.await?;
info!("Received quote: 1 XMR ~ {}", bid_quote.price);

View File

@ -1,21 +1,7 @@
pub mod peer_tracker;
pub mod cbor_request_response;
pub mod encrypted_signature;
pub mod quote;
pub mod request_response;
pub mod spot_price;
pub mod swarm;
pub mod transfer_proof;
pub mod transport;
use libp2p::core::Executor;
use std::future::Future;
use std::pin::Pin;
use tokio::runtime::Handle;
#[allow(missing_debug_implementations)]
pub struct TokioExecutor {
pub handle: Handle,
}
impl Executor for TokioExecutor {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.handle.spawn(future);
}
}

View File

@ -9,30 +9,9 @@ use std::fmt::Debug;
use std::io;
use std::marker::PhantomData;
/// Time to wait for a response back once we send a request.
pub const TIMEOUT: u64 = 3600; // One hour.
/// Message receive buffer.
pub const BUF_SIZE: usize = 1024 * 1024;
#[derive(Debug, Clone, Copy, Default)]
pub struct TransferProofProtocol;
#[derive(Debug, Clone, Copy, Default)]
pub struct EncryptedSignatureProtocol;
impl ProtocolName for TransferProofProtocol {
fn protocol_name(&self) -> &[u8] {
b"/comit/xmr/btc/transfer_proof/1.0.0"
}
}
impl ProtocolName for EncryptedSignatureProtocol {
fn protocol_name(&self) -> &[u8] {
b"/comit/xmr/btc/encrypted_signature/1.0.0"
}
}
#[derive(Clone, Copy, Debug)]
pub struct CborCodec<P, Req, Res> {
phantom: PhantomData<(P, Req, Res)>,

View File

@ -0,0 +1,43 @@
use crate::network::cbor_request_response::CborCodec;
use libp2p::core::ProtocolName;
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage,
};
use serde::{Deserialize, Serialize};
pub type OutEvent = RequestResponseEvent<Request, ()>;
#[derive(Debug, Clone, Copy, Default)]
pub struct EncryptedSignatureProtocol;
impl ProtocolName for EncryptedSignatureProtocol {
fn protocol_name(&self) -> &[u8] {
b"/comit/xmr/btc/encrypted_signature/1.0.0"
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Request {
pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature,
}
pub type Behaviour = RequestResponse<CborCodec<EncryptedSignatureProtocol, Request, ()>>;
pub type Message = RequestResponseMessage<Request, ()>;
pub fn alice() -> Behaviour {
Behaviour::new(
CborCodec::default(),
vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)],
RequestResponseConfig::default(),
)
}
pub fn bob() -> Behaviour {
Behaviour::new(
CborCodec::default(),
vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)],
RequestResponseConfig::default(),
)
}

View File

@ -1,126 +0,0 @@
use futures::task::Context;
use libp2p::core::connection::ConnectionId;
use libp2p::core::ConnectedPoint;
use libp2p::swarm::protocols_handler::DummyProtocolsHandler;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::{Multiaddr, PeerId};
use std::collections::{HashMap, VecDeque};
use std::task::Poll;
#[derive(Debug, Copy, Clone)]
pub enum OutEvent {
ConnectionEstablished(PeerId),
}
/// A NetworkBehaviour that tracks connections to the counterparty. Although the
/// libp2p `NetworkBehaviour` abstraction encompasses connections to multiple
/// peers we only ever connect to a single counterparty. Peer Tracker tracks
/// that connection.
#[derive(Default, Debug)]
pub struct Behaviour {
connected: Option<(PeerId, Multiaddr)>,
address_of_peer: HashMap<PeerId, Multiaddr>,
events: VecDeque<OutEvent>,
}
impl Behaviour {
/// Return whether we are connected to the given peer.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
if let Some((connected_peer_id, _)) = &self.connected {
if connected_peer_id == peer_id {
return true;
}
}
false
}
/// Returns the peer id of counterparty if we are connected.
pub fn counterparty_peer_id(&self) -> Option<PeerId> {
if let Some((id, _)) = &self.connected {
return Some(*id);
}
None
}
/// Returns the peer_id and multiaddr of counterparty if we are connected.
pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> {
if let Some((peer_id, addr)) = &self.connected {
return Some((*peer_id, addr.clone()));
}
None
}
/// Add an address for a given peer. We only store one address per peer.
pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.address_of_peer.insert(peer_id, address);
}
}
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
DummyProtocolsHandler::default()
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
let mut addresses: Vec<Multiaddr> = vec![];
if let Some((counterparty_peer_id, addr)) = self.counterparty() {
if counterparty_peer_id == *peer_id {
addresses.push(addr)
}
}
if let Some(addr) = self.address_of_peer.get(peer_id) {
addresses.push(addr.clone());
}
addresses
}
fn inject_connected(&mut self, _: &PeerId) {}
fn inject_disconnected(&mut self, _: &PeerId) {}
fn inject_connection_established(
&mut self,
peer: &PeerId,
_: &ConnectionId,
point: &ConnectedPoint,
) {
match point {
ConnectedPoint::Dialer { address } => {
self.connected = Some((*peer, address.clone()));
}
ConnectedPoint::Listener {
local_addr: _,
send_back_addr,
} => {
self.connected = Some((*peer, send_back_addr.clone()));
}
}
self.events
.push_back(OutEvent::ConnectionEstablished(*peer));
}
fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {
self.connected = None;
}
fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<void::Void, Self::OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}

View File

@ -1,8 +1,9 @@
use crate::bitcoin;
use crate::network::request_response::CborCodec;
use crate::network::cbor_request_response::CborCodec;
use libp2p::core::ProtocolName;
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage,
};
use serde::{Deserialize, Serialize};
@ -17,6 +18,8 @@ impl ProtocolName for BidQuoteProtocol {
}
}
pub type Message = RequestResponseMessage<(), BidQuote>;
/// Represents a quote for buying XMR.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BidQuote {

View File

@ -1,8 +1,9 @@
use crate::network::request_response::CborCodec;
use crate::network::cbor_request_response::CborCodec;
use crate::{bitcoin, monero};
use libp2p::core::ProtocolName;
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage,
};
use serde::{Deserialize, Serialize};
@ -39,6 +40,8 @@ pub struct Response {
pub type Behaviour = RequestResponse<CborCodec<SpotPriceProtocol, Request, Response>>;
pub type Message = RequestResponseMessage<Request, Response>;
/// Constructs a new instance of the `spot-price` behaviour to be used by Alice.
///
/// Alice only supports inbound connections, i.e. providing spot prices for BTC

23
swap/src/network/swarm.rs Normal file
View File

@ -0,0 +1,23 @@
use crate::network::transport;
use crate::seed::Seed;
use anyhow::Result;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
use libp2p::Swarm;
pub fn new<B>(seed: &Seed) -> Result<Swarm<B>>
where
B: NetworkBehaviour + Default,
{
let identity = seed.derive_libp2p_identity();
let behaviour = B::default();
let transport = transport::build(&identity)?;
let swarm = SwarmBuilder::new(transport, behaviour, identity.public().into_peer_id())
.executor(Box::new(|f| {
tokio::spawn(f);
}))
.build();
Ok(swarm)
}

View File

@ -0,0 +1,44 @@
use crate::monero;
use crate::network::cbor_request_response::CborCodec;
use libp2p::core::ProtocolName;
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage,
};
use serde::{Deserialize, Serialize};
pub type OutEvent = RequestResponseEvent<Request, ()>;
#[derive(Debug, Clone, Copy, Default)]
pub struct TransferProofProtocol;
impl ProtocolName for TransferProofProtocol {
fn protocol_name(&self) -> &[u8] {
b"/comit/xmr/btc/transfer_proof/1.0.0"
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Request {
pub tx_lock_proof: monero::TransferProof,
}
pub type Behaviour = RequestResponse<CborCodec<TransferProofProtocol, Request, ()>>;
pub type Message = RequestResponseMessage<Request, ()>;
pub fn alice() -> Behaviour {
Behaviour::new(
CborCodec::default(),
vec![(TransferProofProtocol, ProtocolSupport::Outbound)],
RequestResponseConfig::default(),
)
}
pub fn bob() -> Behaviour {
Behaviour::new(
CborCodec::default(),
vec![(TransferProofProtocol, ProtocolSupport::Inbound)],
RequestResponseConfig::default(),
)
}

View File

@ -11,16 +11,13 @@ pub use self::event_loop::{EventLoop, EventLoopHandle};
pub use self::execution_setup::Message1;
pub use self::state::*;
pub use self::swap::{run, run_until};
pub use self::transfer_proof::TransferProof;
pub use execution_setup::Message3;
mod behaviour;
mod encrypted_signature;
pub mod event_loop;
mod execution_setup;
pub mod state;
pub mod swap;
mod transfer_proof;
pub struct Swap {
pub state: AliceState,

View File

@ -1,22 +1,18 @@
use crate::env::Config;
use crate::network::quote::BidQuote;
use crate::network::{peer_tracker, quote, spot_price};
use crate::protocol::alice::{
encrypted_signature, execution_setup, transfer_proof, State0, State3, TransferProof,
};
use crate::protocol::bob::EncryptedSignature;
use crate::network::{encrypted_signature, quote, spot_price, transfer_proof};
use crate::protocol::alice::{execution_setup, State0, State3};
use crate::{bitcoin, monero};
use anyhow::{anyhow, Error, Result};
use libp2p::request_response::{RequestResponseMessage, ResponseChannel};
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel};
use libp2p::{NetworkBehaviour, PeerId};
use rand::{CryptoRng, RngCore};
use tracing::debug;
#[derive(Debug)]
pub enum OutEvent {
ConnectionEstablished(PeerId),
SpotPriceRequested {
msg: spot_price::Request,
request: spot_price::Request,
channel: ResponseChannel<spot_price::Response>,
peer: PeerId,
},
@ -29,8 +25,8 @@ pub enum OutEvent {
state3: Box<State3>,
},
TransferProofAcknowledged(PeerId),
EncryptedSignature {
msg: Box<EncryptedSignature>,
EncryptedSignatureReceived {
msg: Box<encrypted_signature::Request>,
channel: ResponseChannel<()>,
peer: PeerId,
},
@ -41,72 +37,111 @@ pub enum OutEvent {
},
}
impl From<peer_tracker::OutEvent> for OutEvent {
fn from(event: peer_tracker::OutEvent) -> Self {
match event {
peer_tracker::OutEvent::ConnectionEstablished(id) => {
OutEvent::ConnectionEstablished(id)
}
impl OutEvent {
fn unexpected_request(peer: PeerId) -> OutEvent {
OutEvent::Failure {
peer,
error: anyhow!("Unexpected request received"),
}
}
fn unexpected_response(peer: PeerId) -> OutEvent {
OutEvent::Failure {
peer,
error: anyhow!("Unexpected response received"),
}
}
}
impl From<(PeerId, quote::Message)> for OutEvent {
fn from((peer, message): (PeerId, quote::Message)) -> Self {
match message {
quote::Message::Request { channel, .. } => OutEvent::QuoteRequested { channel, peer },
quote::Message::Response { .. } => OutEvent::unexpected_response(peer),
}
}
}
impl From<(PeerId, spot_price::Message)> for OutEvent {
fn from((peer, message): (PeerId, spot_price::Message)) -> Self {
match message {
spot_price::Message::Request {
request, channel, ..
} => OutEvent::SpotPriceRequested {
request,
channel,
peer,
},
spot_price::Message::Response { .. } => OutEvent::unexpected_response(peer),
}
}
}
impl From<(PeerId, transfer_proof::Message)> for OutEvent {
fn from((peer, message): (PeerId, transfer_proof::Message)) -> Self {
match message {
transfer_proof::Message::Request { .. } => OutEvent::unexpected_request(peer),
transfer_proof::Message::Response { .. } => OutEvent::TransferProofAcknowledged(peer),
}
}
}
impl From<(PeerId, encrypted_signature::Message)> for OutEvent {
fn from((peer, message): (PeerId, encrypted_signature::Message)) -> Self {
match message {
encrypted_signature::Message::Request {
request, channel, ..
} => OutEvent::EncryptedSignatureReceived {
msg: Box::new(request),
channel,
peer,
},
encrypted_signature::Message::Response { .. } => OutEvent::unexpected_response(peer),
}
}
}
impl From<spot_price::OutEvent> for OutEvent {
fn from(event: spot_price::OutEvent) -> Self {
match event {
spot_price::OutEvent::Message {
peer,
message:
RequestResponseMessage::Request {
channel,
request: msg,
..
},
} => OutEvent::SpotPriceRequested { msg, channel, peer },
spot_price::OutEvent::Message {
message: RequestResponseMessage::Response { .. },
peer,
} => OutEvent::Failure {
error: anyhow!("Alice is only meant to hand out spot prices, not receive them"),
peer,
},
spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent,
spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("spot_price protocol failed due to {:?}", error),
peer,
},
spot_price::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("spot_price protocol failed due to {:?}", error),
peer,
},
}
map_rr_event_to_outevent(event)
}
}
impl From<quote::OutEvent> for OutEvent {
fn from(event: quote::OutEvent) -> Self {
match event {
quote::OutEvent::Message {
peer,
message: RequestResponseMessage::Request { channel, .. },
} => OutEvent::QuoteRequested { channel, peer },
quote::OutEvent::Message {
message: RequestResponseMessage::Response { .. },
peer,
} => OutEvent::Failure {
error: anyhow!("Alice is only meant to hand out quotes, not receive them"),
peer,
},
quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent,
quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("quote protocol failed due to {:?}", error),
peer,
},
quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("quote protocol failed due to {:?}", error),
peer,
},
}
map_rr_event_to_outevent(event)
}
}
impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
map_rr_event_to_outevent(event)
}
}
impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
map_rr_event_to_outevent(event)
}
}
fn map_rr_event_to_outevent<I, O>(event: RequestResponseEvent<I, O>) -> OutEvent
where
OutEvent: From<(PeerId, RequestResponseMessage<I, O>)>,
{
use RequestResponseEvent::*;
match event {
Message { message, peer, .. } => OutEvent::from((peer, message)),
ResponseSent { .. } => OutEvent::ResponseSent,
InboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("protocol failed due to {:?}", error),
peer,
},
OutboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("protocol failed due to {:?}", error),
peer,
},
}
}
@ -126,43 +161,11 @@ impl From<execution_setup::OutEvent> for OutEvent {
}
}
impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
use crate::protocol::alice::transfer_proof::OutEvent::*;
match event {
Acknowledged(peer) => OutEvent::TransferProofAcknowledged(peer),
Failure { peer, error } => OutEvent::Failure {
peer,
error: error.context("Failure with Transfer Proof"),
},
}
}
}
impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
use crate::protocol::alice::encrypted_signature::OutEvent::*;
match event {
MsgReceived { msg, channel, peer } => OutEvent::EncryptedSignature {
msg: Box::new(msg),
channel,
peer,
},
AckSent => OutEvent::ResponseSent,
Failure { peer, error } => OutEvent::Failure {
peer,
error: error.context("Failure with Encrypted Signature"),
},
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
pt: peer_tracker::Behaviour,
quote: quote::Behaviour,
spot_price: spot_price::Behaviour,
execution_setup: execution_setup::Behaviour,
@ -173,12 +176,11 @@ pub struct Behaviour {
impl Default for Behaviour {
fn default() -> Self {
Self {
pt: Default::default(),
quote: quote::alice(),
spot_price: spot_price::alice(),
execution_setup: Default::default(),
transfer_proof: Default::default(),
encrypted_signature: Default::default(),
transfer_proof: transfer_proof::alice(),
encrypted_signature: encrypted_signature::alice(),
}
}
}
@ -231,12 +233,25 @@ impl Behaviour {
}
/// Send Transfer Proof to Bob.
pub fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) {
self.transfer_proof.send(bob, msg);
debug!("Sent Transfer Proof");
///
/// Fails and returns the transfer proof if we are currently not connected
/// to this peer.
pub fn send_transfer_proof(
&mut self,
bob: PeerId,
msg: transfer_proof::Request,
) -> Result<(), transfer_proof::Request> {
if !self.transfer_proof.is_connected(&bob) {
return Err(msg);
}
self.transfer_proof.send_request(&bob, msg);
debug!("Sending Transfer Proof");
Ok(())
}
pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> {
self.encrypted_signature.send_ack(channel)
pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) {
let _ = self.encrypted_signature.send_response(channel, ());
}
}

View File

@ -1,95 +0,0 @@
use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT};
use crate::protocol::bob::EncryptedSignature;
use anyhow::{anyhow, Error, Result};
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use libp2p::{NetworkBehaviour, PeerId};
use std::time::Duration;
use tracing::debug;
#[derive(Debug)]
pub enum OutEvent {
MsgReceived {
msg: EncryptedSignature,
channel: ResponseChannel<()>,
peer: PeerId,
},
AckSent,
Failure {
peer: PeerId,
error: Error,
},
}
/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted
/// signature from Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
}
impl Behaviour {
pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> {
self.rr
.send_response(channel, ())
.map_err(|err| anyhow!("Failed to ack encrypted signature: {:?}", err))
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
CborCodec::default(),
vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)],
config,
),
}
}
}
impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
fn from(event: RequestResponseEvent<EncryptedSignature, ()>) -> Self {
match event {
RequestResponseEvent::Message {
peer,
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
debug!("Received encrypted signature from {}", peer);
OutEvent::MsgReceived {
msg: request,
channel,
peer,
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
peer,
} => OutEvent::Failure {
peer,
error: anyhow!("Alice should not get a Response"),
},
RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Inbound failure: {:?}", error),
},
RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Outbound failure: {:?}", error),
},
RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent,
}
}
}

View File

@ -3,16 +3,14 @@ use crate::database::Database;
use crate::env::Config;
use crate::monero::BalanceTooLow;
use crate::network::quote::BidQuote;
use crate::network::{spot_price, transport, TokioExecutor};
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof};
use crate::protocol::bob::EncryptedSignature;
use crate::seed::Seed;
use crate::network::{encrypted_signature, spot_price, transfer_proof};
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap};
use crate::{bitcoin, kraken, monero};
use anyhow::{bail, Context, Result};
use futures::future;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use libp2p::core::Multiaddr;
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm};
use rand::rngs::OsRng;
use std::collections::HashMap;
@ -25,7 +23,6 @@ use uuid::Uuid;
#[allow(missing_debug_implementations)]
pub struct EventLoop<RS> {
swarm: libp2p::Swarm<Behaviour>,
peer_id: PeerId,
env_config: Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
@ -34,22 +31,25 @@ pub struct EventLoop<RS> {
max_buy: bitcoin::Amount,
/// Stores a sender per peer for incoming [`EncryptedSignature`]s.
recv_encrypted_signature: HashMap<PeerId, oneshot::Sender<EncryptedSignature>>,
recv_encrypted_signature: HashMap<PeerId, oneshot::Sender<encrypted_signature::Request>>,
/// Stores a list of futures, waiting for transfer proof which will be sent
/// to the given peer.
send_transfer_proof: FuturesUnordered<BoxFuture<'static, Result<(PeerId, TransferProof)>>>,
send_transfer_proof:
FuturesUnordered<BoxFuture<'static, Result<(PeerId, transfer_proof::Request)>>>,
swap_sender: mpsc::Sender<Swap>,
/// Tracks [`transfer_proof::Request`]s which could not yet be sent because
/// we are currently disconnected from the peer.
buffered_transfer_proofs: HashMap<PeerId, transfer_proof::Request>,
}
impl<LR> EventLoop<LR>
where
LR: LatestRate,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
listen_address: Multiaddr,
seed: Seed,
swarm: Swarm<Behaviour>,
env_config: Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
@ -57,25 +57,10 @@ where
latest_rate: LR,
max_buy: bitcoin::Amount,
) -> Result<(Self, mpsc::Receiver<Swap>)> {
let identity = seed.derive_libp2p_identity();
let behaviour = Behaviour::default();
let transport = transport::build(&identity)?;
let peer_id = PeerId::from(identity.public());
let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(TokioExecutor {
handle: tokio::runtime::Handle::current(),
}))
.build();
Swarm::listen_on(&mut swarm, listen_address.clone())
.with_context(|| format!("Address is not supported: {:#}", listen_address))?;
let swap_channel = MpscChannels::default();
let event_loop = EventLoop {
swarm,
peer_id,
env_config,
bitcoin_wallet,
monero_wallet,
@ -85,12 +70,13 @@ where
max_buy,
recv_encrypted_signature: Default::default(),
send_transfer_proof: Default::default(),
buffered_transfer_proofs: Default::default(),
};
Ok((event_loop, swap_channel.receiver))
}
pub fn peer_id(&self) -> PeerId {
self.peer_id
*Swarm::local_peer_id(&self.swarm)
}
pub async fn run(mut self) {
@ -100,13 +86,10 @@ where
loop {
tokio::select! {
swarm_event = self.swarm.next() => {
swarm_event = self.swarm.next_event() => {
match swarm_event {
OutEvent::ConnectionEstablished(alice) => {
debug!("Connection Established with {}", alice);
}
OutEvent::SpotPriceRequested { msg, channel, peer } => {
let btc = msg.btc;
SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => {
let btc = request.btc;
let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await {
Ok(xmr) => xmr,
Err(e) => {
@ -131,7 +114,7 @@ where
}
}
}
OutEvent::QuoteRequested { channel, peer } => {
SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => {
let quote = match self.make_quote(self.max_buy).await {
Ok(quote) => quote,
Err(e) => {
@ -149,13 +132,13 @@ where
}
}
}
OutEvent::ExecutionSetupDone{bob_peer_id, state3} => {
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => {
let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await;
}
OutEvent::TransferProofAcknowledged(peer) => {
SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged(peer)) => {
trace!(%peer, "Bob acknowledged transfer proof");
}
OutEvent::EncryptedSignature{ msg, channel, peer } => {
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => {
match self.recv_encrypted_signature.remove(&peer) {
Some(sender) => {
// this failing just means the receiver is no longer interested ...
@ -166,20 +149,48 @@ where
}
}
if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) {
error!("Failed to send Encrypted Signature ack: {:?}", error);
}
self.swarm.send_encrypted_signature_ack(channel);
}
OutEvent::ResponseSent => {}
OutEvent::Failure {peer, error} => {
SwarmEvent::Behaviour(OutEvent::ResponseSent) => {}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
error!(%peer, "Communication error: {:#}", error);
}
SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => {
tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established");
if let Some(transfer_proof) = self.buffered_transfer_proofs.remove(&peer) {
tracing::debug!(%peer, "Found buffered transfer proof for peer");
self.swarm
.send_transfer_proof(peer, transfer_proof)
.expect("must be able to send transfer proof after connection was established");
}
}
SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => {
tracing::warn!(%address, "Failed to set up connection with peer: {}", error);
}
SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause } if num_established == 0 => {
match cause {
Some(error) => {
tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection: {}", error);
},
None => {
tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection");
}
}
}
_ => {}
}
},
next_transfer_proof = self.send_transfer_proof.next() => {
match next_transfer_proof {
Some(Ok((peer, transfer_proof))) => {
self.swarm.send_transfer_proof(peer, transfer_proof);
let result = self.swarm.send_transfer_proof(peer, transfer_proof);
if let Err(transfer_proof) = result {
tracing::warn!(%peer, "No active connection to peer, buffering transfer proof");
self.buffered_transfer_proofs.insert(peer, transfer_proof);
}
},
Some(Err(_)) => {
tracing::debug!("A swap stopped without sending a transfer proof");
@ -306,8 +317,8 @@ impl LatestRate for kraken::RateUpdateStream {
#[derive(Debug)]
pub struct EventLoopHandle {
recv_encrypted_signature: Option<oneshot::Receiver<EncryptedSignature>>,
send_transfer_proof: Option<oneshot::Sender<TransferProof>>,
recv_encrypted_signature: Option<oneshot::Receiver<encrypted_signature::Request>>,
send_transfer_proof: Option<oneshot::Sender<transfer_proof::Request>>,
}
impl EventLoopHandle {
@ -327,7 +338,7 @@ impl EventLoopHandle {
.send_transfer_proof
.take()
.context("Transfer proof was already sent")?
.send(TransferProof { tx_lock_proof: msg })
.send(transfer_proof::Request { tx_lock_proof: msg })
.is_err()
{
bail!("Failed to send transfer proof, receiver no longer listening?")

View File

@ -1,5 +1,5 @@
use crate::bitcoin::{EncryptedSignature, Signature};
use crate::network::request_response::BUF_SIZE;
use crate::network::cbor_request_response::BUF_SIZE;
use crate::protocol::alice::{State0, State3};
use crate::protocol::bob::{Message0, Message2, Message4};
use crate::{bitcoin, monero};

View File

@ -1,82 +0,0 @@
use crate::monero;
use crate::network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT};
use anyhow::{anyhow, Error};
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage,
};
use libp2p::{NetworkBehaviour, PeerId};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TransferProof {
pub tx_lock_proof: monero::TransferProof,
}
#[derive(Debug)]
pub enum OutEvent {
Acknowledged(PeerId),
Failure { peer: PeerId, error: Error },
}
/// A `NetworkBehaviour` that represents sending the Monero transfer proof to
/// Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<CborCodec<TransferProofProtocol, TransferProof, ()>>,
}
impl Behaviour {
pub fn send(&mut self, bob: PeerId, msg: TransferProof) {
let _id = self.rr.send_request(&bob, msg);
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
CborCodec::default(),
vec![(TransferProofProtocol, ProtocolSupport::Outbound)],
config,
),
}
}
}
impl From<RequestResponseEvent<TransferProof, ()>> for OutEvent {
fn from(event: RequestResponseEvent<TransferProof, ()>) -> Self {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
peer,
} => OutEvent::Failure {
peer,
error: anyhow!("Alice should never get a transfer proof request from Bob"),
},
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
peer,
} => OutEvent::Acknowledged(peer),
RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Inbound failure: {:?}", error),
},
RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Outbound failure: {:?}", error),
},
RequestResponseEvent::ResponseSent { peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Alice should not send a response"),
},
}
}
}

View File

@ -1,35 +1,31 @@
use crate::database::Database;
use crate::env::Config;
use crate::network::{peer_tracker, spot_price};
use crate::protocol::alice::TransferProof;
use crate::network::{encrypted_signature, spot_price};
use crate::protocol::bob;
use crate::{bitcoin, monero};
use anyhow::{anyhow, Error, Result};
pub use execution_setup::{Message0, Message2, Message4};
use libp2p::core::Multiaddr;
use libp2p::request_response::{RequestResponseMessage, ResponseChannel};
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel};
use libp2p::{NetworkBehaviour, PeerId};
use std::sync::Arc;
use tracing::debug;
use uuid::Uuid;
pub use self::cancel::cancel;
pub use self::encrypted_signature::EncryptedSignature;
pub use self::event_loop::{EventLoop, EventLoopHandle};
pub use self::refund::refund;
pub use self::state::*;
pub use self::swap::{run, run_until};
use crate::network::quote;
use crate::network::quote::BidQuote;
use crate::network::{quote, transfer_proof};
pub mod cancel;
mod encrypted_signature;
pub mod event_loop;
mod execution_setup;
pub mod refund;
pub mod state;
pub mod swap;
mod transfer_proof;
pub struct Swap {
pub state: BobState,
@ -117,8 +113,8 @@ pub enum OutEvent {
QuoteReceived(BidQuote),
SpotPriceReceived(spot_price::Response),
ExecutionSetupDone(Result<Box<State2>>),
TransferProof {
msg: Box<TransferProof>,
TransferProofReceived {
msg: Box<transfer_proof::Request>,
channel: ResponseChannel<()>,
},
EncryptedSignatureAcknowledged,
@ -126,11 +122,54 @@ pub enum OutEvent {
CommunicationError(Error),
}
impl From<peer_tracker::OutEvent> for OutEvent {
fn from(event: peer_tracker::OutEvent) -> Self {
match event {
peer_tracker::OutEvent::ConnectionEstablished(id) => {
OutEvent::ConnectionEstablished(id)
impl OutEvent {
fn unexpected_request() -> OutEvent {
OutEvent::CommunicationError(anyhow!("Unexpected request received"))
}
fn unexpected_response() -> OutEvent {
OutEvent::CommunicationError(anyhow!("Unexpected response received"))
}
}
impl From<quote::Message> for OutEvent {
fn from(message: quote::Message) -> Self {
match message {
quote::Message::Request { .. } => OutEvent::unexpected_request(),
quote::Message::Response { response, .. } => OutEvent::QuoteReceived(response),
}
}
}
impl From<spot_price::Message> for OutEvent {
fn from(message: spot_price::Message) -> Self {
match message {
spot_price::Message::Request { .. } => OutEvent::unexpected_request(),
spot_price::Message::Response { response, .. } => OutEvent::SpotPriceReceived(response),
}
}
}
impl From<transfer_proof::Message> for OutEvent {
fn from(message: transfer_proof::Message) -> Self {
match message {
transfer_proof::Message::Request {
request, channel, ..
} => OutEvent::TransferProofReceived {
msg: Box::new(request),
channel,
},
transfer_proof::Message::Response { .. } => OutEvent::unexpected_response(),
}
}
}
impl From<encrypted_signature::Message> for OutEvent {
fn from(message: encrypted_signature::Message) -> Self {
match message {
encrypted_signature::Message::Request { .. } => OutEvent::unexpected_request(),
encrypted_signature::Message::Response { .. } => {
OutEvent::EncryptedSignatureAcknowledged
}
}
}
@ -138,65 +177,47 @@ impl From<peer_tracker::OutEvent> for OutEvent {
impl From<spot_price::OutEvent> for OutEvent {
fn from(event: spot_price::OutEvent) -> Self {
match event {
spot_price::OutEvent::Message {
message: RequestResponseMessage::Response { response, .. },
..
} => OutEvent::SpotPriceReceived(response),
spot_price::OutEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => OutEvent::CommunicationError(anyhow!(
"Bob is only meant to receive spot prices, not hand them out"
)),
spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent,
spot_price::OutEvent::InboundFailure { peer, error, .. } => {
OutEvent::CommunicationError(anyhow!(
"spot_price protocol with peer {} failed due to {:?}",
peer,
error
))
}
spot_price::OutEvent::OutboundFailure { peer, error, .. } => {
OutEvent::CommunicationError(anyhow!(
"spot_price protocol with peer {} failed due to {:?}",
peer,
error
))
}
}
map_rr_event_to_outevent(event)
}
}
impl From<quote::OutEvent> for OutEvent {
fn from(event: quote::OutEvent) -> Self {
match event {
quote::OutEvent::Message {
message: RequestResponseMessage::Response { response, .. },
..
} => OutEvent::QuoteReceived(response),
quote::OutEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => OutEvent::CommunicationError(anyhow!(
"Bob is only meant to receive quotes, not hand them out"
)),
quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent,
quote::OutEvent::InboundFailure { peer, error, .. } => {
OutEvent::CommunicationError(anyhow!(
"quote protocol with peer {} failed due to {:?}",
peer,
error
))
}
quote::OutEvent::OutboundFailure { peer, error, .. } => {
OutEvent::CommunicationError(anyhow!(
"quote protocol with peer {} failed due to {:?}",
peer,
error
))
}
}
map_rr_event_to_outevent(event)
}
}
impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
map_rr_event_to_outevent(event)
}
}
impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
map_rr_event_to_outevent(event)
}
}
fn map_rr_event_to_outevent<I, O>(event: RequestResponseEvent<I, O>) -> OutEvent
where
OutEvent: From<RequestResponseMessage<I, O>>,
{
use RequestResponseEvent::*;
match event {
Message { message, .. } => OutEvent::from(message),
ResponseSent { .. } => OutEvent::ResponseSent,
InboundFailure { peer, error, .. } => OutEvent::CommunicationError(anyhow!(
"protocol with peer {} failed due to {:?}",
peer,
error
)),
OutboundFailure { peer, error, .. } => OutEvent::CommunicationError(anyhow!(
"protocol with peer {} failed due to {:?}",
peer,
error
)),
}
}
@ -208,40 +229,11 @@ impl From<execution_setup::OutEvent> for OutEvent {
}
}
impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
use transfer_proof::OutEvent::*;
match event {
MsgReceived { msg, channel } => OutEvent::TransferProof {
msg: Box::new(msg),
channel,
},
AckSent => OutEvent::ResponseSent,
Failure(err) => {
OutEvent::CommunicationError(err.context("Failure with Transfer Proof"))
}
}
}
}
impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
use encrypted_signature::OutEvent::*;
match event {
Acknowledged => OutEvent::EncryptedSignatureAcknowledged,
Failure(err) => {
OutEvent::CommunicationError(err.context("Failure with Encrypted Signature"))
}
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
pt: peer_tracker::Behaviour,
quote: quote::Behaviour,
spot_price: spot_price::Behaviour,
execution_setup: execution_setup::Behaviour,
@ -252,12 +244,11 @@ pub struct Behaviour {
impl Default for Behaviour {
fn default() -> Self {
Self {
pt: Default::default(),
quote: quote::bob(),
spot_price: spot_price::bob(),
execution_setup: Default::default(),
transfer_proof: Default::default(),
encrypted_signature: Default::default(),
transfer_proof: transfer_proof::bob(),
encrypted_signature: encrypted_signature::bob(),
}
}
}
@ -286,13 +277,16 @@ impl Behaviour {
alice: PeerId,
tx_redeem_encsig: bitcoin::EncryptedSignature,
) {
let msg = EncryptedSignature { tx_redeem_encsig };
self.encrypted_signature.send(alice, msg);
let msg = encrypted_signature::Request { tx_redeem_encsig };
self.encrypted_signature.send_request(&alice, msg);
debug!("Encrypted signature sent");
}
/// Add a known address for the given peer
pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.pt.add_address(peer_id, address)
self.quote.add_address(&peer_id, address.clone());
self.spot_price.add_address(&peer_id, address.clone());
self.transfer_proof.add_address(&peer_id, address.clone());
self.encrypted_signature.add_address(&peer_id, address);
}
}

View File

@ -1,74 +0,0 @@
use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT};
use anyhow::{anyhow, Error};
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage,
};
use libp2p::{NetworkBehaviour, PeerId};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EncryptedSignature {
pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature,
}
#[derive(Debug)]
pub enum OutEvent {
Acknowledged,
Failure(Error),
}
/// A `NetworkBehaviour` that represents sending encrypted signature to Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<CborCodec<EncryptedSignatureProtocol, EncryptedSignature, ()>>,
}
impl Behaviour {
pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) {
let _id = self.rr.send_request(&alice, msg);
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
CborCodec::default(),
vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)],
config,
),
}
}
}
impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
fn from(event: RequestResponseEvent<EncryptedSignature, ()>) -> Self {
match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
..
} => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")),
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Acknowledged,
RequestResponseEvent::InboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
}
RequestResponseEvent::OutboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
RequestResponseEvent::ResponseSent { .. } => OutEvent::Failure(anyhow!(
"Bob does not send the encrypted signature response to Alice"
)),
}
}
}

View File

@ -1,34 +1,168 @@
use crate::bitcoin::EncryptedSignature;
use crate::network::quote::BidQuote;
use crate::network::{spot_price, transport, TokioExecutor};
use crate::protocol::alice::TransferProof;
use crate::network::{spot_price, transfer_proof};
use crate::protocol::bob::{Behaviour, OutEvent, State0, State2};
use crate::{bitcoin, monero};
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{anyhow, Result};
use futures::FutureExt;
use libp2p::core::Multiaddr;
use libp2p::PeerId;
use std::convert::Infallible;
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm};
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{debug, error, trace};
use tracing::{debug, error};
#[derive(Debug)]
pub struct Channels<T> {
sender: Sender<T>,
receiver: Receiver<T>,
#[allow(missing_debug_implementations)]
pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>,
bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_peer_id: PeerId,
request_spot_price: Receiver<spot_price::Request>,
recv_spot_price: Sender<spot_price::Response>,
start_execution_setup: Receiver<State0>,
done_execution_setup: Sender<Result<State2>>,
recv_transfer_proof: Sender<transfer_proof::Request>,
conn_established: Sender<PeerId>,
send_encrypted_signature: Receiver<EncryptedSignature>,
request_quote: Receiver<()>,
recv_quote: Sender<BidQuote>,
}
impl<T> Channels<T> {
pub fn new() -> Channels<T> {
let (sender, receiver) = tokio::sync::mpsc::channel(100);
Channels { sender, receiver }
impl EventLoop {
pub fn new(
swarm: Swarm<Behaviour>,
alice_peer_id: PeerId,
bitcoin_wallet: Arc<bitcoin::Wallet>,
) -> Result<(Self, EventLoopHandle)> {
let start_execution_setup = Channels::new();
let done_execution_setup = Channels::new();
let recv_transfer_proof = Channels::new();
let conn_established = Channels::new();
let send_encrypted_signature = Channels::new();
let request_spot_price = Channels::new();
let recv_spot_price = Channels::new();
let request_quote = Channels::new();
let recv_quote = Channels::new();
let event_loop = EventLoop {
swarm,
alice_peer_id,
bitcoin_wallet,
start_execution_setup: start_execution_setup.receiver,
done_execution_setup: done_execution_setup.sender,
recv_transfer_proof: recv_transfer_proof.sender,
conn_established: conn_established.sender,
send_encrypted_signature: send_encrypted_signature.receiver,
request_spot_price: request_spot_price.receiver,
recv_spot_price: recv_spot_price.sender,
request_quote: request_quote.receiver,
recv_quote: recv_quote.sender,
};
let handle = EventLoopHandle {
start_execution_setup: start_execution_setup.sender,
done_execution_setup: done_execution_setup.receiver,
recv_transfer_proof: recv_transfer_proof.receiver,
send_encrypted_signature: send_encrypted_signature.sender,
request_spot_price: request_spot_price.sender,
recv_spot_price: recv_spot_price.receiver,
request_quote: request_quote.sender,
recv_quote: recv_quote.receiver,
};
Ok((event_loop, handle))
}
}
impl<T> Default for Channels<T> {
fn default() -> Self {
Self::new()
pub async fn run(mut self) {
let _ = Swarm::dial(&mut self.swarm, &self.alice_peer_id);
loop {
tokio::select! {
swarm_event = self.swarm.next_event().fuse() => {
match swarm_event {
SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(peer_id)) => {
let _ = self.conn_established.send(peer_id).await;
}
SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => {
let _ = self.recv_spot_price.send(msg).await;
}
SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => {
let _ = self.recv_quote.send(msg).await;
}
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => {
let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
}
SwarmEvent::Behaviour(OutEvent::TransferProofReceived{ msg, channel }) => {
let _ = self.recv_transfer_proof.send(*msg).await;
// Send back empty response so that the request/response protocol completes.
if let Err(error) = self.swarm.transfer_proof.send_response(channel, ()) {
error!("Failed to send Transfer Proof ack: {:?}", error);
}
}
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged) => {
debug!("Alice acknowledged encrypted signature");
}
SwarmEvent::Behaviour(OutEvent::ResponseSent) => {
}
SwarmEvent::Behaviour(OutEvent::CommunicationError(error)) => {
tracing::warn!("Communication error: {:#}", error);
return;
}
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => {
tracing::debug!("Connected to Alice at {}", endpoint.get_remote_address());
}
SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => {
tracing::debug!("Dialling Alice at {}", peer_id);
}
SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause } if peer_id == self.alice_peer_id && num_established == 0 => {
match cause {
Some(error) => {
tracing::warn!("Lost connection to Alice at {}, cause: {}", endpoint.get_remote_address(), error);
},
None => {
// no error means the disconnection was requested
tracing::info!("Successfully closed connection to Alice");
return;
}
}
match libp2p::Swarm::dial(&mut self.swarm, &self.alice_peer_id) {
Ok(()) => {},
Err(e) => {
tracing::warn!("Failed to re-dial Alice: {}", e);
return;
}
}
}
SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => {
tracing::warn!("Failed to dial Alice at {}: {}", address, error);
}
_ => {}
}
},
spot_price_request = self.request_spot_price.recv().fuse() => {
if let Some(request) = spot_price_request {
self.swarm.request_spot_price(self.alice_peer_id, request);
}
},
quote_request = self.request_quote.recv().fuse() => {
if quote_request.is_some() {
self.swarm.request_quote(self.alice_peer_id);
}
},
option = self.start_execution_setup.recv().fuse() => {
if let Some(state0) = option {
let _ = self
.swarm
.start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone());
}
},
encrypted_signature = self.send_encrypted_signature.recv().fuse() => {
if let Some(tx_redeem_encsig) = encrypted_signature {
self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig);
}
}
}
}
}
}
@ -36,9 +170,7 @@ impl<T> Default for Channels<T> {
pub struct EventLoopHandle {
start_execution_setup: Sender<State0>,
done_execution_setup: Receiver<Result<State2>>,
recv_transfer_proof: Receiver<TransferProof>,
conn_established: Receiver<PeerId>,
dial_alice: Sender<()>,
recv_transfer_proof: Receiver<transfer_proof::Request>,
send_encrypted_signature: Sender<EncryptedSignature>,
request_spot_price: Sender<spot_price::Request>,
recv_spot_price: Receiver<spot_price::Response>,
@ -56,26 +188,13 @@ impl EventLoopHandle {
.ok_or_else(|| anyhow!("Failed to setup execution with Alice"))?
}
pub async fn recv_transfer_proof(&mut self) -> Result<TransferProof> {
pub async fn recv_transfer_proof(&mut self) -> Result<transfer_proof::Request> {
self.recv_transfer_proof
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice"))
}
/// Dials other party and wait for the connection to be established.
/// Do nothing if we are already connected
pub async fn dial(&mut self) -> Result<()> {
let _ = self.dial_alice.send(()).await?;
self.conn_established
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive connection established from Alice"))?;
Ok(())
}
pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result<monero::Amount> {
let _ = self
.request_spot_price
@ -113,156 +232,21 @@ impl EventLoopHandle {
}
}
#[allow(missing_debug_implementations)]
pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>,
bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_peer_id: PeerId,
request_spot_price: Receiver<spot_price::Request>,
recv_spot_price: Sender<spot_price::Response>,
start_execution_setup: Receiver<State0>,
done_execution_setup: Sender<Result<State2>>,
recv_transfer_proof: Sender<TransferProof>,
dial_alice: Receiver<()>,
conn_established: Sender<PeerId>,
send_encrypted_signature: Receiver<EncryptedSignature>,
request_quote: Receiver<()>,
recv_quote: Sender<BidQuote>,
#[derive(Debug)]
struct Channels<T> {
sender: Sender<T>,
receiver: Receiver<T>,
}
impl EventLoop {
pub fn new(
identity: &libp2p::core::identity::Keypair,
alice_peer_id: PeerId,
alice_addr: Multiaddr,
bitcoin_wallet: Arc<bitcoin::Wallet>,
) -> Result<(Self, EventLoopHandle)> {
let behaviour = Behaviour::default();
let transport = transport::build(identity)?;
let mut swarm = libp2p::swarm::SwarmBuilder::new(
transport,
behaviour,
identity.public().into_peer_id(),
)
.executor(Box::new(TokioExecutor {
handle: tokio::runtime::Handle::current(),
}))
.build();
swarm.add_address(alice_peer_id, alice_addr);
let start_execution_setup = Channels::new();
let done_execution_setup = Channels::new();
let recv_transfer_proof = Channels::new();
let dial_alice = Channels::new();
let conn_established = Channels::new();
let send_encrypted_signature = Channels::new();
let request_spot_price = Channels::new();
let recv_spot_price = Channels::new();
let request_quote = Channels::new();
let recv_quote = Channels::new();
let event_loop = EventLoop {
swarm,
alice_peer_id,
bitcoin_wallet,
start_execution_setup: start_execution_setup.receiver,
done_execution_setup: done_execution_setup.sender,
recv_transfer_proof: recv_transfer_proof.sender,
conn_established: conn_established.sender,
dial_alice: dial_alice.receiver,
send_encrypted_signature: send_encrypted_signature.receiver,
request_spot_price: request_spot_price.receiver,
recv_spot_price: recv_spot_price.sender,
request_quote: request_quote.receiver,
recv_quote: recv_quote.sender,
};
let handle = EventLoopHandle {
start_execution_setup: start_execution_setup.sender,
done_execution_setup: done_execution_setup.receiver,
recv_transfer_proof: recv_transfer_proof.receiver,
conn_established: conn_established.receiver,
dial_alice: dial_alice.sender,
send_encrypted_signature: send_encrypted_signature.sender,
request_spot_price: request_spot_price.sender,
recv_spot_price: recv_spot_price.receiver,
request_quote: request_quote.sender,
recv_quote: recv_quote.receiver,
};
Ok((event_loop, handle))
}
pub async fn run(mut self) -> Result<Infallible> {
loop {
tokio::select! {
swarm_event = self.swarm.next().fuse() => {
match swarm_event {
OutEvent::ConnectionEstablished(peer_id) => {
let _ = self.conn_established.send(peer_id).await;
}
OutEvent::SpotPriceReceived(msg) => {
let _ = self.recv_spot_price.send(msg).await;
},
OutEvent::QuoteReceived(msg) => {
let _ = self.recv_quote.send(msg).await;
},
OutEvent::ExecutionSetupDone(res) => {
let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
}
OutEvent::TransferProof{ msg, channel }=> {
let _ = self.recv_transfer_proof.send(*msg).await;
// Send back empty response so that the request/response protocol completes.
if let Err(error) = self.swarm.transfer_proof.send_ack(channel) {
error!("Failed to send Transfer Proof ack: {:?}", error);
}
}
OutEvent::EncryptedSignatureAcknowledged => {
debug!("Alice acknowledged encrypted signature");
}
OutEvent::ResponseSent => {}
OutEvent::CommunicationError(err) => {
bail!("Communication error: {:#}", err)
}
}
},
option = self.dial_alice.recv().fuse() => {
if option.is_some() {
let peer_id = self.alice_peer_id;
if self.swarm.pt.is_connected(&peer_id) {
trace!("Already connected to Alice at {}", peer_id);
let _ = self.conn_established.send(peer_id).await;
} else {
debug!("Dialing alice at {}", peer_id);
libp2p::Swarm::dial(&mut self.swarm, &peer_id).context("Failed to dial alice")?;
}
}
},
spot_price_request = self.request_spot_price.recv().fuse() => {
if let Some(request) = spot_price_request {
self.swarm.request_spot_price(self.alice_peer_id, request);
}
},
quote_request = self.request_quote.recv().fuse() => {
if quote_request.is_some() {
self.swarm.request_quote(self.alice_peer_id);
}
},
option = self.start_execution_setup.recv().fuse() => {
if let Some(state0) = option {
let _ = self
.swarm
.start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone());
}
},
encrypted_signature = self.send_encrypted_signature.recv().fuse() => {
if let Some(tx_redeem_encsig) = encrypted_signature {
self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig);
}
}
}
}
impl<T> Channels<T> {
fn new() -> Channels<T> {
let (sender, receiver) = tokio::sync::mpsc::channel(100);
Channels { sender, receiver }
}
}
impl<T> Default for Channels<T> {
fn default() -> Self {
Self::new()
}
}

View File

@ -1,5 +1,5 @@
use crate::bitcoin::Signature;
use crate::network::request_response::BUF_SIZE;
use crate::network::cbor_request_response::BUF_SIZE;
use crate::protocol::alice::{Message1, Message3};
use crate::protocol::bob::{State0, State2};
use anyhow::{Context, Error, Result};

View File

@ -7,7 +7,7 @@ use crate::monero::wallet::WatchRequest;
use crate::monero::{monero_private_key, TransferProof};
use crate::monero_ext::ScalarExt;
use crate::protocol::alice::{Message1, Message3};
use crate::protocol::bob::{EncryptedSignature, Message0, Message2, Message4};
use crate::protocol::bob::{Message0, Message2, Message4};
use crate::protocol::CROSS_CURVE_PROOF_SYSTEM;
use anyhow::{anyhow, bail, Context, Result};
use ecdsa_fun::adaptor::{Adaptor, HashTranscript};
@ -404,12 +404,6 @@ pub struct State4 {
}
impl State4 {
pub fn next_message(&self) -> EncryptedSignature {
EncryptedSignature {
tx_redeem_encsig: self.tx_redeem_encsig(),
}
}
pub fn tx_redeem_encsig(&self) -> bitcoin::EncryptedSignature {
let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address);
self.b.encsign(self.S_a_bitcoin, tx_redeem.digest())

View File

@ -69,8 +69,6 @@ async fn run_until_internal(
BobState::Started { btc_amount } => {
let bitcoin_refund_address = bitcoin_wallet.new_address().await?;
event_loop_handle.dial().await?;
let state2 = request_price_and_setup(
btc_amount,
&mut event_loop_handle,
@ -82,8 +80,6 @@ async fn run_until_internal(
BobState::ExecutionSetupDone(state2)
}
BobState::ExecutionSetupDone(state2) => {
// Do not lock Bitcoin if not connected to Alice.
event_loop_handle.dial().await?;
// Alice and Bob have exchanged info
let (state3, tx_lock) = state2.lock_btc().await?;
let signed_tx = bitcoin_wallet
@ -98,8 +94,6 @@ async fn run_until_internal(
// Watch for Alice to Lock Xmr or for cancel timelock to elapse
BobState::BtcLocked(state3) => {
if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet.as_ref()).await? {
event_loop_handle.dial().await?;
let transfer_proof_watcher = event_loop_handle.recv_transfer_proof();
let cancel_timelock_expires =
state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref());
@ -140,8 +134,6 @@ async fn run_until_internal(
monero_wallet_restore_blockheight,
} => {
if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet.as_ref()).await? {
event_loop_handle.dial().await?;
let watch_request = state.lock_xmr_watch_request(lock_transfer_proof);
select! {
@ -166,7 +158,6 @@ async fn run_until_internal(
}
BobState::XmrLocked(state) => {
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet.as_ref()).await? {
event_loop_handle.dial().await?;
// Alice has locked Xmr
// Bob sends Alice his key

View File

@ -1,85 +0,0 @@
use crate::network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT};
use crate::protocol::alice::TransferProof;
use anyhow::{anyhow, Error, Result};
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use libp2p::NetworkBehaviour;
use std::time::Duration;
use tracing::debug;
#[derive(Debug)]
pub enum OutEvent {
MsgReceived {
msg: TransferProof,
channel: ResponseChannel<()>,
},
AckSent,
Failure(Error),
}
/// A `NetworkBehaviour` that represents receiving the transfer proof from
/// Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
rr: RequestResponse<CborCodec<TransferProofProtocol, TransferProof, ()>>,
}
impl Behaviour {
pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> {
self.rr
.send_response(channel, ())
.map_err(|err| anyhow!("Failed to ack transfer proof: {:?}", err))
}
}
impl Default for Behaviour {
fn default() -> Self {
let timeout = Duration::from_secs(TIMEOUT);
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
CborCodec::default(),
vec![(TransferProofProtocol, ProtocolSupport::Inbound)],
config,
),
}
}
}
impl From<RequestResponseEvent<TransferProof, ()>> for OutEvent {
fn from(event: RequestResponseEvent<TransferProof, ()>) -> Self {
match event {
RequestResponseEvent::Message {
peer,
message:
RequestResponseMessage::Request {
request, channel, ..
},
..
} => {
debug!("Received Transfer Proof from {}", peer);
OutEvent::MsgReceived {
msg: request,
channel,
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Failure(anyhow!("Bob should not get a Response")),
RequestResponseEvent::InboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
}
RequestResponseEvent::OutboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent,
}
}
}

View File

@ -15,7 +15,7 @@ use std::path::{Path, PathBuf};
pub const SEED_LENGTH: usize = 32;
#[derive(Clone, Copy, Eq, PartialEq)]
#[derive(Eq, PartialEq)]
pub struct Seed([u8; SEED_LENGTH]);
impl Seed {

View File

@ -0,0 +1,34 @@
pub mod testutils;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
use testutils::bob_run_until::is_xmr_locked;
use testutils::SlowCancelConfig;
#[tokio::test]
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::XmrLocked { .. }));
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::XmrLocked { .. }));
let bob_state = bob::run(bob_swap).await?;
ctx.assert_bob_redeemed(bob_state).await;
let alice_state = alice_swap.await??;
ctx.assert_alice_redeemed(alice_state).await;
Ok(())
})
.await;
}

View File

@ -7,9 +7,8 @@ use bitcoin_harness::{BitcoindRpcApi, Client};
use futures::Future;
use get_port::get_port;
use libp2p::core::Multiaddr;
use libp2p::PeerId;
use libp2p::{PeerId, Swarm};
use monero_harness::{image, Monero};
use std::convert::Infallible;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
@ -17,6 +16,7 @@ use swap::asb::FixedRate;
use swap::bitcoin::{CancelTimelock, PunishTimelock};
use swap::database::Database;
use swap::env::{Config, GetConfig};
use swap::network::swarm;
use swap::protocol::alice::{AliceState, Swap};
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
@ -43,7 +43,6 @@ pub struct StartingBalances {
pub btc: bitcoin::Amount,
}
#[derive(Clone)]
struct BobParams {
seed: Seed,
db_path: PathBuf,
@ -71,16 +70,14 @@ impl BobParams {
}
pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> {
bob::EventLoop::new(
&self.seed.derive_libp2p_identity(),
self.alice_peer_id,
self.alice_address.clone(),
self.bitcoin_wallet.clone(),
)
let mut swarm = swarm::new::<bob::Behaviour>(&self.seed)?;
swarm.add_address(self.alice_peer_id, self.alice_address.clone());
bob::EventLoop::new(swarm, self.alice_peer_id, self.bitcoin_wallet.clone())
}
}
pub struct BobEventLoopJoinHandle(JoinHandle<Result<Infallible>>);
pub struct BobEventLoopJoinHandle(JoinHandle<()>);
impl BobEventLoopJoinHandle {
pub fn abort(&self) {
@ -385,9 +382,11 @@ where
)
.await;
let mut alice_swarm = swarm::new::<alice::Behaviour>(&alice_seed).unwrap();
Swarm::listen_on(&mut alice_swarm, alice_listen_address.clone()).unwrap();
let (alice_event_loop, alice_swap_handle) = alice::EventLoop::new(
alice_listen_address.clone(),
alice_seed,
alice_swarm,
env_config,
alice_bitcoin_wallet.clone(),
alice_monero_wallet.clone(),