387: Improve the resilience of the network layer r=thomaseizinger a=thomaseizinger

We improve the resilience in two ways:

1. Use a timeout on Bob's side for the execution-setup.
2. Use the `bmrng` library to model the communication between Alice and Bob.

See commit messages for details.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
bors[bot] 2021-04-06 06:20:30 +00:00 committed by GitHub
commit e0b859bb1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 402 additions and 399 deletions

51
Cargo.lock generated
View File

@ -480,6 +480,17 @@ dependencies = [
"byte-tools", "byte-tools",
] ]
[[package]]
name = "bmrng"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ba7ad917af2fc43efa0b20d2bf3b2c1bd1090fa2a6b8c73847458c8335dea2b"
dependencies = [
"futures",
"loom",
"tokio 1.4.0",
]
[[package]] [[package]]
name = "bs58" name = "bs58"
version = "0.4.0" version = "0.4.0"
@ -1180,6 +1191,19 @@ dependencies = [
"byteorder", "byteorder",
] ]
[[package]]
name = "generator"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6"
dependencies = [
"cc",
"libc",
"log 0.4.14",
"rustversion",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.12.4" version = "0.12.4"
@ -1908,6 +1932,20 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
] ]
[[package]]
name = "loom"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d44c73b4636e497b4917eb21c33539efa3816741a2d3ff26c6316f1b529481a4"
dependencies = [
"cfg-if 1.0.0",
"futures-util",
"generator",
"scoped-tls",
"serde",
"serde_json",
]
[[package]] [[package]]
name = "lru" name = "lru"
version = "0.6.5" version = "0.6.5"
@ -3013,6 +3051,12 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "rustversion"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd"
[[package]] [[package]]
name = "rw-stream-sink" name = "rw-stream-sink"
version = "0.2.1" version = "0.2.1"
@ -3036,6 +3080,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072"
[[package]]
name = "scoped-tls"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.1.0" version = "1.1.0"
@ -3521,6 +3571,7 @@ dependencies = [
"big-bytes", "big-bytes",
"bitcoin", "bitcoin",
"bitcoin-harness", "bitcoin-harness",
"bmrng",
"config", "config",
"conquer-once", "conquer-once",
"curve25519-dalek", "curve25519-dalek",

View File

@ -18,6 +18,7 @@ base64 = "0.13"
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", rev = "e5ecc7f" } bdk = { git = "https://github.com/bitcoindevkit/bdk.git", rev = "e5ecc7f" }
big-bytes = "1" big-bytes = "1"
bitcoin = { version = "0.26", features = ["rand", "use-serde"] } bitcoin = { version = "0.26", features = ["rand", "use-serde"] }
bmrng = "0.5"
config = { version = "0.11", default-features = false, features = ["toml"] } config = { version = "0.11", default-features = false, features = ["toml"] }
conquer-once = "0.3" conquer-once = "0.3"
curve25519-dalek = "3" curve25519-dalek = "3"

View File

@ -239,9 +239,6 @@ async fn main() -> Result<()> {
"The Cancel Transaction cannot be published yet, \ "The Cancel Transaction cannot be published yet, \
because the timelock has not expired. Please try again later." because the timelock has not expired. Please try again later."
), ),
Err(bob::cancel::Error::CancelTxAlreadyPublished) => {
warn!("The Cancel Transaction has already been published.")
}
} }
} }
Command::Refund { Command::Refund {

View File

@ -1,13 +1,11 @@
use crate::env::Config;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, quote, spot_price, transfer_proof}; use crate::network::{encrypted_signature, quote, spot_price, transfer_proof};
use crate::protocol::alice::{execution_setup, State0, State3}; use crate::protocol::alice::{execution_setup, State3};
use crate::{bitcoin, monero}; use anyhow::{anyhow, Error};
use anyhow::{anyhow, Error, Result}; use libp2p::request_response::{
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
};
use libp2p::{NetworkBehaviour, PeerId}; use libp2p::{NetworkBehaviour, PeerId};
use rand::{CryptoRng, RngCore};
use tracing::debug;
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
@ -24,7 +22,10 @@ pub enum OutEvent {
bob_peer_id: PeerId, bob_peer_id: PeerId,
state3: Box<State3>, state3: Box<State3>,
}, },
TransferProofAcknowledged(PeerId), TransferProofAcknowledged {
peer: PeerId,
id: RequestId,
},
EncryptedSignatureReceived { EncryptedSignatureReceived {
msg: Box<encrypted_signature::Request>, msg: Box<encrypted_signature::Request>,
channel: ResponseChannel<()>, channel: ResponseChannel<()>,
@ -81,7 +82,12 @@ impl From<(PeerId, transfer_proof::Message)> for OutEvent {
fn from((peer, message): (PeerId, transfer_proof::Message)) -> Self { fn from((peer, message): (PeerId, transfer_proof::Message)) -> Self {
match message { match message {
transfer_proof::Message::Request { .. } => OutEvent::unexpected_request(peer), transfer_proof::Message::Request { .. } => OutEvent::unexpected_request(peer),
transfer_proof::Message::Response { .. } => OutEvent::TransferProofAcknowledged(peer), transfer_proof::Message::Response { request_id, .. } => {
OutEvent::TransferProofAcknowledged {
peer,
id: request_id,
}
}
} }
} }
} }
@ -166,11 +172,11 @@ impl From<execution_setup::OutEvent> for OutEvent {
#[behaviour(out_event = "OutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
quote: quote::Behaviour, pub quote: quote::Behaviour,
spot_price: spot_price::Behaviour, pub spot_price: spot_price::Behaviour,
execution_setup: execution_setup::Behaviour, pub execution_setup: execution_setup::Behaviour,
transfer_proof: transfer_proof::Behaviour, pub transfer_proof: transfer_proof::Behaviour,
encrypted_signature: encrypted_signature::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour,
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -184,74 +190,3 @@ impl Default for Behaviour {
} }
} }
} }
impl Behaviour {
pub fn send_quote(
&mut self,
channel: ResponseChannel<BidQuote>,
response: BidQuote,
) -> Result<()> {
self.quote
.send_response(channel, response)
.map_err(|_| anyhow!("Failed to respond with quote"))?;
Ok(())
}
pub fn send_spot_price(
&mut self,
channel: ResponseChannel<spot_price::Response>,
response: spot_price::Response,
) -> Result<()> {
self.spot_price
.send_response(channel, response)
.map_err(|_| anyhow!("Failed to respond with spot price"))?;
Ok(())
}
pub async fn start_execution_setup(
&mut self,
peer: PeerId,
btc: bitcoin::Amount,
xmr: monero::Amount,
env_config: Config,
bitcoin_wallet: &bitcoin::Wallet,
rng: &mut (impl RngCore + CryptoRng),
) -> Result<()> {
let state0 = State0::new(btc, xmr, env_config, bitcoin_wallet, rng).await?;
tracing::info!(
%peer,
"Starting execution setup to sell {} for {}",
xmr, btc,
);
self.execution_setup.run(peer, state0);
Ok(())
}
/// Send Transfer Proof to Bob.
///
/// Fails and returns the transfer proof if we are currently not connected
/// to this peer.
pub fn send_transfer_proof(
&mut self,
bob: PeerId,
msg: transfer_proof::Request,
) -> Result<(), transfer_proof::Request> {
if !self.transfer_proof.is_connected(&bob) {
return Err(msg);
}
self.transfer_proof.send_request(&bob, msg);
debug!("Sending Transfer Proof");
Ok(())
}
pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) {
let _ = self.encrypted_signature.send_response(channel, ());
}
}

View File

@ -4,22 +4,32 @@ use crate::env::Config;
use crate::monero::BalanceTooLow; use crate::monero::BalanceTooLow;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, spot_price, transfer_proof}; use crate::network::{encrypted_signature, spot_price, transfer_proof};
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap};
use crate::{bitcoin, kraken, monero}; use crate::{bitcoin, kraken, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use futures::future; use futures::future;
use futures::future::{BoxFuture, FutureExt}; use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm}; use libp2p::{PeerId, Swarm};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::Infallible; use std::convert::Infallible;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc;
use tracing::{debug, error, trace};
use uuid::Uuid; use uuid::Uuid;
/// A future that resolves to a tuple of `PeerId`, `transfer_proof::Request` and
/// `Responder`.
///
/// When this future resolves, the `transfer_proof::Request` shall be sent to
/// the peer identified by the `PeerId`. Once the request has been acknowledged
/// by the peer, i.e. a `()` response has been received, the `Responder` shall
/// be used to let the original sender know about the successful transfer.
type OutgoingTransferProof =
BoxFuture<'static, Result<(PeerId, transfer_proof::Request, bmrng::Responder<()>)>>;
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop<RS> { pub struct EventLoop<RS> {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
@ -30,18 +40,22 @@ pub struct EventLoop<RS> {
latest_rate: RS, latest_rate: RS,
max_buy: bitcoin::Amount, max_buy: bitcoin::Amount,
/// Stores a sender per peer for incoming [`EncryptedSignature`]s.
recv_encrypted_signature: HashMap<PeerId, oneshot::Sender<encrypted_signature::Request>>,
/// Stores a list of futures, waiting for transfer proof which will be sent
/// to the given peer.
send_transfer_proof:
FuturesUnordered<BoxFuture<'static, Result<(PeerId, transfer_proof::Request)>>>,
swap_sender: mpsc::Sender<Swap>, swap_sender: mpsc::Sender<Swap>,
/// Stores a sender per peer for incoming [`EncryptedSignature`]s.
recv_encrypted_signature:
HashMap<PeerId, bmrng::RequestSender<encrypted_signature::Request, ()>>,
inflight_encrypted_signatures: FuturesUnordered<BoxFuture<'static, ResponseChannel<()>>>,
send_transfer_proof: FuturesUnordered<OutgoingTransferProof>,
/// Tracks [`transfer_proof::Request`]s which could not yet be sent because /// Tracks [`transfer_proof::Request`]s which could not yet be sent because
/// we are currently disconnected from the peer. /// we are currently disconnected from the peer.
buffered_transfer_proofs: HashMap<PeerId, transfer_proof::Request>, buffered_transfer_proofs: HashMap<PeerId, (transfer_proof::Request, bmrng::Responder<()>)>,
/// Tracks [`transfer_proof::Request`]s which are currently inflight and
/// awaiting an acknowledgement.
inflight_transfer_proofs: HashMap<RequestId, bmrng::Responder<()>>,
} }
impl<LR> EventLoop<LR> impl<LR> EventLoop<LR>
@ -69,8 +83,10 @@ where
swap_sender: swap_channel.sender, swap_sender: swap_channel.sender,
max_buy, max_buy,
recv_encrypted_signature: Default::default(), recv_encrypted_signature: Default::default(),
inflight_encrypted_signatures: Default::default(),
send_transfer_proof: Default::default(), send_transfer_proof: Default::default(),
buffered_transfer_proofs: Default::default(), buffered_transfer_proofs: Default::default(),
inflight_transfer_proofs: Default::default(),
}; };
Ok((event_loop, swap_channel.receiver)) Ok((event_loop, swap_channel.receiver))
} }
@ -80,9 +96,11 @@ where
} }
pub async fn run(mut self) { pub async fn run(mut self) {
// ensure that the send_transfer_proof stream is NEVER empty, otherwise it will // ensure that these streams are NEVER empty, otherwise it will
// terminate forever. // terminate forever.
self.send_transfer_proof.push(future::pending().boxed()); self.send_transfer_proof.push(future::pending().boxed());
self.inflight_encrypted_signatures
.push(future::pending().boxed());
let unfinished_swaps = match self.db.unfinished_alice() { let unfinished_swaps = match self.db.unfinished_alice() {
Ok(unfinished_swaps) => unfinished_swaps, Ok(unfinished_swaps) => unfinished_swaps,
@ -130,77 +148,87 @@ where
let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await {
Ok(xmr) => xmr, Ok(xmr) => xmr,
Err(e) => { Err(e) => {
tracing::warn!(%peer, "failed to produce spot price for {}: {:#}", btc, e); tracing::warn!(%peer, "Failed to produce spot price for {}: {:#}", btc, e);
continue; continue;
} }
}; };
match self.swarm.send_spot_price(channel, spot_price::Response { xmr }) { match self.swarm.spot_price.send_response(channel, spot_price::Response { xmr }) {
Ok(_) => {}, Ok(_) => {},
Err(e) => { Err(_) => {
// if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug // if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug
debug!(%peer, "failed to respond with spot price: {:#}", e); tracing::debug!(%peer, "Failed to respond with spot price");
continue; continue;
} }
} }
match self.swarm.start_execution_setup(peer, btc, xmr, self.env_config, self.bitcoin_wallet.as_ref(), &mut OsRng).await { let state0 = match State0::new(btc, xmr, self.env_config, self.bitcoin_wallet.as_ref(), &mut OsRng).await {
Ok(_) => {}, Ok(state) => state,
Err(e) => { Err(e) => {
tracing::warn!(%peer, "failed to start execution setup: {:#}", e); tracing::warn!(%peer, "Failed to make State0 for execution setup: {:#}", e);
} continue;
} }
};
self.swarm.execution_setup.run(peer, state0);
} }
SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => {
let quote = match self.make_quote(self.max_buy).await { let quote = match self.make_quote(self.max_buy).await {
Ok(quote) => quote, Ok(quote) => quote,
Err(e) => { Err(e) => {
tracing::warn!(%peer, "failed to make quote: {:#}", e); tracing::warn!(%peer, "Failed to make quote: {:#}", e);
continue; continue;
} }
}; };
match self.swarm.send_quote(channel, quote) { if self.swarm.quote.send_response(channel, quote).is_err() {
Ok(_) => {}, tracing::debug!(%peer, "Failed to respond with quote");
Err(e) => {
// if we can't respond, the peer probably just disconnected so it is not a huge deal, only log this on debug
debug!(%peer, "failed to respond with quote: {:#}", e);
continue;
}
} }
} }
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => {
let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await;
} }
SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged(peer)) => { SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => {
trace!(%peer, "Bob acknowledged transfer proof"); tracing::trace!(%peer, "Bob acknowledged transfer proof");
if let Some(responder) = self.inflight_transfer_proofs.remove(&id) {
let _ = responder.respond(());
}
} }
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => {
match self.recv_encrypted_signature.remove(&peer) { let sender = match self.recv_encrypted_signature.remove(&peer) {
Some(sender) => { Some(sender) => sender,
// this failing just means the receiver is no longer interested ...
let _ = sender.send(*msg);
},
None => { None => {
tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?") tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?");
} continue;
} }
};
self.swarm.send_encrypted_signature_ack(channel); let mut responder = match sender.send(*msg).await {
Ok(responder) => responder,
Err(_) => {
tracing::warn!(%peer, "Failed to relay encrypted signature to swap");
continue;
}
};
self.inflight_encrypted_signatures.push(async move {
let _ = responder.recv().await;
channel
}.boxed());
} }
SwarmEvent::Behaviour(OutEvent::ResponseSent) => {} SwarmEvent::Behaviour(OutEvent::ResponseSent) => {}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => { SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
error!(%peer, "Communication error: {:#}", error); tracing::error!(%peer, "Communication error: {:#}", error);
} }
SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => { SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => {
tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established");
if let Some(transfer_proof) = self.buffered_transfer_proofs.remove(&peer) { if let Some((transfer_proof, responder)) = self.buffered_transfer_proofs.remove(&peer) {
tracing::debug!(%peer, "Found buffered transfer proof for peer"); tracing::debug!(%peer, "Found buffered transfer proof for peer");
self.swarm let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof);
.send_transfer_proof(peer, transfer_proof) self.inflight_transfer_proofs.insert(id, responder);
.expect("must be able to send transfer proof after connection was established");
} }
} }
SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => { SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => {
@ -221,13 +249,15 @@ where
}, },
next_transfer_proof = self.send_transfer_proof.next() => { next_transfer_proof = self.send_transfer_proof.next() => {
match next_transfer_proof { match next_transfer_proof {
Some(Ok((peer, transfer_proof))) => { Some(Ok((peer, transfer_proof, responder))) => {
let result = self.swarm.send_transfer_proof(peer, transfer_proof); if !self.swarm.transfer_proof.is_connected(&peer) {
if let Err(transfer_proof) = result {
tracing::warn!(%peer, "No active connection to peer, buffering transfer proof"); tracing::warn!(%peer, "No active connection to peer, buffering transfer proof");
self.buffered_transfer_proofs.insert(peer, transfer_proof); self.buffered_transfer_proofs.insert(peer, (transfer_proof, responder));
continue;
} }
let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof);
self.inflight_transfer_proofs.insert(id, responder);
}, },
Some(Err(_)) => { Some(Err(_)) => {
tracing::debug!("A swap stopped without sending a transfer proof"); tracing::debug!("A swap stopped without sending a transfer proof");
@ -237,6 +267,9 @@ where
} }
} }
} }
Some(response_channel) = self.inflight_encrypted_signatures.next() => {
let _ = self.swarm.encrypted_signature.send_response(response_channel, ());
}
} }
} }
} }
@ -319,23 +352,25 @@ where
/// Create a new [`EventLoopHandle`] that is scoped for communication with /// Create a new [`EventLoopHandle`] that is scoped for communication with
/// the given peer. /// the given peer.
fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle { fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle {
let (send_transfer_proof_sender, send_transfer_proof_receiver) = oneshot::channel(); // we deliberately don't put timeouts on these channels because the swap always
let (recv_enc_sig_sender, recv_enc_sig_receiver) = oneshot::channel(); // races these futures against a timelock
let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1);
let encrypted_signature = bmrng::channel(1);
self.recv_encrypted_signature self.recv_encrypted_signature
.insert(peer, recv_enc_sig_sender); .insert(peer, encrypted_signature.0);
self.send_transfer_proof.push( self.send_transfer_proof.push(
async move { async move {
let transfer_proof = send_transfer_proof_receiver.await?; let (request, responder) = transfer_proof_receiver.recv().await?;
Ok((peer, transfer_proof)) Ok((peer, request, responder))
} }
.boxed(), .boxed(),
); );
EventLoopHandle { EventLoopHandle {
recv_encrypted_signature: Some(recv_enc_sig_receiver), recv_encrypted_signature: Some(encrypted_signature.1),
send_transfer_proof: Some(send_transfer_proof_sender), send_transfer_proof: Some(transfer_proof_sender),
} }
} }
} }
@ -364,32 +399,33 @@ impl LatestRate for kraken::RateUpdateStream {
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
recv_encrypted_signature: Option<oneshot::Receiver<encrypted_signature::Request>>, recv_encrypted_signature: Option<bmrng::RequestReceiver<encrypted_signature::Request, ()>>,
send_transfer_proof: Option<oneshot::Sender<transfer_proof::Request>>, send_transfer_proof: Option<bmrng::RequestSender<transfer_proof::Request, ()>>,
} }
impl EventLoopHandle { impl EventLoopHandle {
pub async fn recv_encrypted_signature(&mut self) -> Result<bitcoin::EncryptedSignature> { pub async fn recv_encrypted_signature(&mut self) -> Result<bitcoin::EncryptedSignature> {
let signature = self let (request, responder) = self
.recv_encrypted_signature .recv_encrypted_signature
.take() .take()
.context("Encrypted signature was already received")? .context("Encrypted signature was already received")?
.await? .recv()
.tx_redeem_encsig; .await?;
Ok(signature) responder
.respond(())
.context("Failed to acknowledge receipt of encrypted signature")?;
Ok(request.tx_redeem_encsig)
} }
pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> { pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> {
if self self.send_transfer_proof
.send_transfer_proof
.take() .take()
.context("Transfer proof was already sent")? .context("Transfer proof was already sent")?
.send(transfer_proof::Request { tx_lock_proof: msg }) .send_receive(transfer_proof::Request { tx_lock_proof: msg })
.is_err() .await
{ .context("Failed to send transfer proof")?;
bail!("Failed to send transfer proof, receiver no longer listening?")
}
Ok(()) Ok(())
} }

View File

@ -116,29 +116,32 @@ async fn next_state(
state3, state3,
}, },
}, },
AliceState::XmrLocked { AliceState::XmrLocked {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
transfer_proof, transfer_proof,
state3, state3,
} => match state3.expired_timelocks(bitcoin_wallet).await? { } => {
ExpiredTimelocks::None => { let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
event_loop_handle
.send_transfer_proof(transfer_proof.clone()) tokio::select! {
.await?; result = event_loop_handle.send_transfer_proof(transfer_proof.clone()) => {
result?;
XmrLockTransferProofSent { XmrLockTransferProofSent {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
transfer_proof, transfer_proof,
state3, state3,
} }
} },
_ => AliceState::CancelTimelockExpired { _ = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => {
AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
transfer_proof, transfer_proof,
state3, state3,
}, }
}, }
}
}
AliceState::XmrLockTransferProofSent { AliceState::XmrLockTransferProofSent {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
transfer_proof, transfer_proof,
@ -146,9 +149,9 @@ async fn next_state(
} => { } => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
select! { select! {
biased; // make sure the cancel timelock expiry future is polled first
_ = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => { _ = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock) => {
AliceState::CancelTimelockExpired { AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
@ -168,13 +171,6 @@ async fn next_state(
} }
} }
} }
_ => AliceState::CancelTimelockExpired {
monero_wallet_restore_blockheight,
transfer_proof,
state3,
},
}
}
AliceState::EncSigLearned { AliceState::EncSigLearned {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
transfer_proof, transfer_proof,

View File

@ -5,10 +5,11 @@ use crate::protocol::bob;
use crate::{bitcoin, monero}; use crate::{bitcoin, monero};
use anyhow::{anyhow, Error, Result}; use anyhow::{anyhow, Error, Result};
use libp2p::core::Multiaddr; use libp2p::core::Multiaddr;
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; use libp2p::request_response::{
RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
};
use libp2p::{NetworkBehaviour, PeerId}; use libp2p::{NetworkBehaviour, PeerId};
use std::sync::Arc; use std::sync::Arc;
use tracing::debug;
use uuid::Uuid; use uuid::Uuid;
pub use self::cancel::cancel; pub use self::cancel::cancel;
@ -108,15 +109,22 @@ impl Builder {
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
ConnectionEstablished(PeerId), QuoteReceived {
QuoteReceived(BidQuote), id: RequestId,
SpotPriceReceived(spot_price::Response), response: BidQuote,
ExecutionSetupDone(Result<Box<State2>>), },
SpotPriceReceived {
id: RequestId,
response: spot_price::Response,
},
ExecutionSetupDone(Box<Result<State2>>),
TransferProofReceived { TransferProofReceived {
msg: Box<transfer_proof::Request>, msg: Box<transfer_proof::Request>,
channel: ResponseChannel<()>, channel: ResponseChannel<()>,
}, },
EncryptedSignatureAcknowledged, EncryptedSignatureAcknowledged {
id: RequestId,
},
ResponseSent, // Same variant is used for all messages as no processing is done ResponseSent, // Same variant is used for all messages as no processing is done
CommunicationError(Error), CommunicationError(Error),
} }
@ -135,7 +143,13 @@ impl From<quote::Message> for OutEvent {
fn from(message: quote::Message) -> Self { fn from(message: quote::Message) -> Self {
match message { match message {
quote::Message::Request { .. } => OutEvent::unexpected_request(), quote::Message::Request { .. } => OutEvent::unexpected_request(),
quote::Message::Response { response, .. } => OutEvent::QuoteReceived(response), quote::Message::Response {
response,
request_id,
} => OutEvent::QuoteReceived {
id: request_id,
response,
},
} }
} }
} }
@ -144,7 +158,13 @@ impl From<spot_price::Message> for OutEvent {
fn from(message: spot_price::Message) -> Self { fn from(message: spot_price::Message) -> Self {
match message { match message {
spot_price::Message::Request { .. } => OutEvent::unexpected_request(), spot_price::Message::Request { .. } => OutEvent::unexpected_request(),
spot_price::Message::Response { response, .. } => OutEvent::SpotPriceReceived(response), spot_price::Message::Response {
response,
request_id,
} => OutEvent::SpotPriceReceived {
id: request_id,
response,
},
} }
} }
} }
@ -167,8 +187,8 @@ impl From<encrypted_signature::Message> for OutEvent {
fn from(message: encrypted_signature::Message) -> Self { fn from(message: encrypted_signature::Message) -> Self {
match message { match message {
encrypted_signature::Message::Request { .. } => OutEvent::unexpected_request(), encrypted_signature::Message::Request { .. } => OutEvent::unexpected_request(),
encrypted_signature::Message::Response { .. } => { encrypted_signature::Message::Response { request_id, .. } => {
OutEvent::EncryptedSignatureAcknowledged OutEvent::EncryptedSignatureAcknowledged { id: request_id }
} }
} }
} }
@ -223,7 +243,7 @@ where
impl From<execution_setup::OutEvent> for OutEvent { impl From<execution_setup::OutEvent> for OutEvent {
fn from(event: execution_setup::OutEvent) -> Self { fn from(event: execution_setup::OutEvent) -> Self {
match event { match event {
execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)), execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(Box::new(res)),
} }
} }
} }
@ -233,11 +253,11 @@ impl From<execution_setup::OutEvent> for OutEvent {
#[behaviour(out_event = "OutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Behaviour { pub struct Behaviour {
quote: quote::Behaviour, pub quote: quote::Behaviour,
spot_price: spot_price::Behaviour, pub spot_price: spot_price::Behaviour,
execution_setup: execution_setup::Behaviour, pub execution_setup: execution_setup::Behaviour,
transfer_proof: transfer_proof::Behaviour, pub transfer_proof: transfer_proof::Behaviour,
encrypted_signature: encrypted_signature::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour,
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -253,34 +273,6 @@ impl Default for Behaviour {
} }
impl Behaviour { impl Behaviour {
pub fn request_quote(&mut self, alice: PeerId) {
let _ = self.quote.send_request(&alice, ());
}
pub fn request_spot_price(&mut self, alice: PeerId, request: spot_price::Request) {
let _ = self.spot_price.send_request(&alice, request);
}
pub fn start_execution_setup(
&mut self,
alice_peer_id: PeerId,
state0: State0,
bitcoin_wallet: Arc<bitcoin::Wallet>,
) {
self.execution_setup
.run(alice_peer_id, state0, bitcoin_wallet);
}
pub fn send_encrypted_signature(
&mut self,
alice: PeerId,
tx_redeem_encsig: bitcoin::EncryptedSignature,
) {
let msg = encrypted_signature::Request { tx_redeem_encsig };
self.encrypted_signature.send_request(&alice, msg);
debug!("Encrypted signature sent");
}
/// Add a known address for the given peer /// Add a known address for the given peer
pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.quote.add_address(&peer_id, address.clone()); self.quote.add_address(&peer_id, address.clone());

View File

@ -9,8 +9,6 @@ use uuid::Uuid;
pub enum Error { pub enum Error {
#[error("The cancel timelock has not expired yet.")] #[error("The cancel timelock has not expired yet.")]
CancelTimelockNotExpiredYet, CancelTimelockNotExpiredYet,
#[error("The cancel transaction has already been published.")]
CancelTxAlreadyPublished,
} }
pub async fn cancel( pub async fn cancel(
@ -40,25 +38,23 @@ pub async fn cancel(
), ),
}; };
tracing::info!(%swap_id, "Manually cancelling swap");
if !force { if !force {
tracing::debug!(%swap_id, "Checking if cancel timelock is expired");
if let ExpiredTimelocks::None = state6.expired_timelock(bitcoin_wallet.as_ref()).await? { if let ExpiredTimelocks::None = state6.expired_timelock(bitcoin_wallet.as_ref()).await? {
return Ok(Err(Error::CancelTimelockNotExpiredYet)); return Ok(Err(Error::CancelTimelockNotExpiredYet));
} }
if state6
.check_for_tx_cancel(bitcoin_wallet.as_ref())
.await
.is_ok()
{
let state = BobState::BtcCancelled(state6);
let db_state = state.into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?;
return Ok(Err(Error::CancelTxAlreadyPublished));
}
} }
let txid = state6.submit_tx_cancel(bitcoin_wallet.as_ref()).await?; let txid = if let Ok(tx) = state6.check_for_tx_cancel(bitcoin_wallet.as_ref()).await {
tracing::debug!(%swap_id, "Cancel transaction has already been published");
tx.txid()
} else {
state6.submit_tx_cancel(bitcoin_wallet.as_ref()).await?
};
let state = BobState::BtcCancelled(state6); let state = BobState::BtcCancelled(state6);
let db_state = state.clone().into(); let db_state = state.clone().into();

View File

@ -1,30 +1,48 @@
use crate::bitcoin::EncryptedSignature; use crate::bitcoin::EncryptedSignature;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{spot_price, transfer_proof}; use crate::network::{encrypted_signature, spot_price, transfer_proof};
use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2};
use crate::{bitcoin, monero}; use crate::{bitcoin, monero};
use anyhow::{anyhow, Result}; use anyhow::{Context, Result};
use futures::FutureExt; use futures::future::{BoxFuture, OptionFuture};
use futures::{FutureExt, StreamExt};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm}; use libp2p::{PeerId, Swarm};
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender}; use std::time::Duration;
use tracing::{debug, error};
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop { pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_peer_id: PeerId, alice_peer_id: PeerId,
request_spot_price: Receiver<spot_price::Request>,
recv_spot_price: Sender<spot_price::Response>, // these streams represents outgoing requests that we have to make
start_execution_setup: Receiver<State0>, quote_requests: bmrng::RequestReceiverStream<(), BidQuote>,
done_execution_setup: Sender<Result<State2>>, spot_price_requests: bmrng::RequestReceiverStream<spot_price::Request, spot_price::Response>,
recv_transfer_proof: Sender<transfer_proof::Request>, encrypted_signature_requests: bmrng::RequestReceiverStream<encrypted_signature::Request, ()>,
conn_established: Sender<PeerId>, execution_setup_requests: bmrng::RequestReceiverStream<State0, Result<State2>>,
send_encrypted_signature: Receiver<EncryptedSignature>,
request_quote: Receiver<()>, // these represents requests that are currently in-flight.
recv_quote: Sender<BidQuote>, // once we get a response to a matching [`RequestId`], we will use the responder to relay the
// response.
inflight_spot_price_requests: HashMap<RequestId, bmrng::Responder<spot_price::Response>>,
inflight_quote_requests: HashMap<RequestId, bmrng::Responder<BidQuote>>,
inflight_encrypted_signature_requests: HashMap<RequestId, bmrng::Responder<()>>,
inflight_execution_setup: Option<bmrng::Responder<Result<State2>>>,
/// The sender we will use to relay incoming transfer proofs.
transfer_proof: bmrng::RequestSender<transfer_proof::Request, ()>,
/// The future representing the successful handling of an incoming transfer
/// proof.
///
/// Once we've sent a transfer proof to the ongoing swap, this future waits
/// until the swap took it "out" of the `EventLoopHandle`. As this future
/// resolves, we use the `ResponseChannel` returned from it to send an ACK
/// to Alice that we have successfully processed the transfer proof.
pending_transfer_proof: OptionFuture<BoxFuture<'static, ResponseChannel<()>>>,
} }
impl EventLoop { impl EventLoop {
@ -33,40 +51,34 @@ impl EventLoop {
alice_peer_id: PeerId, alice_peer_id: PeerId,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
) -> Result<(Self, EventLoopHandle)> { ) -> Result<(Self, EventLoopHandle)> {
let start_execution_setup = Channels::new(); let execution_setup = bmrng::channel_with_timeout(1, Duration::from_secs(30));
let done_execution_setup = Channels::new(); let transfer_proof = bmrng::channel_with_timeout(1, Duration::from_secs(30));
let recv_transfer_proof = Channels::new(); let encrypted_signature = bmrng::channel_with_timeout(1, Duration::from_secs(30));
let conn_established = Channels::new(); let spot_price = bmrng::channel_with_timeout(1, Duration::from_secs(30));
let send_encrypted_signature = Channels::new(); let quote = bmrng::channel_with_timeout(1, Duration::from_secs(30));
let request_spot_price = Channels::new();
let recv_spot_price = Channels::new();
let request_quote = Channels::new();
let recv_quote = Channels::new();
let event_loop = EventLoop { let event_loop = EventLoop {
swarm, swarm,
alice_peer_id, alice_peer_id,
bitcoin_wallet, bitcoin_wallet,
start_execution_setup: start_execution_setup.receiver, execution_setup_requests: execution_setup.1.into(),
done_execution_setup: done_execution_setup.sender, transfer_proof: transfer_proof.0,
recv_transfer_proof: recv_transfer_proof.sender, encrypted_signature_requests: encrypted_signature.1.into(),
conn_established: conn_established.sender, spot_price_requests: spot_price.1.into(),
send_encrypted_signature: send_encrypted_signature.receiver, quote_requests: quote.1.into(),
request_spot_price: request_spot_price.receiver, inflight_spot_price_requests: HashMap::default(),
recv_spot_price: recv_spot_price.sender, inflight_quote_requests: HashMap::default(),
request_quote: request_quote.receiver, inflight_execution_setup: None,
recv_quote: recv_quote.sender, inflight_encrypted_signature_requests: HashMap::default(),
pending_transfer_proof: OptionFuture::from(None),
}; };
let handle = EventLoopHandle { let handle = EventLoopHandle {
start_execution_setup: start_execution_setup.sender, execution_setup: execution_setup.0,
done_execution_setup: done_execution_setup.receiver, transfer_proof: transfer_proof.1,
recv_transfer_proof: recv_transfer_proof.receiver, encrypted_signature: encrypted_signature.0,
send_encrypted_signature: send_encrypted_signature.sender, spot_price: spot_price.0,
request_spot_price: request_spot_price.sender, quote: quote.0,
recv_spot_price: recv_spot_price.receiver,
request_quote: request_quote.sender,
recv_quote: recv_quote.receiver,
}; };
Ok((event_loop, handle)) Ok((event_loop, handle))
@ -76,30 +88,44 @@ impl EventLoop {
let _ = Swarm::dial(&mut self.swarm, &self.alice_peer_id); let _ = Swarm::dial(&mut self.swarm, &self.alice_peer_id);
loop { loop {
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
tokio::select! { tokio::select! {
swarm_event = self.swarm.next_event().fuse() => { swarm_event = self.swarm.next_event().fuse() => {
match swarm_event { match swarm_event {
SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(peer_id)) => { SwarmEvent::Behaviour(OutEvent::SpotPriceReceived { id, response }) => {
let _ = self.conn_established.send(peer_id).await; if let Some(responder) = self.inflight_spot_price_requests.remove(&id) {
let _ = responder.respond(response);
} }
SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => {
let _ = self.recv_spot_price.send(msg).await;
} }
SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => { SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => {
let _ = self.recv_quote.send(msg).await; if let Some(responder) = self.inflight_quote_requests.remove(&id) {
let _ = responder.respond(response);
}
}
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(response)) => {
if let Some(responder) = self.inflight_execution_setup.take() {
let _ = responder.respond(*response);
} }
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => {
let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
} }
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => { SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => {
let _ = self.recv_transfer_proof.send(*msg).await; let mut responder = match self.transfer_proof.send(*msg).await {
// Send back empty response so that the request/response protocol completes. Ok(responder) => responder,
if let Err(error) = self.swarm.transfer_proof.send_response(channel, ()) { Err(_) => {
error!("Failed to send Transfer Proof ack: {:?}", error); tracing::warn!("Failed to pass on transfer proof");
continue;
} }
};
self.pending_transfer_proof = OptionFuture::from(Some(async move {
let _ = responder.recv().await;
channel
}.boxed()));
}
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => {
if let Some(responder) = self.inflight_encrypted_signature_requests.remove(&id) {
let _ = responder.respond(());
} }
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged) => {
debug!("Alice acknowledged encrypted signature");
} }
SwarmEvent::Behaviour(OutEvent::ResponseSent) => { SwarmEvent::Behaviour(OutEvent::ResponseSent) => {
@ -139,114 +165,86 @@ impl EventLoop {
_ => {} _ => {}
} }
}, },
spot_price_request = self.request_spot_price.recv().fuse() => {
if let Some(request) = spot_price_request { // Handle to-be-sent requests for all our network protocols.
self.swarm.request_spot_price(self.alice_peer_id, request); // Use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected.
} Some((request, responder)) = self.spot_price_requests.next().fuse(), if self.is_connected_to_alice() => {
let id = self.swarm.spot_price.send_request(&self.alice_peer_id, request);
self.inflight_spot_price_requests.insert(id, responder);
}, },
quote_request = self.request_quote.recv().fuse() => { Some(((), responder)) = self.quote_requests.next().fuse(), if self.is_connected_to_alice() => {
if quote_request.is_some() { let id = self.swarm.quote.send_request(&self.alice_peer_id, ());
self.swarm.request_quote(self.alice_peer_id); self.inflight_quote_requests.insert(id, responder);
}
}, },
option = self.start_execution_setup.recv().fuse() => { Some((request, responder)) = self.execution_setup_requests.next().fuse(), if self.is_connected_to_alice() => {
if let Some(state0) = option { self.swarm.execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone());
let _ = self self.inflight_execution_setup = Some(responder);
.swarm
.start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone());
}
}, },
encrypted_signature = self.send_encrypted_signature.recv().fuse() => { Some((request, responder)) = self.encrypted_signature_requests.next().fuse(), if self.is_connected_to_alice() => {
if let Some(tx_redeem_encsig) = encrypted_signature { let id = self.swarm.encrypted_signature.send_request(&self.alice_peer_id, request);
self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig); self.inflight_encrypted_signature_requests.insert(id, responder);
},
Some(response_channel) = &mut self.pending_transfer_proof => {
let _ = self.swarm.transfer_proof.send_response(response_channel, ());
self.pending_transfer_proof = OptionFuture::from(None);
} }
} }
} }
} }
fn is_connected_to_alice(&self) -> bool {
Swarm::is_connected(&self.swarm, &self.alice_peer_id)
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
start_execution_setup: Sender<State0>, execution_setup: bmrng::RequestSender<State0, Result<State2>>,
done_execution_setup: Receiver<Result<State2>>, transfer_proof: bmrng::RequestReceiver<transfer_proof::Request, ()>,
recv_transfer_proof: Receiver<transfer_proof::Request>, encrypted_signature: bmrng::RequestSender<encrypted_signature::Request, ()>,
send_encrypted_signature: Sender<EncryptedSignature>, spot_price: bmrng::RequestSender<spot_price::Request, spot_price::Response>,
request_spot_price: Sender<spot_price::Request>, quote: bmrng::RequestSender<(), BidQuote>,
recv_spot_price: Receiver<spot_price::Response>,
request_quote: Sender<()>,
recv_quote: Receiver<BidQuote>,
} }
impl EventLoopHandle { impl EventLoopHandle {
pub async fn execution_setup(&mut self, state0: State0) -> Result<State2> { pub async fn execution_setup(&mut self, state0: State0) -> Result<State2> {
let _ = self.start_execution_setup.send(state0).await?; self.execution_setup.send_receive(state0).await?
self.done_execution_setup
.recv()
.await
.ok_or_else(|| anyhow!("Failed to setup execution with Alice"))?
} }
pub async fn recv_transfer_proof(&mut self) -> Result<transfer_proof::Request> { pub async fn recv_transfer_proof(&mut self) -> Result<transfer_proof::Request> {
self.recv_transfer_proof let (request, responder) = self
.transfer_proof
.recv() .recv()
.await .await
.ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) .context("Failed to receive transfer proof")?;
responder
.respond(())
.context("Failed to acknowledge receipt of transfer proof")?;
Ok(request)
} }
pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result<monero::Amount> { pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result<monero::Amount> {
let _ = self Ok(self
.request_spot_price .spot_price
.send(spot_price::Request { btc }) .send_receive(spot_price::Request { btc })
.await?; .await?
.xmr)
let response = self
.recv_spot_price
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive spot price from Alice"))?;
Ok(response.xmr)
} }
pub async fn request_quote(&mut self) -> Result<BidQuote> { pub async fn request_quote(&mut self) -> Result<BidQuote> {
let _ = self.request_quote.send(()).await?; Ok(self.quote.send_receive(()).await?)
let quote = self
.recv_quote
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive quote from Alice"))?;
Ok(quote)
} }
pub async fn send_encrypted_signature( pub async fn send_encrypted_signature(
&mut self, &mut self,
tx_redeem_encsig: EncryptedSignature, tx_redeem_encsig: EncryptedSignature,
) -> Result<()> { ) -> Result<()> {
self.send_encrypted_signature.send(tx_redeem_encsig).await?; Ok(self
.encrypted_signature
Ok(()) .send_receive(encrypted_signature::Request { tx_redeem_encsig })
} .await?)
}
#[derive(Debug)]
struct Channels<T> {
sender: Sender<T>,
receiver: Receiver<T>,
}
impl<T> Channels<T> {
fn new() -> Channels<T> {
let (sender, receiver) = tokio::sync::mpsc::channel(100);
Channels { sender, receiver }
}
}
impl<T> Default for Channels<T> {
fn default() -> Self {
Self::new()
} }
} }

View File

@ -5,6 +5,7 @@ use anyhow::{Context, Error, Result};
use libp2p::PeerId; use libp2p::PeerId;
use libp2p_async_await::BehaviourOutEvent; use libp2p_async_await::BehaviourOutEvent;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
@ -42,8 +43,8 @@ impl Behaviour {
state0: State0, state0: State0,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>, bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
) { ) {
self.inner self.inner.do_protocol_dialer(alice, move |mut substream| {
.do_protocol_dialer(alice, move |mut substream| async move { let protocol = async move {
tracing::debug!("Starting execution setup with {}", alice); tracing::debug!("Starting execution setup with {}", alice);
substream substream
@ -78,6 +79,9 @@ impl Behaviour {
.await?; .await?;
Ok(state2) Ok(state2)
};
async move { tokio::time::timeout(Duration::from_secs(10), protocol).await? }
}) })
} }
} }

View File

@ -8,7 +8,6 @@ use crate::{bitcoin, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use tokio::select; use tokio::select;
use tracing::{info, trace};
pub fn is_complete(state: &BobState) -> bool { pub fn is_complete(state: &BobState) -> bool {
matches!( matches!(
@ -59,7 +58,7 @@ async fn next_state(
env_config: &Config, env_config: &Config,
receive_monero_address: monero::Address, receive_monero_address: monero::Address,
) -> Result<BobState> { ) -> Result<BobState> {
trace!("Current state: {}", state); tracing::trace!("Current state: {}", state);
Ok(match state { Ok(match state {
BobState::Started { btc_amount } => { BobState::Started { btc_amount } => {
@ -159,8 +158,6 @@ async fn next_state(
BobState::XmrLocked(state) => { BobState::XmrLocked(state) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await; let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
info!("{:?}", tx_lock_status);
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? { if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
// Alice has locked Xmr // Alice has locked Xmr
// Bob sends Alice his key // Bob sends Alice his key