feat (Bob): Buffer transfer proof to database when we are running a different swap (#1669)

This commit is contained in:
binarybaron 2024-06-28 21:39:30 +02:00 committed by GitHub
parent 4c9d1e8d8d
commit 23a27680a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 193 additions and 51 deletions

View File

@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
- CLI: Buffer received transfer proofs for later processing if we're currently running a different swap
## [0.13.1] - 2024-06-10
- Add retry logic to monero-wallet-rpc wallet refresh

View File

@ -22,4 +22,10 @@ CREATE TABLE if NOT EXISTS peer_addresses
(
peer_id TEXT NOT NULL,
address TEXT NOT NULL
);
CREATE TABLE if NOT EXISTS buffered_transfer_proofs
(
swap_id TEXT PRIMARY KEY NOT NULL,
proof TEXT NOT NULL
);

View File

@ -195,5 +195,33 @@
}
},
"query": "\n SELECT state\n FROM swap_states\n WHERE swap_id = ?\n "
},
"e36c287aa98ae80ad4b6bb6f7e4b59cced041406a9db71da827b09f0d3bacfd6": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Right": 2
}
},
"query": "\n INSERT INTO buffered_transfer_proofs (\n swap_id,\n proof\n ) VALUES (?, ?);\n "
},
"e9d422daf774d099fcbde6c4cda35821da948bd86cc57798b4d8375baf0b51ae": {
"describe": {
"columns": [
{
"name": "proof",
"ordinal": 0,
"type_info": "Text"
}
],
"nullable": [
false
],
"parameters": {
"Right": 1
}
},
"query": "\n SELECT proof\n FROM buffered_transfer_proofs\n WHERE swap_id = ?\n "
}
}

View File

@ -376,7 +376,7 @@ impl Request {
},
result = async {
let (event_loop, mut event_loop_handle) =
EventLoop::new(swap_id, swarm, seller_peer_id)?;
EventLoop::new(swap_id, swarm, seller_peer_id, context.db.clone())?;
let event_loop = tokio::spawn(event_loop.run().in_current_span());
let bid_quote = event_loop_handle.request_quote().await?;
@ -522,7 +522,7 @@ impl Request {
}
let (event_loop, event_loop_handle) =
EventLoop::new(swap_id, swarm, seller_peer_id)?;
EventLoop::new(swap_id, swarm, seller_peer_id, context.db.clone())?;
let monero_receive_address = context.db.get_monero_address(swap_id).await?;
let swap = Swap::from_db(
Arc::clone(&context.db),

View File

@ -5,6 +5,7 @@ use crate::network::encrypted_signature;
use crate::network::quote::BidQuote;
use crate::network::swap_setup::bob::NewSwap;
use crate::protocol::bob::State2;
use crate::protocol::Database;
use anyhow::{Context, Result};
use futures::future::{BoxFuture, OptionFuture};
use futures::{FutureExt, StreamExt};
@ -13,6 +14,7 @@ use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
@ -21,6 +23,7 @@ pub struct EventLoop {
swap_id: Uuid,
swarm: libp2p::Swarm<Behaviour>,
alice_peer_id: PeerId,
db: Arc<dyn Database + Send + Sync>,
// these streams represents outgoing requests that we have to make
quote_requests: bmrng::RequestReceiverStream<(), BidQuote>,
@ -51,6 +54,7 @@ impl EventLoop {
swap_id: Uuid,
swarm: Swarm<Behaviour>,
alice_peer_id: PeerId,
db: Arc<dyn Database + Send + Sync>,
) -> Result<(Self, EventLoopHandle)> {
let execution_setup = bmrng::channel_with_timeout(1, Duration::from_secs(60));
let transfer_proof = bmrng::channel_with_timeout(1, Duration::from_secs(60));
@ -69,6 +73,7 @@ impl EventLoop {
inflight_swap_setup: None,
inflight_encrypted_signature_requests: HashMap::default(),
pending_transfer_proof: OptionFuture::from(None),
db,
};
let handle = EventLoopHandle {
@ -108,38 +113,63 @@ impl EventLoop {
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => {
let swap_id = msg.swap_id;
if peer != self.alice_peer_id {
tracing::warn!(
%swap_id,
"Ignoring malicious transfer proof from {}, expected to receive it from {}",
peer,
self.alice_peer_id);
continue;
}
if swap_id != self.swap_id {
// TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps
tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored", swap_id, self.swap_id);
// When receiving a transfer proof that is unexpected we still have to acknowledge that it was received
let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ());
continue;
}
let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await {
Ok(responder) => responder,
Err(e) => {
tracing::warn!("Failed to pass on transfer proof: {:#}", e);
continue;
if swap_id == self.swap_id {
if peer != self.alice_peer_id {
tracing::warn!(
%swap_id,
"Ignoring malicious transfer proof from {}, expected to receive it from {}",
peer,
self.alice_peer_id);
continue;
}
};
self.pending_transfer_proof = OptionFuture::from(Some(async move {
let _ = responder.recv().await;
let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await {
Ok(responder) => responder,
Err(e) => {
tracing::warn!("Failed to pass on transfer proof: {:#}", e);
continue;
}
};
channel
}.boxed()));
self.pending_transfer_proof = OptionFuture::from(Some(async move {
let _ = responder.recv().await;
channel
}.boxed()));
}else {
// Check if the transfer proof is sent from the correct peer and if we have a record of the swap
match self.db.get_peer_id(swap_id).await {
// We have a record of the swap
Ok(buffer_swap_alice_peer_id) => {
if buffer_swap_alice_peer_id == self.alice_peer_id {
// Save transfer proof in the database such that we can process it later when we resume the swap
match self.db.insert_buffered_transfer_proof(swap_id, msg.tx_lock_proof).await {
Ok(_) => {
tracing::info!("Received transfer proof for swap {} while running swap {}. Buffering this transfer proof in the database for later retrieval", swap_id, self.swap_id);
let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ());
}
Err(e) => {
tracing::error!("Failed to buffer transfer proof for swap {}: {:#}", swap_id, e);
}
};
}else {
tracing::warn!(
%swap_id,
"Ignoring malicious transfer proof from {}, expected to receive it from {}",
self.swap_id,
buffer_swap_alice_peer_id);
}
},
// We do not have a record of the swap or an error occurred while retrieving the peer id of Alice
Err(e) => {
if let Some(sqlx::Error::RowNotFound) = e.downcast_ref::<sqlx::Error>() {
tracing::warn!("Ignoring transfer proof for swap {} while running swap {}. We do not have a record of this swap", swap_id, self.swap_id);
} else {
tracing::error!("Ignoring transfer proof for swap {} while running swap {}. Failed to retrieve the peer id of Alice for the corresponding swap: {:#}", swap_id, self.swap_id, e);
}
}
}
}
}
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => {
if let Some(responder) = self.inflight_encrypted_signature_requests.remove(&id) {

View File

@ -1,5 +1,5 @@
use crate::database::Swap;
use crate::monero::Address;
use crate::monero::{Address, TransferProof};
use crate::protocol::{Database, State};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
@ -303,6 +303,56 @@ impl Database for SqliteDatabase {
result
}
async fn insert_buffered_transfer_proof(
&self,
swap_id: Uuid,
proof: TransferProof,
) -> Result<()> {
let mut conn = self.pool.acquire().await?;
let swap_id = swap_id.to_string();
let proof = serde_json::to_string(&proof)?;
sqlx::query!(
r#"
INSERT INTO buffered_transfer_proofs (
swap_id,
proof
) VALUES (?, ?);
"#,
swap_id,
proof
)
.execute(&mut conn)
.await?;
Ok(())
}
async fn get_buffered_transfer_proof(&self, swap_id: Uuid) -> Result<Option<TransferProof>> {
let mut conn = self.pool.acquire().await?;
let swap_id = swap_id.to_string();
let row = sqlx::query!(
r#"
SELECT proof
FROM buffered_transfer_proofs
WHERE swap_id = ?
"#,
swap_id
)
.fetch_all(&mut conn)
.await?;
if row.is_empty() {
return Ok(None);
}
let proof_str = &row[0].proof;
let proof = serde_json::from_str(proof_str)?;
Ok(Some(proof))
}
async fn raw_all(&self) -> Result<HashMap<Uuid, Vec<serde_json::Value>>> {
let mut conn = self.pool.acquire().await?;
let rows = sqlx::query!(

View File

@ -1,6 +1,6 @@
use async_trait::async_trait;
use futures::stream::FusedStream;
use futures::{future, Future, Stream, StreamExt};
use futures::{future, Future, StreamExt};
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::upgrade::Version;
use libp2p::core::transport::MemoryTransport;
@ -75,8 +75,8 @@ async fn get_local_tcp_address() -> Multiaddr {
}
pub async fn await_events_or_timeout<A, B, E1, E2>(
swarm_1: &mut (impl Stream<Item = SwarmEvent<A, E1>> + FusedStream + Unpin),
swarm_2: &mut (impl Stream<Item = SwarmEvent<B, E2>> + FusedStream + Unpin),
swarm_1: &mut (impl FusedStream<Item = SwarmEvent<A, E1>> + FusedStream + Unpin),
swarm_2: &mut (impl FusedStream<Item = SwarmEvent<B, E2>> + FusedStream + Unpin),
) -> (SwarmEvent<A, E1>, SwarmEvent<B, E2>)
where
SwarmEvent<A, E1>: Debug,

View File

@ -146,4 +146,13 @@ pub trait Database {
async fn get_states(&self, swap_id: Uuid) -> Result<Vec<State>>;
async fn all(&self) -> Result<Vec<(Uuid, State)>>;
async fn raw_all(&self) -> Result<HashMap<Uuid, Vec<serde_json::Value>>>;
async fn insert_buffered_transfer_proof(
&self,
swap_id: Uuid,
proof: monero::TransferProof,
) -> Result<()>;
async fn get_buffered_transfer_proof(
&self,
swap_id: Uuid,
) -> Result<Option<monero::TransferProof>>;
}

View File

@ -1,10 +1,11 @@
use crate::bitcoin::{ExpiredTimelocks, TxCancel, TxRefund};
use crate::cli::EventLoopHandle;
use crate::network::swap_setup::bob::NewSwap;
use crate::protocol::bob;
use crate::protocol::bob::state::*;
use crate::protocol::{bob, Database};
use crate::{bitcoin, monero};
use anyhow::{bail, Context, Result};
use std::sync::Arc;
use tokio::select;
use uuid::Uuid;
@ -34,6 +35,7 @@ pub async fn run_until(
swap.id,
current_state.clone(),
&mut swap.event_loop_handle,
swap.db.clone(),
swap.bitcoin_wallet.as_ref(),
swap.monero_wallet.as_ref(),
swap.monero_receive_address,
@ -52,6 +54,7 @@ async fn next_state(
swap_id: Uuid,
state: BobState,
event_loop_handle: &mut EventLoopHandle,
db: Arc<dyn Database + Send + Sync>,
bitcoin_wallet: &bitcoin::Wallet,
monero_wallet: &monero::Wallet,
monero_receive_address: monero::Address,
@ -118,12 +121,28 @@ async fn next_state(
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
if let ExpiredTimelocks::None { .. } = state3.expired_timelock(bitcoin_wallet).await? {
tracing::info!("Waiting for Alice to lock Monero");
let buffered_transfer_proof = db
.get_buffered_transfer_proof(swap_id)
.await
.context("Failed to get buffered transfer proof")?;
if let Some(transfer_proof) = buffered_transfer_proof {
tracing::debug!(txid = %transfer_proof.tx_hash(), "Found buffered transfer proof");
tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero");
return Ok(BobState::XmrLockProofReceived {
state: state3,
lock_transfer_proof: transfer_proof,
monero_wallet_restore_blockheight,
});
}
let transfer_proof_watcher = event_loop_handle.recv_transfer_proof();
let cancel_timelock_expires =
tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock);
tracing::info!("Waiting for Alice to lock Monero");
select! {
transfer_proof = transfer_proof_watcher => {
let transfer_proof = transfer_proof?;

View File

@ -32,8 +32,7 @@ async fn concurrent_bobs_before_xmr_lock_proof_sent() {
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2, FixedRate::default()));
// The 2nd swap ALWAYS finish successfully in this
// scenario, but will receive an "unwanted" transfer proof that is ignored in
// the event loop.
// scenario, but will receive an "unwanted" transfer proof that is buffered until the 1st swap is resumed
let bob_state_2 = bob_swap_2.await??;
assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. }));
@ -46,15 +45,13 @@ async fn concurrent_bobs_before_xmr_lock_proof_sent() {
.await;
assert!(matches!(bob_state_1, BobState::BtcLocked { .. }));
// The 1st (paused) swap is expected to refund, because the transfer
// proof is delivered to the wrong swap, and we currently don't store it in the
// database for the other swap.
// The 1st (paused) swap is expected to finish successfully because the transfer proof is buffered when it is receives while another swap is running.
let bob_state_1 = bob::run(bob_swap_1).await?;
assert!(matches!(bob_state_1, BobState::BtcRefunded { .. }));
assert!(matches!(bob_state_1, BobState::XmrRedeemed { .. }));
let alice_state_1 = alice_swap_1.await??;
assert!(matches!(alice_state_1, AliceState::XmrRefunded { .. }));
assert!(matches!(alice_state_1, AliceState::BtcRedeemed { .. }));
Ok(())
})

View File

@ -427,8 +427,6 @@ impl BobParams {
}
pub async fn new_swap_from_db(&self, swap_id: Uuid) -> Result<(bob::Swap, cli::EventLoop)> {
let (event_loop, handle) = self.new_eventloop(swap_id).await?;
if let Some(parent_dir) = self.db_path.parent() {
ensure_directory_exists(parent_dir)?;
}
@ -437,8 +435,10 @@ impl BobParams {
}
let db = Arc::new(SqliteDatabase::open(&self.db_path).await?);
let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?;
let swap = bob::Swap::from_db(
db,
db.clone(),
swap_id,
self.bitcoin_wallet.clone(),
self.monero_wallet.clone(),
@ -457,8 +457,6 @@ impl BobParams {
) -> Result<(bob::Swap, cli::EventLoop)> {
let swap_id = Uuid::new_v4();
let (event_loop, handle) = self.new_eventloop(swap_id).await?;
if let Some(parent_dir) = self.db_path.parent() {
ensure_directory_exists(parent_dir)?;
}
@ -467,6 +465,8 @@ impl BobParams {
}
let db = Arc::new(SqliteDatabase::open(&self.db_path).await?);
let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?;
db.insert_peer_id(swap_id, self.alice_peer_id).await?;
let swap = bob::Swap::new(
@ -487,6 +487,7 @@ impl BobParams {
pub async fn new_eventloop(
&self,
swap_id: Uuid,
db: Arc<dyn Database + Send + Sync>,
) -> Result<(cli::EventLoop, cli::EventLoopHandle)> {
let tor_socks5_port = get_port()
.expect("We don't care about Tor in the tests so we get a free port to disable it.");
@ -503,7 +504,7 @@ impl BobParams {
.behaviour_mut()
.add_address(self.alice_peer_id, self.alice_address.clone());
cli::EventLoop::new(swap_id, swarm, self.alice_peer_id)
cli::EventLoop::new(swap_id, swarm, self.alice_peer_id, db.clone())
}
}