diff --git a/swap/src/protocol/alice/swap_setup.rs b/swap/src/protocol/alice/swap_setup.rs index 266c15c2..5ee2db5a 100644 --- a/swap/src/protocol/alice/swap_setup.rs +++ b/swap/src/protocol/alice/swap_setup.rs @@ -19,6 +19,7 @@ use std::collections::VecDeque; use std::fmt::Debug; use std::future; use std::task::{Context, Poll}; +use std::time::Duration; use uuid::Uuid; use void::Void; @@ -34,8 +35,8 @@ pub enum OutEvent { }, Error { peer_id: PeerId, - error: Error - } + error: Error, + }, } #[derive(Debug)] @@ -96,7 +97,7 @@ impl From for alice::OutEvent { swap_id, state3: Box::new(state3), }, - OutEvent::Error { peer_id, error} => alice::OutEvent::Failure { + OutEvent::Error { peer_id, error } => alice::OutEvent::Failure { peer: peer_id, error: anyhow!(error), }, @@ -174,21 +175,20 @@ where fn inject_event(&mut self, peer_id: PeerId, connection: ConnectionId, event: HandlerOutEvent) { match event { HandlerOutEvent::Initiated(send_wallet_snapshot) => { - self.events.push_back(OutEvent::Initiated { send_wallet_snapshot }) + self.events.push_back(OutEvent::Initiated { + send_wallet_snapshot, + }) } HandlerOutEvent::Completed(Ok((swap_id, state3))) => { self.events.push_back(OutEvent::Completed { peer_id, swap_id, - state3 + state3, }) - }, + } HandlerOutEvent::Completed(Err(error)) => { - self.events.push_back(OutEvent::Error { - peer_id, - error - }) - }, + self.events.push_back(OutEvent::Error { peer_id, error }) + } } } @@ -267,6 +267,8 @@ pub struct Handler { latest_rate: LR, resume_only: bool, + + timeout: Duration, } impl Handler { @@ -285,6 +287,7 @@ impl Handler { env_config, latest_rate, resume_only, + timeout: Duration::from_secs(60), } } } @@ -329,120 +332,136 @@ where let latest_rate = self.latest_rate.latest_rate(); let env_config = self.env_config; - // TODO: Put a timeout on the whole future + let protocol = tokio::time::timeout(self.timeout, async move { + let request = read_cbor_message::(&mut substream) + .await + .map_err(|e| Error::Io(e))?; + let wallet_snapshot = sender + .send_receive(request.btc) + .await + .map_err(|e| Error::WalletSnapshotFailed(anyhow!(e)))?; + + // wrap all of these into another future so we can `return` from all the + // different blocks + let validate = async { + if resume_only { + return Err(Error::ResumeOnlyMode); + }; + + let blockchain_network = BlockchainNetwork { + bitcoin: env_config.bitcoin_network, + monero: env_config.monero_network, + }; + + if request.blockchain_network != blockchain_network { + return Err(Error::BlockchainNetworkMismatch { + cli: request.blockchain_network, + asb: blockchain_network, + }); + } + + let btc = request.btc; + + if btc < min_buy { + return Err(Error::AmountBelowMinimum { + min: min_buy, + buy: btc, + }); + } + + if btc > max_buy { + return Err(Error::AmountAboveMaximum { + max: max_buy, + buy: btc, + }); + } + + let rate = latest_rate.map_err(|e| Error::LatestRateFetchFailed(Box::new(e)))?; + let xmr = rate + .sell_quote(btc) + .map_err(|e| Error::SellQuoteCalculationFailed(e))?; + + if wallet_snapshot.balance < xmr + wallet_snapshot.lock_fee { + return Err(Error::BalanceTooLow { + balance: wallet_snapshot.balance, + buy: btc, + }); + } + + Ok(xmr) + }; + + let xmr = match validate.await { + Ok(xmr) => { + write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr)) + .await + .map_err(|e| Error::Io(e))?; + + xmr + } + Err(e) => { + write_cbor_message( + &mut substream, + SpotPriceResponse::Error(e.to_error_response()), + ) + .await + .map_err(|e| Error::Io(e))?; + return Err(e); + } + }; + + let state0 = State0::new( + request.btc, + xmr, + env_config, + wallet_snapshot.redeem_address, + wallet_snapshot.punish_address, + wallet_snapshot.redeem_fee, + wallet_snapshot.punish_fee, + &mut rand::thread_rng(), + ); + + let message0 = read_cbor_message::(&mut substream) + .await + .context("Failed to deserialize message0") + .map_err(|e| Error::Io(e))?; + let (swap_id, state1) = state0.receive(message0).map_err(|e| Error::Io(e))?; + + write_cbor_message(&mut substream, state1.next_message()) + .await + .map_err(|e| Error::Io(e))?; + + let message2 = read_cbor_message::(&mut substream) + .await + .context("Failed to deserialize message2") + .map_err(|e| Error::Io(e))?; + let state2 = state1 + .receive(message2) + .context("Failed to receive Message2") + .map_err(|e| Error::Io(e))?; + + write_cbor_message(&mut substream, state2.next_message()) + .await + .map_err(|e| Error::Io(e))?; + + let message4 = read_cbor_message::(&mut substream) + .await + .context("Failed to deserialize message4") + .map_err(|e| Error::Io(e))?; + let state3 = state2 + .receive(message4) + .context("Failed to receive Message4") + .map_err(|e| Error::Io(e))?; + + Ok((swap_id, state3)) + }); + + let max_seconds = self.timeout.as_secs(); self.inbound_stream = OptionFuture::from(Some( async move { - let request = read_cbor_message::(&mut substream).await.map_err(|e| Error::Io(e))?; - let wallet_snapshot = sender.send_receive(request.btc).await.map_err(|e| Error::WalletSnapshotFailed(anyhow!(e)))?; - - // wrap all of these into another future so we can `return` from all the - // different blocks - let validate = async { - if resume_only { - return Err(Error::ResumeOnlyMode); - }; - - let blockchain_network = BlockchainNetwork { - bitcoin: env_config.bitcoin_network, - monero: env_config.monero_network, - }; - - if request.blockchain_network != blockchain_network { - return Err(Error::BlockchainNetworkMismatch { - cli: request.blockchain_network, - asb: blockchain_network, - }); - } - - let btc = request.btc; - - if btc < min_buy { - return Err(Error::AmountBelowMinimum { - min: min_buy, - buy: btc, - }); - } - - if btc > max_buy { - return Err(Error::AmountAboveMaximum { - max: max_buy, - buy: btc, - }); - } - - let rate = - latest_rate.map_err(|e| Error::LatestRateFetchFailed(Box::new(e)))?; - let xmr = rate - .sell_quote(btc) - .map_err(|e| Error::SellQuoteCalculationFailed(e))?; - - if wallet_snapshot.balance < xmr + wallet_snapshot.lock_fee { - return Err(Error::BalanceTooLow { - balance: wallet_snapshot.balance, - buy: btc, - }); - } - - Ok(xmr) - }; - - let xmr = match validate.await { - Ok(xmr) => { - write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr)).await.map_err(|e| Error::Io(e))?; - - xmr - } - Err(e) => { - write_cbor_message( - &mut substream, - SpotPriceResponse::Error(e.to_error_response()), - ) - .await - .map_err(|e| Error::Io(e))?; - return Err(e); - } - }; - - let state0 = State0::new( - request.btc, - xmr, - env_config, - wallet_snapshot.redeem_address, - wallet_snapshot.punish_address, - wallet_snapshot.redeem_fee, - wallet_snapshot.punish_fee, - &mut rand::thread_rng(), - ); - - let message0 = read_cbor_message::(&mut substream) - .await - .context("Failed to deserialize message0") - .map_err(|e| Error::Io(e))?; - let (swap_id, state1) = state0.receive(message0).map_err(|e| Error::Io(e))?; - - write_cbor_message(&mut substream, state1.next_message()).await.map_err(|e| Error::Io(e))?; - - let message2 = read_cbor_message::(&mut substream) - .await - .context("Failed to deserialize message2") - .map_err(|e| Error::Io(e))?; - let state2 = state1 - .receive(message2) - .context("Failed to receive Message2") - .map_err(|e| Error::Io(e))?; - - write_cbor_message(&mut substream, state2.next_message()).await.map_err(|e| Error::Io(e))?; - - let message4 = read_cbor_message::(&mut substream) - .await - .context("Failed to deserialize message4") - .map_err(|e| Error::Io(e))?; - let state3 = state2 - .receive(message4) - .context("Failed to receive Message4") - .map_err(|e| Error::Io(e))?; - - Ok((swap_id, state3)) + protocol.await.map_err(|_| Error::Timeout { + seconds: max_seconds, + })? } .boxed(), )); @@ -540,6 +559,8 @@ mod protocol { >; } +// TODO: Differentiate between errors that we send back and shit that happens on +// our side (IO, timeout) #[derive(Debug, thiserror::Error)] pub enum Error { #[error("ASB is running in resume-only mode")] @@ -571,7 +592,9 @@ pub enum Error { #[error("Io Error: {0}")] Io(anyhow::Error), #[error("Failed to request wallet snapshot: {0}")] - WalletSnapshotFailed(anyhow::Error) + WalletSnapshotFailed(anyhow::Error), + #[error("Failed to complete execution setup within {seconds}s")] + Timeout { seconds: u64 }, } impl Error { @@ -596,9 +619,8 @@ impl Error { Error::LatestRateFetchFailed(_) | Error::SellQuoteCalculationFailed(_) | Error::WalletSnapshotFailed(_) - | Error::Io(_) => { - SpotPriceError::Other - } + | Error::Timeout { .. } + | Error::Io(_) => SpotPriceError::Other, } } }