mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2025-08-06 05:24:42 -04:00
Alice spawns swaps outside the event loop
Instead of spawning the swap inside the event loop we send the swap back to the caller to be spawned. This means we no longer need the remote handle that was only used in the tests. This now properly logs the swap results in production. It also gives us more control over Alice's swap in the tests.
This commit is contained in:
parent
904312d1e9
commit
ea05c306e0
8 changed files with 93 additions and 72 deletions
|
@ -27,7 +27,7 @@ use swap::database::Database;
|
|||
use swap::execution_params::{ExecutionParams, GetExecutionParams};
|
||||
use swap::fs::default_config_path;
|
||||
use swap::monero::Amount;
|
||||
use swap::protocol::alice::EventLoop;
|
||||
use swap::protocol::alice::{run, EventLoop};
|
||||
use swap::seed::Seed;
|
||||
use swap::trace::init_tracing;
|
||||
use swap::{bitcoin, execution_params, kraken, monero};
|
||||
|
@ -95,7 +95,7 @@ async fn main() -> Result<()> {
|
|||
|
||||
let kraken_rate_updates = kraken::connect()?;
|
||||
|
||||
let (event_loop, _) = EventLoop::new(
|
||||
let (event_loop, mut swap_receiver) = EventLoop::new(
|
||||
config.network.listen,
|
||||
seed,
|
||||
execution_params,
|
||||
|
@ -107,6 +107,22 @@ async fn main() -> Result<()> {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(swap) = swap_receiver.recv().await {
|
||||
tokio::spawn(async move {
|
||||
let swap_id = swap.swap_id;
|
||||
match run(swap).await {
|
||||
Ok(state) => {
|
||||
tracing::debug!(%swap_id, "Swap finished with state {}", state)
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(%swap_id, "Swap failed with {:#}", e)
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
info!("Our peer id is {}", event_loop.peer_id());
|
||||
|
||||
event_loop.run().await;
|
||||
|
|
|
@ -4,20 +4,17 @@ use crate::execution_params::ExecutionParams;
|
|||
use crate::monero::BalanceTooLow;
|
||||
use crate::network::quote::BidQuote;
|
||||
use crate::network::{spot_price, transport, TokioExecutor};
|
||||
use crate::protocol::alice;
|
||||
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof};
|
||||
use crate::protocol::bob::EncryptedSignature;
|
||||
use crate::seed::Seed;
|
||||
use crate::{bitcoin, kraken, monero};
|
||||
use anyhow::{bail, Context, Result};
|
||||
use futures::future::RemoteHandle;
|
||||
use libp2p::core::Multiaddr;
|
||||
use libp2p::futures::FutureExt;
|
||||
use libp2p::{PeerId, Swarm};
|
||||
use rand::rngs::OsRng;
|
||||
use std::convert::Infallible;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::error::SendError;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use tracing::{debug, error, trace};
|
||||
use uuid::Uuid;
|
||||
|
@ -39,7 +36,7 @@ pub struct EventLoop<RS> {
|
|||
// Only used to produce new handles
|
||||
send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>,
|
||||
|
||||
swap_handle_sender: mpsc::Sender<RemoteHandle<Result<AliceState>>>,
|
||||
swap_sender: mpsc::Sender<Swap>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -62,7 +59,7 @@ where
|
|||
db: Arc<Database>,
|
||||
latest_rate: LR,
|
||||
max_buy: bitcoin::Amount,
|
||||
) -> Result<(Self, mpsc::Receiver<RemoteHandle<Result<AliceState>>>)> {
|
||||
) -> Result<(Self, mpsc::Receiver<Swap>)> {
|
||||
let identity = seed.derive_libp2p_identity();
|
||||
let behaviour = Behaviour::default();
|
||||
let transport = transport::build(&identity)?;
|
||||
|
@ -79,7 +76,7 @@ where
|
|||
|
||||
let recv_encrypted_signature = BroadcastChannels::default();
|
||||
let send_transfer_proof = MpscChannels::default();
|
||||
let swap_handle = MpscChannels::default();
|
||||
let swap_channel = MpscChannels::default();
|
||||
|
||||
let event_loop = EventLoop {
|
||||
swarm,
|
||||
|
@ -92,10 +89,10 @@ where
|
|||
recv_encrypted_signature: recv_encrypted_signature.sender,
|
||||
send_transfer_proof: send_transfer_proof.receiver,
|
||||
send_transfer_proof_sender: send_transfer_proof.sender,
|
||||
swap_handle_sender: swap_handle.sender,
|
||||
swap_sender: swap_channel.sender,
|
||||
max_buy,
|
||||
};
|
||||
Ok((event_loop, swap_handle.receiver))
|
||||
Ok((event_loop, swap_channel.receiver))
|
||||
}
|
||||
|
||||
pub fn new_handle(&self) -> EventLoopHandle {
|
||||
|
@ -231,11 +228,7 @@ where
|
|||
})
|
||||
}
|
||||
|
||||
async fn handle_execution_setup_done(
|
||||
&mut self,
|
||||
bob_peer_id: PeerId,
|
||||
state3: State3,
|
||||
) -> Result<()> {
|
||||
async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) {
|
||||
let swap_id = Uuid::new_v4();
|
||||
let handle = self.new_handle();
|
||||
|
||||
|
@ -254,18 +247,9 @@ where
|
|||
swap_id,
|
||||
};
|
||||
|
||||
let (swap, swap_handle) = alice::run(swap).remote_handle();
|
||||
tokio::spawn(swap);
|
||||
|
||||
// For testing purposes the handle is currently sent via a channel so we can
|
||||
// await it. If a remote handle is dropped, the future of the swap is
|
||||
// also stopped. If we error upon sending the handle through the channel
|
||||
// we have to call forget to detach the handle from the swap future.
|
||||
if let Err(SendError(handle)) = self.swap_handle_sender.send(swap_handle).await {
|
||||
handle.forget();
|
||||
if let Err(error) = self.swap_sender.send(swap).await {
|
||||
tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue