Move files from protocol to appropriate module

Some network and application specific code does not belong in the protocol module and was moved.
Eventloop, recovery and the outside behaviour were moved to the respective application module because they are application specific.

The `swap_setup` was moved into the network module because upon change both sides will have to be changed and should thus stay close together.
This commit is contained in:
Daniel Karzel 2021-06-25 13:40:33 +10:00
parent 818147a629
commit c0070f8fa7
No known key found for this signature in database
GPG key ID: 30C3FC2E438ADB6E
46 changed files with 161 additions and 145 deletions

102
swap/src/cli/behaviour.rs Normal file
View file

@ -0,0 +1,102 @@
use crate::network::quote::BidQuote;
use crate::network::swap_setup::bob;
use crate::network::{encrypted_signature, quote, redial, transfer_proof};
use crate::protocol::bob::State2;
use crate::{bitcoin, env};
use anyhow::{anyhow, Error, Result};
use libp2p::core::Multiaddr;
use libp2p::ping::{Ping, PingEvent};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::{NetworkBehaviour, PeerId};
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug)]
pub enum OutEvent {
QuoteReceived {
id: RequestId,
response: BidQuote,
},
SwapSetupCompleted(Box<Result<State2>>),
TransferProofReceived {
msg: Box<transfer_proof::Request>,
channel: ResponseChannel<()>,
peer: PeerId,
},
EncryptedSignatureAcknowledged {
id: RequestId,
},
AllRedialAttemptsExhausted {
peer: PeerId,
},
Failure {
peer: PeerId,
error: Error,
},
/// "Fallback" variant that allows the event mapping code to swallow certain
/// events that we don't want the caller to deal with.
Other,
}
impl OutEvent {
pub fn unexpected_request(peer: PeerId) -> OutEvent {
OutEvent::Failure {
peer,
error: anyhow!("Unexpected request received"),
}
}
pub fn unexpected_response(peer: PeerId) -> OutEvent {
OutEvent::Failure {
peer,
error: anyhow!("Unexpected response received"),
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Behaviour {
pub quote: quote::Behaviour,
pub swap_setup: bob::Behaviour,
pub transfer_proof: transfer_proof::Behaviour,
pub encrypted_signature: encrypted_signature::Behaviour,
pub redial: redial::Behaviour,
/// Ping behaviour that ensures that the underlying network connection is
/// still alive. If the ping fails a connection close event will be
/// emitted that is picked up as swarm event.
ping: Ping,
}
impl Behaviour {
pub fn new(
alice: PeerId,
env_config: env::Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
) -> Self {
Self {
quote: quote::bob(),
swap_setup: bob::Behaviour::new(env_config, bitcoin_wallet),
transfer_proof: transfer_proof::bob(),
encrypted_signature: encrypted_signature::bob(),
redial: redial::Behaviour::new(alice, Duration::from_secs(2)),
ping: Ping::default(),
}
}
/// Add a known address for the given peer
pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.quote.add_address(&peer_id, address.clone());
self.transfer_proof.add_address(&peer_id, address.clone());
self.encrypted_signature.add_address(&peer_id, address);
}
}
impl From<PingEvent> for OutEvent {
fn from(_: PingEvent) -> Self {
OutEvent::Other
}
}

65
swap/src/cli/cancel.rs Normal file
View file

@ -0,0 +1,65 @@
use crate::bitcoin::{ExpiredTimelocks, Txid, Wallet};
use crate::database::{Database, Swap};
use crate::protocol::bob::BobState;
use anyhow::{bail, Result};
use std::sync::Arc;
use uuid::Uuid;
#[derive(Debug, thiserror::Error, Clone, Copy)]
pub enum Error {
#[error("The cancel timelock has not expired yet.")]
CancelTimelockNotExpiredYet,
}
pub async fn cancel(
swap_id: Uuid,
bitcoin_wallet: Arc<Wallet>,
db: Database,
force: bool,
) -> Result<Result<(Txid, BobState), Error>> {
let state = db.get_state(swap_id)?.try_into_bob()?.into();
let state6 = match state {
BobState::BtcLocked(state3) => state3.cancel(),
BobState::XmrLockProofReceived { state, .. } => state.cancel(),
BobState::XmrLocked(state4) => state4.cancel(),
BobState::EncSigSent(state4) => state4.cancel(),
BobState::CancelTimelockExpired(state6) => state6,
BobState::Started { .. }
| BobState::SwapSetupCompleted(_)
| BobState::BtcRedeemed(_)
| BobState::BtcCancelled(_)
| BobState::BtcRefunded(_)
| BobState::XmrRedeemed { .. }
| BobState::BtcPunished { .. }
| BobState::SafelyAborted => bail!(
"Cannot cancel swap {} because it is in state {} which is not refundable.",
swap_id,
state
),
};
tracing::info!(%swap_id, "Manually cancelling swap");
if !force {
tracing::debug!(%swap_id, "Checking if cancel timelock is expired");
if let ExpiredTimelocks::None = state6.expired_timelock(bitcoin_wallet.as_ref()).await? {
return Ok(Err(Error::CancelTimelockNotExpiredYet));
}
}
let txid = if let Ok(tx) = state6.check_for_tx_cancel(bitcoin_wallet.as_ref()).await {
tracing::debug!(%swap_id, "Cancel transaction has already been published");
tx.txid()
} else {
state6.submit_tx_cancel(bitcoin_wallet.as_ref()).await?
};
let state = BobState::BtcCancelled(state6);
let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?;
Ok(Ok((txid, state)))
}

257
swap/src/cli/event_loop.rs Normal file
View file

@ -0,0 +1,257 @@
use crate::bitcoin::EncryptedSignature;
use crate::cli::behaviour::{Behaviour, OutEvent};
use crate::network::encrypted_signature;
use crate::network::quote::BidQuote;
use crate::network::swap_setup::bob::NewSwap;
use crate::protocol::bob::State2;
use crate::{env, monero};
use anyhow::{Context, Result};
use futures::future::{BoxFuture, OptionFuture};
use futures::{FutureExt, StreamExt};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm};
use std::collections::HashMap;
use std::time::Duration;
use uuid::Uuid;
#[allow(missing_debug_implementations)]
pub struct EventLoop {
swap_id: Uuid,
swarm: libp2p::Swarm<Behaviour>,
alice_peer_id: PeerId,
// these streams represents outgoing requests that we have to make
quote_requests: bmrng::RequestReceiverStream<(), BidQuote>,
encrypted_signatures: bmrng::RequestReceiverStream<EncryptedSignature, ()>,
swap_setup_requests: bmrng::RequestReceiverStream<NewSwap, Result<State2>>,
// these represents requests that are currently in-flight.
// once we get a response to a matching [`RequestId`], we will use the responder to relay the
// response.
inflight_quote_requests: HashMap<RequestId, bmrng::Responder<BidQuote>>,
inflight_encrypted_signature_requests: HashMap<RequestId, bmrng::Responder<()>>,
inflight_swap_setup: Option<bmrng::Responder<Result<State2>>>,
/// The sender we will use to relay incoming transfer proofs.
transfer_proof: bmrng::RequestSender<monero::TransferProof, ()>,
/// The future representing the successful handling of an incoming transfer
/// proof.
///
/// Once we've sent a transfer proof to the ongoing swap, this future waits
/// until the swap took it "out" of the `EventLoopHandle`. As this future
/// resolves, we use the `ResponseChannel` returned from it to send an ACK
/// to Alice that we have successfully processed the transfer proof.
pending_transfer_proof: OptionFuture<BoxFuture<'static, ResponseChannel<()>>>,
}
impl EventLoop {
pub fn new(
swap_id: Uuid,
swarm: Swarm<Behaviour>,
alice_peer_id: PeerId,
env_config: env::Config,
) -> Result<(Self, EventLoopHandle)> {
let execution_setup = bmrng::channel_with_timeout(1, Duration::from_secs(60));
let transfer_proof = bmrng::channel_with_timeout(1, Duration::from_secs(60));
let encrypted_signature = bmrng::channel_with_timeout(1, Duration::from_secs(60));
let quote = bmrng::channel_with_timeout(1, Duration::from_secs(60));
let event_loop = EventLoop {
swap_id,
swarm,
alice_peer_id,
swap_setup_requests: execution_setup.1.into(),
transfer_proof: transfer_proof.0,
encrypted_signatures: encrypted_signature.1.into(),
quote_requests: quote.1.into(),
inflight_quote_requests: HashMap::default(),
inflight_swap_setup: None,
inflight_encrypted_signature_requests: HashMap::default(),
pending_transfer_proof: OptionFuture::from(None),
};
let handle = EventLoopHandle {
swap_setup: execution_setup.0,
transfer_proof: transfer_proof.1,
encrypted_signature: encrypted_signature.0,
quote: quote.0,
env_config,
};
Ok((event_loop, handle))
}
pub async fn run(mut self) {
match self.swarm.dial(&self.alice_peer_id) {
Ok(()) => {}
Err(e) => {
tracing::error!("Failed to initiate dial to Alice: {}", e);
return;
}
}
loop {
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
tokio::select! {
swarm_event = self.swarm.next_event().fuse() => {
match swarm_event {
SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => {
if let Some(responder) = self.inflight_quote_requests.remove(&id) {
let _ = responder.respond(response);
}
}
SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted(response)) => {
if let Some(responder) = self.inflight_swap_setup.take() {
let _ = responder.respond(*response);
}
}
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => {
let swap_id = msg.swap_id;
if peer != self.alice_peer_id {
tracing::warn!(
%swap_id,
"Ignoring malicious transfer proof from {}, expected to receive it from {}",
peer,
self.alice_peer_id);
continue;
}
if swap_id != self.swap_id {
// TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps
tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored", swap_id, self.swap_id);
// When receiving a transfer proof that is unexpected we still have to acknowledge that it was received
let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ());
continue;
}
let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await {
Ok(responder) => responder,
Err(e) => {
tracing::warn!("Failed to pass on transfer proof: {:#}", e);
continue;
}
};
self.pending_transfer_proof = OptionFuture::from(Some(async move {
let _ = responder.recv().await;
channel
}.boxed()));
}
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => {
if let Some(responder) = self.inflight_encrypted_signature_requests.remove(&id) {
let _ = responder.respond(());
}
}
SwarmEvent::Behaviour(OutEvent::AllRedialAttemptsExhausted { peer }) if peer == self.alice_peer_id => {
tracing::error!("Exhausted all re-dial attempts to Alice");
return;
}
SwarmEvent::Behaviour(OutEvent::Failure { peer, error }) => {
tracing::warn!(%peer, "Communication error: {:#}", error);
return;
}
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => {
tracing::info!("Connected to Alice at {}", endpoint.get_remote_address());
}
SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => {
tracing::debug!("Dialling Alice at {}", peer_id);
}
SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause: Some(error) } if peer_id == self.alice_peer_id && num_established == 0 => {
tracing::warn!("Lost connection to Alice at {}, cause: {}", endpoint.get_remote_address(), error);
}
SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. } if peer_id == self.alice_peer_id && num_established == 0 => {
// no error means the disconnection was requested
tracing::info!("Successfully closed connection to Alice");
return;
}
SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => {
tracing::warn!(%address, "Failed to dial Alice: {}", error);
if let Some(duration) = self.swarm.behaviour_mut().redial.until_next_redial() {
tracing::info!("Next redial attempt in {}s", duration.as_secs());
}
}
_ => {}
}
},
// Handle to-be-sent requests for all our network protocols.
// Use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected.
Some(((), responder)) = self.quote_requests.next().fuse(), if self.is_connected_to_alice() => {
let id = self.swarm.behaviour_mut().quote.send_request(&self.alice_peer_id, ());
self.inflight_quote_requests.insert(id, responder);
},
Some((swap, responder)) = self.swap_setup_requests.next().fuse(), if self.is_connected_to_alice() => {
self.swarm.behaviour_mut().swap_setup.start(self.alice_peer_id, swap).await;
self.inflight_swap_setup = Some(responder);
},
Some((tx_redeem_encsig, responder)) = self.encrypted_signatures.next().fuse(), if self.is_connected_to_alice() => {
let request = encrypted_signature::Request {
swap_id: self.swap_id,
tx_redeem_encsig
};
let id = self.swarm.behaviour_mut().encrypted_signature.send_request(&self.alice_peer_id, request);
self.inflight_encrypted_signature_requests.insert(id, responder);
},
Some(response_channel) = &mut self.pending_transfer_proof => {
let _ = self.swarm.behaviour_mut().transfer_proof.send_response(response_channel, ());
self.pending_transfer_proof = OptionFuture::from(None);
}
}
}
}
fn is_connected_to_alice(&self) -> bool {
self.swarm.is_connected(&self.alice_peer_id)
}
}
#[derive(Debug)]
pub struct EventLoopHandle {
swap_setup: bmrng::RequestSender<NewSwap, Result<State2>>,
transfer_proof: bmrng::RequestReceiver<monero::TransferProof, ()>,
encrypted_signature: bmrng::RequestSender<EncryptedSignature, ()>,
quote: bmrng::RequestSender<(), BidQuote>,
env_config: env::Config,
}
impl EventLoopHandle {
pub async fn setup_swap(&mut self, swap: NewSwap) -> Result<State2> {
self.swap_setup.send_receive(swap).await?
}
pub async fn recv_transfer_proof(&mut self) -> Result<monero::TransferProof> {
let (transfer_proof, responder) = self
.transfer_proof
.recv()
.await
.context("Failed to receive transfer proof")?;
responder
.respond(())
.context("Failed to acknowledge receipt of transfer proof")?;
Ok(transfer_proof)
}
pub async fn request_quote(&mut self) -> Result<BidQuote> {
Ok(self.quote.send_receive(()).await?)
}
pub async fn send_encrypted_signature(
&mut self,
tx_redeem_encsig: EncryptedSignature,
) -> Result<()> {
Ok(self
.encrypted_signature
.send_receive(tx_redeem_encsig)
.await?)
}
}

57
swap/src/cli/refund.rs Normal file
View file

@ -0,0 +1,57 @@
use crate::bitcoin::Wallet;
use crate::database::{Database, Swap};
use crate::protocol::bob::BobState;
use anyhow::{bail, Result};
use std::sync::Arc;
use uuid::Uuid;
#[derive(thiserror::Error, Debug, Clone, Copy)]
#[error("Cannot refund because swap {0} was not cancelled yet. Make sure to cancel the swap before trying to refund.")]
pub struct SwapNotCancelledYet(pub Uuid);
pub async fn refund(
swap_id: Uuid,
bitcoin_wallet: Arc<Wallet>,
db: Database,
force: bool,
) -> Result<Result<BobState, SwapNotCancelledYet>> {
let state = db.get_state(swap_id)?.try_into_bob()?.into();
let state6 = if force {
match state {
BobState::BtcLocked(state3) => state3.cancel(),
BobState::XmrLockProofReceived { state, .. } => state.cancel(),
BobState::XmrLocked(state4) => state4.cancel(),
BobState::EncSigSent(state4) => state4.cancel(),
BobState::CancelTimelockExpired(state6) => state6,
BobState::BtcCancelled(state6) => state6,
BobState::Started { .. }
| BobState::SwapSetupCompleted(_)
| BobState::BtcRedeemed(_)
| BobState::BtcRefunded(_)
| BobState::XmrRedeemed { .. }
| BobState::BtcPunished { .. }
| BobState::SafelyAborted => bail!(
"Cannot refund swap {} because it is in state {} which is not refundable.",
swap_id,
state
),
}
} else {
match state {
BobState::BtcCancelled(state6) => state6,
_ => {
return Ok(Err(SwapNotCancelledYet(swap_id)));
}
}
};
state6.publish_refund_btc(bitcoin_wallet.as_ref()).await?;
let state = BobState::BtcRefunded(state6);
let db_state = state.clone().into();
db.insert_latest_state(swap_id, Swap::Bob(db_state)).await?;
Ok(Ok(state))
}