From 2258e26451ea0827ad45b2998acfe6dedd9b5541 Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Tue, 4 Jun 2024 10:37:23 +0200 Subject: [PATCH] feat (Bob): Buffer transfer proof to database when we are running a different swap --- .../20210903050345_create_swaps_table.sql | 6 ++ swap/sqlx-data.json | 28 ++++++++++ swap/src/api/request.rs | 4 +- swap/src/cli/event_loop.rs | 20 +++++-- swap/src/database/sqlite.rs | 52 +++++++++++++++++- swap/src/protocol.rs | 9 +++ swap/src/protocol/bob/swap.rs | 25 ++++++++- swap/tests/harness/mod.rs | 11 ++-- tempdb | Bin 0 -> 4096 bytes 9 files changed, 140 insertions(+), 15 deletions(-) create mode 100644 tempdb diff --git a/swap/migrations/20210903050345_create_swaps_table.sql b/swap/migrations/20210903050345_create_swaps_table.sql index 741a45e6..84a97d0f 100644 --- a/swap/migrations/20210903050345_create_swaps_table.sql +++ b/swap/migrations/20210903050345_create_swaps_table.sql @@ -22,4 +22,10 @@ CREATE TABLE if NOT EXISTS peer_addresses ( peer_id TEXT NOT NULL, address TEXT NOT NULL +); + +CREATE TABLE if NOT EXISTS buffered_transfer_proofs +( + swap_id TEXT PRIMARY KEY NOT NULL, + proof TEXT NOT NULL ); \ No newline at end of file diff --git a/swap/sqlx-data.json b/swap/sqlx-data.json index f24a50e6..e94a6791 100644 --- a/swap/sqlx-data.json +++ b/swap/sqlx-data.json @@ -104,6 +104,16 @@ }, "query": "\n SELECT swap_id, state\n FROM swap_states\n " }, + "480ae07600eebde51d8522c37162f28f88deaf4d0ed85c0819d8ed991025b258": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Right": 2 + } + }, + "query": "\n insert into buffered_transfer_proofs (\n swap_id,\n proof\n ) values (?, ?);\n " + }, "50a5764546f69c118fa0b64120da50f51073d36257d49768de99ff863e3511e0": { "describe": { "columns": [], @@ -195,5 +205,23 @@ } }, "query": "\n SELECT state\n FROM swap_states\n WHERE swap_id = ?\n " + }, + "e9d422daf774d099fcbde6c4cda35821da948bd86cc57798b4d8375baf0b51ae": { + "describe": { + "columns": [ + { + "name": "proof", + "ordinal": 0, + "type_info": "Text" + } + ], + "nullable": [ + false + ], + "parameters": { + "Right": 1 + } + }, + "query": "\n SELECT proof\n FROM buffered_transfer_proofs\n WHERE swap_id = ?\n " } } \ No newline at end of file diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index 02bf27e1..f8ca5023 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -376,7 +376,7 @@ impl Request { }, result = async { let (event_loop, mut event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id)?; + EventLoop::new(swap_id, swarm, seller_peer_id, context.db.clone())?; let event_loop = tokio::spawn(event_loop.run().in_current_span()); let bid_quote = event_loop_handle.request_quote().await?; @@ -522,7 +522,7 @@ impl Request { } let (event_loop, event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id)?; + EventLoop::new(swap_id, swarm, seller_peer_id, context.db.clone())?; let monero_receive_address = context.db.get_monero_address(swap_id).await?; let swap = Swap::from_db( Arc::clone(&context.db), diff --git a/swap/src/cli/event_loop.rs b/swap/src/cli/event_loop.rs index befe1dc5..a8b3abcf 100644 --- a/swap/src/cli/event_loop.rs +++ b/swap/src/cli/event_loop.rs @@ -5,6 +5,7 @@ use crate::network::encrypted_signature; use crate::network::quote::BidQuote; use crate::network::swap_setup::bob::NewSwap; use crate::protocol::bob::State2; +use crate::protocol::Database; use anyhow::{Context, Result}; use futures::future::{BoxFuture, OptionFuture}; use futures::{FutureExt, StreamExt}; @@ -13,6 +14,7 @@ use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use uuid::Uuid; @@ -21,6 +23,7 @@ pub struct EventLoop { swap_id: Uuid, swarm: libp2p::Swarm, alice_peer_id: PeerId, + db: Arc, // these streams represents outgoing requests that we have to make quote_requests: bmrng::RequestReceiverStream<(), BidQuote>, @@ -51,6 +54,7 @@ impl EventLoop { swap_id: Uuid, swarm: Swarm, alice_peer_id: PeerId, + db: Arc, ) -> Result<(Self, EventLoopHandle)> { let execution_setup = bmrng::channel_with_timeout(1, Duration::from_secs(60)); let transfer_proof = bmrng::channel_with_timeout(1, Duration::from_secs(60)); @@ -69,6 +73,7 @@ impl EventLoop { inflight_swap_setup: None, inflight_encrypted_signature_requests: HashMap::default(), pending_transfer_proof: OptionFuture::from(None), + db, }; let handle = EventLoopHandle { @@ -118,12 +123,19 @@ impl EventLoop { } if swap_id != self.swap_id { + tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}", swap_id, self.swap_id); - // TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps - tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored", swap_id, self.swap_id); + // Save transfer proof in the database such that we can process it later when we resume the swap + match self.db.insert_buffered_transfer_proof(swap_id, msg.tx_lock_proof).await { + Ok(_) => { + tracing::info!("Saved unexpected transfer proof for swap {}", swap_id); + let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ()); + } + Err(e) => { + tracing::error!("Failed to save unexpected transfer proof for swap {}: {:#}", swap_id, e); + } + }; - // When receiving a transfer proof that is unexpected we still have to acknowledge that it was received - let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ()); continue; } diff --git a/swap/src/database/sqlite.rs b/swap/src/database/sqlite.rs index 751f76aa..6cddd50e 100644 --- a/swap/src/database/sqlite.rs +++ b/swap/src/database/sqlite.rs @@ -1,5 +1,5 @@ use crate::database::Swap; -use crate::monero::Address; +use crate::monero::{Address, TransferProof}; use crate::protocol::{Database, State}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; @@ -303,6 +303,56 @@ impl Database for SqliteDatabase { result } + async fn insert_buffered_transfer_proof( + &self, + swap_id: Uuid, + proof: TransferProof, + ) -> Result<()> { + let mut conn = self.pool.acquire().await?; + let swap_id = swap_id.to_string(); + let proof = serde_json::to_string(&proof)?; + + sqlx::query!( + r#" + INSERT INTO buffered_transfer_proofs ( + swap_id, + proof + ) VALUES (?, ?); + "#, + swap_id, + proof + ) + .execute(&mut conn) + .await?; + + Ok(()) + } + + async fn get_buffered_transfer_proof(&self, swap_id: Uuid) -> Result> { + let mut conn = self.pool.acquire().await?; + let swap_id = swap_id.to_string(); + + let row = sqlx::query!( + r#" + SELECT proof + FROM buffered_transfer_proofs + WHERE swap_id = ? + "#, + swap_id + ) + .fetch_all(&mut conn) + .await?; + + if row.is_empty() { + return Ok(None); + } + + let proof_str = &row[0].proof; + let proof = serde_json::from_str(proof_str)?; + + Ok(Some(proof)) + } + async fn raw_all(&self) -> Result>> { let mut conn = self.pool.acquire().await?; let rows = sqlx::query!( diff --git a/swap/src/protocol.rs b/swap/src/protocol.rs index 0e15f89a..676a03f4 100644 --- a/swap/src/protocol.rs +++ b/swap/src/protocol.rs @@ -146,4 +146,13 @@ pub trait Database { async fn get_states(&self, swap_id: Uuid) -> Result>; async fn all(&self) -> Result>; async fn raw_all(&self) -> Result>>; + async fn insert_buffered_transfer_proof( + &self, + swap_id: Uuid, + proof: monero::TransferProof, + ) -> Result<()>; + async fn get_buffered_transfer_proof( + &self, + swap_id: Uuid, + ) -> Result>; } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 37a6b65a..14f47ddc 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -1,10 +1,11 @@ use crate::bitcoin::{ExpiredTimelocks, TxCancel, TxRefund}; use crate::cli::EventLoopHandle; use crate::network::swap_setup::bob::NewSwap; -use crate::protocol::bob; use crate::protocol::bob::state::*; +use crate::protocol::{bob, Database}; use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; +use std::sync::Arc; use tokio::select; use uuid::Uuid; @@ -34,6 +35,7 @@ pub async fn run_until( swap.id, current_state.clone(), &mut swap.event_loop_handle, + swap.db.clone(), swap.bitcoin_wallet.as_ref(), swap.monero_wallet.as_ref(), swap.monero_receive_address, @@ -52,6 +54,7 @@ async fn next_state( swap_id: Uuid, state: BobState, event_loop_handle: &mut EventLoopHandle, + db: Arc, bitcoin_wallet: &bitcoin::Wallet, monero_wallet: &monero::Wallet, monero_receive_address: monero::Address, @@ -118,12 +121,28 @@ async fn next_state( let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await; if let ExpiredTimelocks::None { .. } = state3.expired_timelock(bitcoin_wallet).await? { + tracing::info!("Waiting for Alice to lock Monero"); + + let buffered_transfer_proof = db + .get_buffered_transfer_proof(swap_id) + .await + .context("Failed to get buffered transfer proof")?; + + if let Some(transfer_proof) = buffered_transfer_proof { + tracing::debug!(txid = %transfer_proof.tx_hash(), "Found buffered transfer proof"); + tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero"); + + return Ok(BobState::XmrLockProofReceived { + state: state3, + lock_transfer_proof: transfer_proof, + monero_wallet_restore_blockheight, + }); + } + let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock); - tracing::info!("Waiting for Alice to lock Monero"); - select! { transfer_proof = transfer_proof_watcher => { let transfer_proof = transfer_proof?; diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index c099376d..119dd096 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -427,8 +427,6 @@ impl BobParams { } pub async fn new_swap_from_db(&self, swap_id: Uuid) -> Result<(bob::Swap, cli::EventLoop)> { - let (event_loop, handle) = self.new_eventloop(swap_id).await?; - if let Some(parent_dir) = self.db_path.parent() { ensure_directory_exists(parent_dir)?; } @@ -437,6 +435,8 @@ impl BobParams { } let db = Arc::new(SqliteDatabase::open(&self.db_path).await?); + let (event_loop, handle) = self.new_eventloop(swap_id, db).await?; + let swap = bob::Swap::from_db( db, swap_id, @@ -457,8 +457,6 @@ impl BobParams { ) -> Result<(bob::Swap, cli::EventLoop)> { let swap_id = Uuid::new_v4(); - let (event_loop, handle) = self.new_eventloop(swap_id).await?; - if let Some(parent_dir) = self.db_path.parent() { ensure_directory_exists(parent_dir)?; } @@ -467,6 +465,8 @@ impl BobParams { } let db = Arc::new(SqliteDatabase::open(&self.db_path).await?); + let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?; + db.insert_peer_id(swap_id, self.alice_peer_id).await?; let swap = bob::Swap::new( @@ -487,6 +487,7 @@ impl BobParams { pub async fn new_eventloop( &self, swap_id: Uuid, + db: Arc, ) -> Result<(cli::EventLoop, cli::EventLoopHandle)> { let tor_socks5_port = get_port() .expect("We don't care about Tor in the tests so we get a free port to disable it."); @@ -503,7 +504,7 @@ impl BobParams { .behaviour_mut() .add_address(self.alice_peer_id, self.alice_address.clone()); - cli::EventLoop::new(swap_id, swarm, self.alice_peer_id) + cli::EventLoop::new(swap_id, swarm, self.alice_peer_id, db.clone()); } } diff --git a/tempdb b/tempdb new file mode 100644 index 0000000000000000000000000000000000000000..3d1e885c117696bd5e798b7c2570aa90f2e7f3e6 GIT binary patch literal 4096 zcmWFz^vNtqRY=P(%1ta$FlG>7U}9o$P*7lCU|@t|AVoG{WYCK?;st3JAlr;ljiVtj n8UmvsFd71*Aut*OqaiRF0;3@?8UmvsFd71*Aut*O6ovo*V(tc> literal 0 HcmV?d00001