refactor(bob): EventLoop concurrency (#642)

* progress

* fix race condition where EventLoopHandle::swap_handle would take forever if event loop was not running

* add trace statements in bob event loop, inform swarm about redials

* progress

* revamp of protocols/swap_setup, reliable error propagation, slowly getting this to work...

* some formatting

* fix react shengigans

* If a connection handler died which had an assigned swap setup request, notify the swarm that the request failed

* key inflight swap setup request by (PeerId, SwapID) instead of just peer id

* add min height to swap state page

* extract should_acknowledge_transfer_proof out of event loop, propagate swap setup errors to event loop with context

* add --trace to justfile

* remove docker_test_all.sh

* add back the correct connection_keep_alive swap_setup/alice.rs

* fix some memory leaks

* let swarm_setup behaviour instruct swarm to dial peer

* fmt

* reduce diff

* remove redial::Redialing

* add trace statements to swap_setup/bob.rs

* extract swap setup protocol itself into run_swap_setup

* make queues unbounded, small nitpicks

* do not buffer transfer proof acknowledgements

* prevent swap_setup/bob.rs from keeping all connections alive

* buffer transfer proofs

* do not redial ALL peers

* keep all connections alive with swap_setup/bob.rs

* add comment
This commit is contained in:
Mohan 2025-11-08 17:48:15 +01:00 committed by GitHub
parent c0235827f0
commit 1f2a0605bc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 1158 additions and 507 deletions

View file

@ -80,7 +80,7 @@ swap:
# Run the asb on testnet # Run the asb on testnet
asb-testnet: asb-testnet:
ASB_DEV_ADDR_OUTPUT_PATH="$(pwd)/src-gui/.env.development" cargo run -p swap-asb --bin asb -- --testnet start --rpc-bind-port 9944 --rpc-bind-host 0.0.0.0 ASB_DEV_ADDR_OUTPUT_PATH="$(pwd)/src-gui/.env.development" cargo run -p swap-asb --bin asb -- --testnet --trace start --rpc-bind-port 9944 --rpc-bind-host 0.0.0.0
# Launch the ASB controller REPL against a local testnet ASB instance # Launch the ASB controller REPL against a local testnet ASB instance
asb-testnet-controller: asb-testnet-controller:
@ -140,3 +140,4 @@ code2prompt_single_crate crate:
prepare-windows-build: prepare-windows-build:
cd dev-scripts && ./ubuntu_build_x86_86-w64-mingw32-gcc.sh cd dev-scripts && ./ubuntu_build_x86_86-w64-mingw32-gcc.sh

View file

@ -51,8 +51,17 @@ export default function SwapWidget() {
justifyContent: "space-between", justifyContent: "space-between",
flex: 1, flex: 1,
}} }}
>
<Box
sx={{
display: "flex",
minHeight: "30vh",
flexDirection: "column",
justifyContent: "center",
}}
> >
<SwapStatePage state={swap.state} /> <SwapStatePage state={swap.state} />
</Box>
{swap.state !== null && ( {swap.state !== null && (
<> <>
<SwapStateStepper state={swap.state} /> <SwapStateStepper state={swap.state} />

View file

@ -13,7 +13,7 @@ swap-machine = { path = "../swap-machine" }
swap-serde = { path = "../swap-serde" } swap-serde = { path = "../swap-serde" }
# Networking # Networking
libp2p = { workspace = true, features = ["serde", "request-response", "rendezvous", "cbor", "json", "ping", "identify"] } libp2p = { workspace = true, features = ["serde", "request-response", "rendezvous", "cbor", "json", "identify", "ping"] }
# Serialization # Serialization
asynchronous-codec = "0.7.0" asynchronous-codec = "0.7.0"

View file

@ -0,0 +1,71 @@
use libp2p::futures::future::BoxFuture;
use libp2p::futures::stream::{FuturesUnordered, StreamExt};
use std::collections::HashSet;
use std::hash::Hash;
use std::task::{Context, Poll};
/// A collection of futures with associated keys that can be checked for presence
/// before completion.
///
/// This combines a HashSet for key tracking with FuturesUnordered for efficient polling.
/// The key is provided during insertion; the future only needs to yield the value.
pub struct FuturesHashSet<K, V> {
keys: HashSet<K>,
futures: FuturesUnordered<BoxFuture<'static, (K, V)>>,
}
impl<K: Hash + Eq + Clone + Send + 'static, V: 'static> FuturesHashSet<K, V> {
pub fn new() -> Self {
Self {
keys: HashSet::new(),
futures: FuturesUnordered::new(),
}
}
/// Check if a future with the given key is already pending
pub fn contains_key(&self, key: &K) -> bool {
self.keys.contains(key)
}
/// Insert a new future with the given key.
/// The future should yield V; the key will be paired with it when it completes.
/// Returns true if the key was newly inserted, false if it was already present.
/// If false is returned, the future is not added.
pub fn insert(&mut self, key: K, future: BoxFuture<'static, V>) -> bool {
if self.keys.insert(key.clone()) {
let key_clone = key;
let wrapped = async move {
let value = future.await;
(key_clone, value)
};
self.futures.push(Box::pin(wrapped));
true
} else {
false
}
}
/// Poll for the next completed future.
/// When a future completes, its key is automatically removed from the tracking set.
pub fn poll_next_unpin(&mut self, cx: &mut Context) -> Poll<Option<(K, V)>> {
match self.futures.poll_next_unpin(cx) {
Poll::Ready(Some((k, v))) => {
self.keys.remove(&k);
Poll::Ready(Some((k, v)))
}
other => other,
}
}
pub fn len(&self) -> usize {
assert_eq!(self.keys.len(), self.futures.len());
self.keys.len()
}
}
impl<K: Hash + Eq + Clone + Send + 'static, V: 'static> Default for FuturesHashSet<K, V> {
fn default() -> Self {
Self::new()
}
}

View file

@ -1,3 +1,4 @@
pub mod futures_util;
pub mod impl_from_rr_event; pub mod impl_from_rr_event;
pub mod out_event; pub mod out_event;
pub mod protocols; pub mod protocols;

View file

@ -6,6 +6,7 @@ use libp2p::{
PeerId, PeerId,
}; };
use crate::protocols::redial;
use crate::protocols::{ use crate::protocols::{
cooperative_xmr_redeem_after_punish::CooperativeXmrRedeemRejectReason, quote::BidQuote, cooperative_xmr_redeem_after_punish::CooperativeXmrRedeemRejectReason, quote::BidQuote,
transfer_proof, transfer_proof,
@ -17,7 +18,11 @@ pub enum OutEvent {
id: OutboundRequestId, id: OutboundRequestId,
response: BidQuote, response: BidQuote,
}, },
SwapSetupCompleted(Box<anyhow::Result<swap_machine::bob::State2>>), SwapSetupCompleted {
peer: PeerId,
swap_id: uuid::Uuid,
result: Box<anyhow::Result<swap_machine::bob::State2>>,
},
TransferProofReceived { TransferProofReceived {
msg: Box<transfer_proof::Request>, msg: Box<transfer_proof::Request>,
channel: ResponseChannel<()>, channel: ResponseChannel<()>,
@ -53,6 +58,7 @@ pub enum OutEvent {
request_id: InboundRequestId, request_id: InboundRequestId,
protocol: String, protocol: String,
}, },
Redial(redial::Event),
/// "Fallback" variant that allows the event mapping code to swallow certain /// "Fallback" variant that allows the event mapping code to swallow certain
/// events that we don't want the caller to deal with. /// events that we don't want the caller to deal with.
Other, Other,

View file

@ -1,6 +1,7 @@
pub mod cooperative_xmr_redeem_after_punish; pub mod cooperative_xmr_redeem_after_punish;
pub mod encrypted_signature; pub mod encrypted_signature;
pub mod quote; pub mod quote;
pub mod redial;
pub mod rendezvous; pub mod rendezvous;
pub mod swap_setup; pub mod swap_setup;
pub mod transfer_proof; pub mod transfer_proof;

View file

@ -0,0 +1,205 @@
use crate::futures_util::FuturesHashSet;
use crate::out_event;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use libp2p::core::Multiaddr;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::{NetworkBehaviour, ToSwarm};
use libp2p::PeerId;
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
use std::time::Duration;
use void::Void;
/// A [`NetworkBehaviour`] that tracks whether we are connected to the given
/// peers and attempts to re-establish a connection with an exponential backoff
/// if we lose the connection.
pub struct Behaviour {
/// The peers we are interested in.
peers: HashSet<PeerId>,
/// Tracks sleep timers for each peer waiting to redial.
/// Futures in here yield the PeerId and when a Future completes we dial that peer
sleep: FuturesHashSet<PeerId, ()>,
/// Tracks the current backoff state for each peer.
backoff: HashMap<PeerId, ExponentialBackoff>,
/// Initial interval for backoff.
initial_interval: Duration,
/// Maximum interval for backoff.
max_interval: Duration,
/// A queue of events to be sent to the swarm.
to_swarm: VecDeque<Event>,
}
impl Behaviour {
pub fn new(interval: Duration, max_interval: Duration) -> Self {
Self {
peers: HashSet::default(),
sleep: FuturesHashSet::new(),
backoff: HashMap::new(),
initial_interval: interval,
max_interval,
to_swarm: VecDeque::new(),
}
}
/// Adds a peer to the set of peers to track. Returns true if the peer was newly added.
pub fn add_peer(&mut self, peer: PeerId) -> bool {
let newly_added = self.peers.insert(peer);
// If the peer is newly added, schedule a dial immediately
if newly_added {
self.sleep.insert(peer, Box::pin(std::future::ready(())));
}
newly_added
}
fn get_backoff(&mut self, peer: &PeerId) -> &mut ExponentialBackoff {
self.backoff.entry(*peer).or_insert_with(|| {
ExponentialBackoff {
initial_interval: self.initial_interval,
current_interval: self.initial_interval,
max_interval: self.max_interval,
// We never give up on re-dialling
max_elapsed_time: None,
..ExponentialBackoff::default()
}
})
}
pub fn has_pending_redial(&self, peer: &PeerId) -> bool {
self.sleep.contains_key(peer)
}
}
#[derive(Debug)]
pub enum Event {
ScheduledRedial {
peer: PeerId,
next_dial_in: Duration,
},
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler;
type ToSwarm = Event;
fn handle_established_inbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
// TOOD: Uncomment this if we want to redial ALL peers we ever connected to
// Add the peer if it's not already tracked.
// self.add_peer(peer);
// Reset the backoff state to start with the initial interval again once we disconnect again
if let Some(backoff) = self.backoff.get_mut(&peer) {
backoff.reset();
}
Ok(Self::ConnectionHandler {})
}
fn handle_established_outbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
_addr: &Multiaddr,
_role_override: libp2p::core::Endpoint,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
// TOOD: Uncomment this if we want to redial ALL peers we ever connected to
// Add the peer if it's not already tracked.
// self.add_peer(peer);
// Reset the backoff state to start with the initial interval again once we disconnect again
if let Some(backoff) = self.backoff.get_mut(&peer) {
backoff.reset();
}
Ok(Self::ConnectionHandler {})
}
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<'_>) {
let peer_to_redial = match event {
libp2p::swarm::FromSwarm::ConnectionClosed(e) if self.peers.contains(&e.peer_id) => {
Some(e.peer_id)
}
libp2p::swarm::FromSwarm::DialFailure(e) => match e.peer_id {
Some(peer_id) if self.peers.contains(&peer_id) => Some(peer_id),
_ => None,
},
_ => None,
};
if let Some(peer) = peer_to_redial {
let backoff = self.get_backoff(&peer);
let next_dial_in = backoff
.next_backoff()
.expect("redial backoff should never run out of attempts");
if self.sleep.insert(
peer,
Box::pin(async move {
tokio::time::sleep(next_dial_in).await;
}),
) {
self.to_swarm
.push_back(Event::ScheduledRedial { peer, next_dial_in });
tracing::info!(
peer_id = %peer,
seconds_until_next_redial = %next_dial_in.as_secs(),
"Waiting for next redial attempt"
);
}
}
}
fn poll(&mut self, cx: &mut Context<'_>) -> std::task::Poll<ToSwarm<Self::ToSwarm, Void>> {
// Check if we have any event to send to the swarm
if let Some(event) = self.to_swarm.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
// Check if any peer's sleep timer has completed
// If it has, dial that peer
match self.sleep.poll_next_unpin(cx) {
Poll::Ready(Some((peer, _))) => {
// Actually dial the peer
Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(peer)
// TODO: Maybe use DisconnectedAndNotDialing here?
.condition(PeerCondition::Disconnected)
.build(),
})
}
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
}
fn on_connection_handler_event(
&mut self,
_peer_id: PeerId,
_connection_id: libp2p::swarm::ConnectionId,
_event: libp2p::swarm::THandlerOutEvent<Self>,
) {
unreachable!("The re-dial dummy connection handler does not produce any events");
}
}
impl From<Event> for out_event::bob::OutEvent {
fn from(event: Event) -> Self {
out_event::bob::OutEvent::Redial(event)
}
}
impl From<Event> for out_event::alice::OutEvent {
fn from(_event: Event) -> Self {
// TODO: Once this is used by Alice, convert this to a proper event
out_event::alice::OutEvent::Other
}
}

View file

@ -8,12 +8,15 @@ use futures::future::{BoxFuture, OptionFuture};
use futures::AsyncWriteExt; use futures::AsyncWriteExt;
use futures::FutureExt; use futures::FutureExt;
use libp2p::core::upgrade; use libp2p::core::upgrade;
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::{ use libp2p::swarm::{
ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm, ConnectionClosed, ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId,
NetworkBehaviour, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, FromSwarm, NetworkBehaviour, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
}; };
use libp2p::{Multiaddr, PeerId}; use libp2p::{Multiaddr, PeerId};
use std::collections::VecDeque; use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc; use std::sync::Arc;
use std::task::Poll; use std::task::Poll;
use std::time::Duration; use std::time::Duration;
@ -29,8 +32,29 @@ use super::{read_cbor_message, write_cbor_message, SpotPriceRequest};
pub struct Behaviour { pub struct Behaviour {
env_config: env::Config, env_config: env::Config,
bitcoin_wallet: Arc<dyn BitcoinWallet>, bitcoin_wallet: Arc<dyn BitcoinWallet>,
new_swaps: VecDeque<(PeerId, NewSwap)>,
completed_swaps: VecDeque<(PeerId, Completed)>, // Queue of swap setup request that haven't been assigned to a connection handler yet
// (peer_id, swap_id, new_swap)
new_swaps: VecDeque<(PeerId, Uuid, NewSwap)>,
// Maintains the list of connections handlers for a specific peer
//
// 0. List of connection handlers that are still active but haven't been assigned a swap setup request yet
// 1. List of connection handlers that have died. Once their death is acknowledged / processed, they are removed from the list
connection_handlers: HashMap<PeerId, (VecDeque<ConnectionId>, VecDeque<ConnectionId>)>,
// Queue of completed swaps that we have assigned a connection handler to but where we haven't notified the ConnectionHandler yet
// We notify the ConnectionHandler by emitting a ConnectionHandlerEvent::NotifyBehaviour event
assigned_unnotified_swaps: VecDeque<(ConnectionId, PeerId, Uuid, NewSwap)>,
// Maintains the list of requests that we have sent to a connection handler but haven't yet received a response
inflight_requests: HashMap<ConnectionId, (Uuid, PeerId)>,
// Queue of swap setup results that we want to notify the Swarm about
to_swarm: VecDeque<SwapSetupResult>,
// Queue of peers that we want to instruct the Swarm to dial
to_dial: VecDeque<PeerId>,
} }
impl Behaviour { impl Behaviour {
@ -39,24 +63,53 @@ impl Behaviour {
env_config, env_config,
bitcoin_wallet, bitcoin_wallet,
new_swaps: VecDeque::default(), new_swaps: VecDeque::default(),
completed_swaps: VecDeque::default(), to_swarm: VecDeque::default(),
assigned_unnotified_swaps: VecDeque::default(),
inflight_requests: HashMap::default(),
connection_handlers: HashMap::default(),
to_dial: VecDeque::default(),
} }
} }
pub async fn start(&mut self, alice: PeerId, swap: NewSwap) { pub async fn start(&mut self, alice_peer_id: PeerId, swap: NewSwap) {
self.new_swaps.push_back((alice, swap)) tracing::trace!(
} %alice_peer_id,
} ?swap,
"Queuing new swap setup request inside the Behaviour",
);
impl From<Completed> for out_event::bob::OutEvent { // TODO: This is a bit redundant because we already have the swap_id in the NewSwap struct
fn from(completed: Completed) -> Self { self.new_swaps
out_event::bob::OutEvent::SwapSetupCompleted(Box::new(completed.0)) .push_back((alice_peer_id, swap.swap_id, swap));
self.to_dial.push_back(alice_peer_id);
}
// Returns a mutable reference to the queues of the connection handlers for a specific peer
fn connection_handlers_mut(
&mut self,
peer_id: PeerId,
) -> &mut (VecDeque<ConnectionId>, VecDeque<ConnectionId>) {
self.connection_handlers.entry(peer_id).or_default()
}
// Returns a mutable reference to the queues of the connection handlers for a specific peer
fn alive_connection_handlers_mut(&mut self, peer_id: PeerId) -> &mut VecDeque<ConnectionId> {
&mut self.connection_handlers_mut(peer_id).0
}
// Returns a mutable reference to the queues of the connection handlers for a specific peer
fn dead_connection_handlers_mut(&mut self, peer_id: PeerId) -> &mut VecDeque<ConnectionId> {
&mut self.connection_handlers_mut(peer_id).1
}
fn known_peers(&self) -> HashSet<PeerId> {
self.connection_handlers.keys().copied().collect()
} }
} }
impl NetworkBehaviour for Behaviour { impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler; type ConnectionHandler = Handler;
type ToSwarm = Completed; type ToSwarm = SwapSetupResult;
fn handle_established_inbound_connection( fn handle_established_inbound_connection(
&mut self, &mut self,
@ -78,17 +131,57 @@ impl NetworkBehaviour for Behaviour {
Ok(Handler::new(self.env_config, self.bitcoin_wallet.clone())) Ok(Handler::new(self.env_config, self.bitcoin_wallet.clone()))
} }
fn on_swarm_event(&mut self, _event: FromSwarm<'_>) { fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
// We do not need to handle swarm events match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
endpoint,
..
}) => {
tracing::trace!(
peer = %peer_id,
connection_id = %connection_id,
endpoint = ?endpoint,
"A new connection handler has been established",
);
self.alive_connection_handlers_mut(peer_id)
.push_back(connection_id);
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
..
}) => {
tracing::trace!(
peer = %peer_id,
connection_id = %connection_id,
"A swap setup connection handler has died",
);
self.dead_connection_handlers_mut(peer_id)
.push_back(connection_id);
}
_ => {}
}
} }
fn on_connection_handler_event( fn on_connection_handler_event(
&mut self, &mut self,
peer_id: PeerId, event_peer_id: PeerId,
_connection_id: libp2p::swarm::ConnectionId, connection_id: libp2p::swarm::ConnectionId,
event: THandlerOutEvent<Self>, result: THandlerOutEvent<Self>,
) { ) {
self.completed_swaps.push_back((peer_id, event)); if let Some((swap_id, peer)) = self.inflight_requests.remove(&connection_id) {
assert_eq!(peer, event_peer_id);
self.to_swarm.push_back(SwapSetupResult {
peer,
swap_id,
result,
});
}
} }
fn poll( fn poll(
@ -96,19 +189,127 @@ impl NetworkBehaviour for Behaviour {
_cx: &mut std::task::Context<'_>, _cx: &mut std::task::Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Forward completed swaps from the connection handler to the swarm // Forward completed swaps from the connection handler to the swarm
if let Some((_peer, completed)) = self.completed_swaps.pop_front() { if let Some(completed) = self.to_swarm.pop_front() {
tracing::trace!(
peer = %completed.peer,
"Forwarding completed swap setup from Behaviour to the Swarm",
);
return Poll::Ready(ToSwarm::GenerateEvent(completed)); return Poll::Ready(ToSwarm::GenerateEvent(completed));
} }
// If there is a new swap to be started, send it to the connection handler // Forward any peers that we want to dial to the Swarm
if let Some((peer, event)) = self.new_swaps.pop_front() { if let Some(peer) = self.to_dial.pop_front() {
return Poll::Ready(ToSwarm::NotifyHandler { tracing::trace!(
peer_id: peer, peer = %peer,
handler: libp2p::swarm::NotifyHandler::Any, "Instructing swarm to dial a new connection handler for a swap setup request",
event, );
return Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(peer)
.condition(PeerCondition::DisconnectedAndNotDialing)
.build(),
}); });
} }
// Remove any unused already dead connection handlers that were never assigned a request
for peer in self.known_peers() {
let (alive_connection_handlers, dead_connection_handlers) =
self.connection_handlers_mut(peer);
// Create sets for efficient lookup
let alive_set: HashSet<_> = alive_connection_handlers.iter().copied().collect();
let dead_set: HashSet<_> = dead_connection_handlers.iter().copied().collect();
// Remove from alive any handlers that are also in dead
alive_connection_handlers.retain(|id| !dead_set.contains(id));
// Remove from dead any handlers that were in alive (the overlap we just processed)
dead_connection_handlers.retain(|id| !alive_set.contains(id));
}
// Go through our new_swaps and try to assign a request to a connection handler
//
// If we find a connection handler for the peer, it will be removed from new_swaps
// If we don't find a connection handler for the peer, it will remain in new_swaps
{
let new_swaps = &mut self.new_swaps;
let connection_handlers = &mut self.connection_handlers;
let assigned_unnotified_swaps = &mut self.assigned_unnotified_swaps;
let mut remaining = std::collections::VecDeque::new();
for (peer, swap_id, new_swap) in new_swaps.drain(..) {
if let Some(connection_id) =
connection_handlers.entry(peer).or_default().0.pop_front()
{
assigned_unnotified_swaps.push_back((connection_id, peer, swap_id, new_swap));
} else {
remaining.push_back((peer, swap_id, new_swap));
}
}
*new_swaps = remaining;
}
// If a connection handler died which had an assigned swap setup request,
// we need to notify the swarm that the request failed
for peer_id in self.known_peers() {
while let Some(connection_id) = self.dead_connection_handlers_mut(peer_id).pop_front() {
if let Some((swap_id, _)) = self.inflight_requests.remove(&connection_id) {
self.to_swarm.push_back(SwapSetupResult {
peer: peer_id,
swap_id,
result: Err(anyhow::anyhow!("Connection handler for peer {} has died after we notified it of the swap setup request", peer_id)),
});
}
}
}
// Iterate through our assigned_unnotified_swaps queue (with popping)
if let Some((connection_id, peer_id, swap_id, new_swap)) =
self.assigned_unnotified_swaps.pop_front()
{
tracing::trace!(
swap_id = %swap_id,
connection_id = %connection_id,
?new_swap,
"Dispatching swap setup request from Behaviour to a specific connection handler",
);
// Check if the connection handler is still alive
if let Some(dead_connection_handler) = self
.dead_connection_handlers_mut(peer_id)
.iter()
.position(|id| *id == connection_id)
{
self.dead_connection_handlers_mut(peer_id)
.remove(dead_connection_handler);
self.to_swarm.push_back(SwapSetupResult {
peer: peer_id,
swap_id,
result: Err(anyhow::anyhow!("Connection handler for peer {} has died before we could notify it of the swap setup request", peer_id)),
});
} else {
// ConnectionHandler must still be alive, notify it of the swap setup request
tracing::trace!(
peer = %peer_id,
swap_id = %swap_id,
?new_swap,
"Notifying connection handler of the swap setup request. We are assuming it is still alive.",
);
self.inflight_requests
.insert(connection_id, (swap_id, peer_id));
return Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler: libp2p::swarm::NotifyHandler::One(connection_id),
event: new_swap,
});
}
}
Poll::Pending Poll::Pending
} }
} }
@ -132,6 +333,8 @@ impl Handler {
timeout: Duration::from_secs(120), timeout: Duration::from_secs(120),
new_swaps: VecDeque::default(), new_swaps: VecDeque::default(),
bitcoin_wallet, bitcoin_wallet,
// TODO: This will keep ALL connections alive indefinitely
// which is not optimal
keep_alive: true, keep_alive: true,
} }
} }
@ -148,11 +351,15 @@ pub struct NewSwap {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Completed(Result<State2>); pub struct SwapSetupResult {
peer: PeerId,
swap_id: Uuid,
result: Result<State2>,
}
impl ConnectionHandler for Handler { impl ConnectionHandler for Handler {
type FromBehaviour = NewSwap; type FromBehaviour = NewSwap;
type ToBehaviour = Completed; type ToBehaviour = Result<State2>;
type InboundProtocol = upgrade::DeniedUpgrade; type InboundProtocol = upgrade::DeniedUpgrade;
type OutboundProtocol = protocol::SwapSetup; type OutboundProtocol = protocol::SwapSetup;
type InboundOpenInfo = (); type InboundOpenInfo = ();
@ -175,7 +382,7 @@ impl ConnectionHandler for Handler {
) { ) {
match event { match event {
libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedInbound(_) => { libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedInbound(_) => {
unreachable!("Bob does not support inbound substreams") // TODO: Maybe warn here as Bob does not support inbound substreams?
} }
libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedOutbound(outbound) => { libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
let mut substream = outbound.protocol; let mut substream = outbound.protocol;
@ -185,7 +392,104 @@ impl ConnectionHandler for Handler {
let env_config = self.env_config; let env_config = self.env_config;
let protocol = tokio::time::timeout(self.timeout, async move { let protocol = tokio::time::timeout(self.timeout, async move {
let result = async { let result = run_swap_setup(
&mut substream,
new_swap_request,
env_config,
bitcoin_wallet,
)
.await;
result.map_err(|err: anyhow::Error| {
tracing::error!(?err, "Error occurred during swap setup protocol");
Error::Protocol(format!("{:?}", err))
})
});
let max_seconds = self.timeout.as_secs();
self.outbound_stream = OptionFuture::from(Some(Box::pin(async move {
protocol.await.map_err(|_| Error::Timeout {
seconds: max_seconds,
})?
})
as OutboundStream));
}
libp2p::swarm::handler::ConnectionEvent::AddressChange(address_change) => {
tracing::trace!(
?address_change,
"Connection address changed during swap setup"
);
}
libp2p::swarm::handler::ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
tracing::trace!(error = %dial_upgrade_error.error, "Dial upgrade error during swap setup");
}
libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
tracing::trace!(
?listen_upgrade_error,
"Listen upgrade error during swap setup"
);
}
_ => {
// We ignore the rest of events
}
}
}
fn on_behaviour_event(&mut self, new_swap: Self::FromBehaviour) {
self.new_swaps.push_back(new_swap);
}
fn connection_keep_alive(&self) -> bool {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
// Check if there is a new swap to be started on this connection
// Has the Behaviour assigned us a new swap to be started on this connection?
if let Some(new_swap) = self.new_swaps.pop_front() {
tracing::trace!(
?new_swap.swap_id,
"Instructing swarm to start a new outbound substream as part of swap setup",
);
// Keep the connection alive because we want to use it
self.keep_alive = true;
// We instruct the swarm to start a new outbound substream
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(protocol::new(), new_swap),
});
}
// Check if the outbound stream has completed
if let Poll::Ready(Some(result)) = self.outbound_stream.poll_unpin(cx) {
self.outbound_stream = None.into();
// Once the outbound stream is completed, we no longer keep the connection alive
self.keep_alive = false;
// We notify the swarm that the swap setup is completed / failed
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
result.map_err(anyhow::Error::from).into(),
));
}
Poll::Pending
}
}
async fn run_swap_setup(
mut substream: &mut libp2p::swarm::Stream,
new_swap_request: NewSwap,
env_config: env::Config,
bitcoin_wallet: Arc<dyn BitcoinWallet>,
) -> Result<State2> {
// Here we request the spot price from Alice // Here we request the spot price from Alice
write_cbor_message( write_cbor_message(
&mut substream, &mut substream,
@ -210,6 +514,13 @@ impl ConnectionHandler for Handler {
.context("Failed to read spot price response from Alice")?, .context("Failed to read spot price response from Alice")?,
)?; )?;
tracing::trace!(
%new_swap_request.swap_id,
xmr = %xmr,
btc = %new_swap_request.btc,
"Got spot price response from Alice as part of swap setup",
);
let state0 = State0::new( let state0 = State0::new(
new_swap_request.swap_id, new_swap_request.swap_id,
&mut rand::thread_rng(), &mut rand::thread_rng(),
@ -224,6 +535,11 @@ impl ConnectionHandler for Handler {
new_swap_request.tx_lock_fee, new_swap_request.tx_lock_fee,
); );
tracing::trace!(
%new_swap_request.swap_id,
"Transitioned into state0 during swap setup",
);
write_cbor_message(&mut substream, state0.next_message()) write_cbor_message(&mut substream, state0.next_message())
.await .await
.context("Failed to send state0 message to Alice")?; .context("Failed to send state0 message to Alice")?;
@ -234,6 +550,12 @@ impl ConnectionHandler for Handler {
.receive(bitcoin_wallet.as_ref(), message1) .receive(bitcoin_wallet.as_ref(), message1)
.await .await
.context("Failed to receive state1")?; .context("Failed to receive state1")?;
tracing::trace!(
%new_swap_request.swap_id,
"Transitioned into state1 during swap setup",
);
write_cbor_message(&mut substream, state1.next_message()) write_cbor_message(&mut substream, state1.next_message())
.await .await
.context("Failed to send state1 message")?; .context("Failed to send state1 message")?;
@ -244,6 +566,11 @@ impl ConnectionHandler for Handler {
.receive(message3) .receive(message3)
.context("Failed to receive state2")?; .context("Failed to receive state2")?;
tracing::trace!(
%new_swap_request.swap_id,
"Transitioned into state2 during swap setup",
);
write_cbor_message(&mut substream, state2.next_message()) write_cbor_message(&mut substream, state2.next_message())
.await .await
.context("Failed to send state2 message")?; .context("Failed to send state2 message")?;
@ -257,71 +584,12 @@ impl ConnectionHandler for Handler {
.await .await
.context("Failed to close substream")?; .context("Failed to close substream")?;
tracing::trace!(
%new_swap_request.swap_id,
"Swap setup completed",
);
Ok(state2) Ok(state2)
}
.await;
result.map_err(|e: anyhow::Error| {
tracing::error!("Error occurred during swap setup protocol: {:#}", e);
Error::Other
})
});
let max_seconds = self.timeout.as_secs();
self.outbound_stream = OptionFuture::from(Some(Box::pin(async move {
protocol.await.map_err(|_| Error::Timeout {
seconds: max_seconds,
})?
})
as OutboundStream));
// Once the outbound stream is created, we keep the connection alive
self.keep_alive = true;
}
_ => {}
}
}
fn on_behaviour_event(&mut self, new_swap: Self::FromBehaviour) {
self.new_swaps.push_back(new_swap);
}
fn connection_keep_alive(&self) -> bool {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
// Check if there is a new swap to be started
if let Some(new_swap) = self.new_swaps.pop_front() {
self.keep_alive = true;
// We instruct the swarm to start a new outbound substream
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(protocol::new(), new_swap),
});
}
// Check if the outbound stream has completed
if let Poll::Ready(Some(result)) = self.outbound_stream.poll_unpin(cx) {
self.outbound_stream = None.into();
// Once the outbound stream is completed, we no longer keep the connection alive
self.keep_alive = false;
// We notify the swarm that the swap setup is completed / failed
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Completed(
result.map_err(anyhow::Error::from),
)));
}
Poll::Pending
}
} }
impl From<SpotPriceResponse> for Result<swap_core::monero::Amount, Error> { impl From<SpotPriceResponse> for Result<swap_core::monero::Amount, Error> {
@ -359,6 +627,11 @@ pub enum Error {
#[error("Failed to complete swap setup within {seconds}s")] #[error("Failed to complete swap setup within {seconds}s")]
Timeout { seconds: u64 }, Timeout { seconds: u64 },
/// Something went wrong during the swap setup protocol that is not covered by the other errors
/// but where we have some context about the error
#[error("Something went wrong during the swap setup protocol: {0}")]
Protocol(String),
/// To be used for errors that cannot be explained on the CLI side (e.g. /// To be used for errors that cannot be explained on the CLI side (e.g.
/// rate update problems on the seller side) /// rate update problems on the seller side)
#[error("Seller encountered a problem, please try again later.")] #[error("Seller encountered a problem, please try again later.")]
@ -383,3 +656,13 @@ impl From<SpotPriceError> for Error {
} }
} }
} }
impl From<SwapSetupResult> for out_event::bob::OutEvent {
fn from(completed: SwapSetupResult) -> Self {
out_event::bob::OutEvent::SwapSetupCompleted {
result: Box::new(completed.result),
swap_id: completed.swap_id,
peer: completed.peer,
}
}
}

View file

@ -9,7 +9,7 @@ pub mod watcher;
pub use behaviour::{Behaviour, OutEvent}; pub use behaviour::{Behaviour, OutEvent};
pub use cancel_and_refund::{cancel, cancel_and_refund, refund}; pub use cancel_and_refund::{cancel, cancel_and_refund, refund};
pub use event_loop::{EventLoop, EventLoopHandle}; pub use event_loop::{EventLoop, EventLoopHandle, SwapEventLoopHandle};
pub use list_sellers::{list_sellers, SellerStatus}; pub use list_sellers::{list_sellers, SellerStatus};
#[cfg(test)] #[cfg(test)]

View file

@ -1106,7 +1106,6 @@ pub async fn buy_xmr(
.await?; .await?;
let behaviour = cli::Behaviour::new( let behaviour = cli::Behaviour::new(
seller_peer_id,
env_config, env_config,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
(seed.derive_libp2p_identity(), namespace), (seed.derive_libp2p_identity(), namespace),
@ -1131,9 +1130,7 @@ pub async fn buy_xmr(
TauriSwapProgressEvent::ReceivedQuote(quote.clone()), TauriSwapProgressEvent::ReceivedQuote(quote.clone()),
); );
// Now create the event loop we use for the swap let (event_loop, mut event_loop_handle) = EventLoop::new(swarm, db.clone())?;
let (event_loop, event_loop_handle) =
EventLoop::new(swap_id, swarm, seller_peer_id, db.clone())?;
let event_loop = tokio::spawn(event_loop.run().in_current_span()); let event_loop = tokio::spawn(event_loop.run().in_current_span());
tauri_handle.emit_swap_progress_event(swap_id, TauriSwapProgressEvent::ReceivedQuote(quote)); tauri_handle.emit_swap_progress_event(swap_id, TauriSwapProgressEvent::ReceivedQuote(quote));
@ -1161,13 +1158,14 @@ pub async fn buy_xmr(
} }
}, },
swap_result = async { swap_result = async {
let swap_event_loop_handle = event_loop_handle.swap_handle(seller_peer_id, swap_id).await?;
let swap = Swap::new( let swap = Swap::new(
db.clone(), db.clone(),
swap_id, swap_id,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
monero_wallet, monero_wallet,
env_config, env_config,
event_loop_handle, swap_event_loop_handle,
monero_receive_pool.clone(), monero_receive_pool.clone(),
bitcoin_change_address_for_spawn, bitcoin_change_address_for_spawn,
tx_lock_amount, tx_lock_amount,
@ -1225,7 +1223,6 @@ pub async fn resume_swap(
.derive_libp2p_identity(); .derive_libp2p_identity();
let behaviour = cli::Behaviour::new( let behaviour = cli::Behaviour::new(
seller_peer_id,
config.env_config, config.env_config,
bitcoin_wallet.clone(), bitcoin_wallet.clone(),
(seed.clone(), config.namespace), (seed.clone(), config.namespace),
@ -1240,20 +1237,22 @@ pub async fn resume_swap(
swarm.add_peer_address(seller_peer_id, seller_address); swarm.add_peer_address(seller_peer_id, seller_address);
} }
let (event_loop, event_loop_handle) = let (event_loop, mut event_loop_handle) = EventLoop::new(swarm, db.clone())?;
EventLoop::new(swap_id, swarm, seller_peer_id, db.clone())?;
let monero_receive_pool = db.get_monero_address_pool(swap_id).await?; let monero_receive_pool = db.get_monero_address_pool(swap_id).await?;
let tauri_handle = context.tauri_handle.clone(); let tauri_handle = context.tauri_handle.clone();
let swap_event_loop_handle = event_loop_handle
.swap_handle(seller_peer_id, swap_id)
.await?;
let swap = Swap::from_db( let swap = Swap::from_db(
db.clone(), db.clone(),
swap_id, swap_id,
bitcoin_wallet, bitcoin_wallet,
monero_manager, monero_manager,
config.env_config, config.env_config,
event_loop_handle, swap_event_loop_handle,
monero_receive_pool, monero_receive_pool,
) )
.await? .await?

View file

@ -6,7 +6,7 @@ use crate::network::{
use anyhow::Result; use anyhow::Result;
use bitcoin_wallet::BitcoinWallet; use bitcoin_wallet::BitcoinWallet;
use libp2p::swarm::NetworkBehaviour; use libp2p::swarm::NetworkBehaviour;
use libp2p::{identify, identity, ping, PeerId}; use libp2p::{identify, identity, ping};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use swap_env::env; use swap_env::env;
@ -38,7 +38,6 @@ pub struct Behaviour {
impl Behaviour { impl Behaviour {
pub fn new( pub fn new(
alice: PeerId,
env_config: env::Config, env_config: env::Config,
bitcoin_wallet: Arc<dyn BitcoinWallet>, bitcoin_wallet: Arc<dyn BitcoinWallet>,
identify_params: (identity::Keypair, XmrBtcNamespace), identify_params: (identity::Keypair, XmrBtcNamespace),
@ -57,7 +56,7 @@ impl Behaviour {
transfer_proof: transfer_proof::bob(), transfer_proof: transfer_proof::bob(),
encrypted_signature: encrypted_signature::bob(), encrypted_signature: encrypted_signature::bob(),
cooperative_xmr_redeem: cooperative_xmr_redeem_after_punish::bob(), cooperative_xmr_redeem: cooperative_xmr_redeem_after_punish::bob(),
redial: redial::Behaviour::new(alice, INITIAL_REDIAL_INTERVAL, MAX_REDIAL_INTERVAL), redial: redial::Behaviour::new(INITIAL_REDIAL_INTERVAL, MAX_REDIAL_INTERVAL),
ping: ping::Behaviour::new(pingConfig), ping: ping::Behaviour::new(pingConfig),
identify: identify::Behaviour::new(identifyConfig), identify: identify::Behaviour::new(identifyConfig),
} }

View file

@ -7,17 +7,18 @@ use crate::network::swap_setup::bob::NewSwap;
use crate::protocol::bob::swap::has_already_processed_transfer_proof; use crate::protocol::bob::swap::has_already_processed_transfer_proof;
use crate::protocol::bob::{BobState, State2}; use crate::protocol::bob::{BobState, State2};
use crate::protocol::Database; use crate::protocol::Database;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, bail, Context, Result};
use futures::future::{BoxFuture, OptionFuture}; use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use libp2p::request_response::{OutboundFailure, OutboundRequestId, ResponseChannel}; use libp2p::request_response::{OutboundFailure, OutboundRequestId, ResponseChannel};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm}; use libp2p::{PeerId, Swarm};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use swap_core::bitcoin::EncryptedSignature; use swap_core::bitcoin::EncryptedSignature;
use swap_p2p::protocols::redial;
use uuid::Uuid; use uuid::Uuid;
static REQUEST_RESPONSE_PROTOCOL_TIMEOUT: Duration = Duration::from_secs(60); static REQUEST_RESPONSE_PROTOCOL_TIMEOUT: Duration = Duration::from_secs(60);
@ -25,199 +26,238 @@ static EXECUTION_SETUP_PROTOCOL_TIMEOUT: Duration = Duration::from_secs(120);
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop { pub struct EventLoop {
swap_id: Uuid,
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
alice_peer_id: PeerId,
db: Arc<dyn Database + Send + Sync>, db: Arc<dyn Database + Send + Sync>,
// These streams represents outgoing requests that we have to make // When a new `SwapEventLoopHandle` is created:
// These are essentially queues of requests that we will send to Alice once we are connected to her. // 1. a channel is created for the EventLoop to send transfer_proofs to SwapEventLoopHandle
quote_requests: bmrng::RequestReceiverStream<(), Result<BidQuote, OutboundFailure>>, // 2. the corresponding PeerId of Alice is stored
cooperative_xmr_redeem_requests: bmrng::RequestReceiverStream< //
// The sender of the channel is sent into this queue. The receiver is stored in the `SwapEventLoopHandle`.
//
// This is polled and then moved into `registered_swap_handlers`
queued_swap_handlers: bmrng::unbounded::UnboundedRequestReceiverStream<
(
Uuid,
PeerId,
bmrng::unbounded::UnboundedRequestSender<monero::TransferProof, ()>,
),
(), (),
>,
registered_swap_handlers: HashMap<
Uuid,
(
PeerId,
bmrng::unbounded::UnboundedRequestSender<monero::TransferProof, ()>,
),
>,
// These streams represents outgoing requests that we have to make (queues)
//
// Requests are keyed by the PeerId because they do not correspond to an existing swap yet
quote_requests:
bmrng::unbounded::UnboundedRequestReceiverStream<PeerId, Result<BidQuote, OutboundFailure>>,
// TODO: technically NewSwap.swap_id already contains the id of the swap
execution_setup_requests:
bmrng::unbounded::UnboundedRequestReceiverStream<(PeerId, NewSwap), Result<State2>>,
// These streams represents outgoing requests that we have to make (queues)
//
// Requests are keyed by the swap_id because they correspond to a specific swap
cooperative_xmr_redeem_requests: bmrng::unbounded::UnboundedRequestReceiverStream<
(PeerId, Uuid),
Result<cooperative_xmr_redeem_after_punish::Response, OutboundFailure>, Result<cooperative_xmr_redeem_after_punish::Response, OutboundFailure>,
>, >,
encrypted_signatures_requests: encrypted_signatures_requests: bmrng::unbounded::UnboundedRequestReceiverStream<
bmrng::RequestReceiverStream<EncryptedSignature, Result<(), OutboundFailure>>, (PeerId, Uuid, EncryptedSignature),
execution_setup_requests: bmrng::RequestReceiverStream<NewSwap, Result<State2>>, Result<(), OutboundFailure>,
>,
// These represents requests that are currently in-flight. // These represents requests that are currently in-flight.
// Meaning that we have sent them to Alice, but we have not yet received a response. // Meaning that we have sent them to Alice, but we have not yet received a response.
// Once we get a response to a matching [`RequestId`], we will use the responder to relay the // Once we get a response to a matching [`RequestId`], we will use the responder to relay the
// response. // response.
inflight_quote_requests: inflight_quote_requests: HashMap<
HashMap<OutboundRequestId, bmrng::Responder<Result<BidQuote, OutboundFailure>>>, OutboundRequestId,
inflight_encrypted_signature_requests: bmrng::unbounded::UnboundedResponder<Result<BidQuote, OutboundFailure>>,
HashMap<OutboundRequestId, bmrng::Responder<Result<(), OutboundFailure>>>, >,
inflight_swap_setup: Option<bmrng::Responder<Result<State2>>>, inflight_encrypted_signature_requests: HashMap<
OutboundRequestId,
bmrng::unbounded::UnboundedResponder<Result<(), OutboundFailure>>,
>,
inflight_swap_setup:
HashMap<(PeerId, Uuid), bmrng::unbounded::UnboundedResponder<Result<State2>>>,
inflight_cooperative_xmr_redeem_requests: HashMap< inflight_cooperative_xmr_redeem_requests: HashMap<
OutboundRequestId, OutboundRequestId,
bmrng::Responder<Result<cooperative_xmr_redeem_after_punish::Response, OutboundFailure>>, bmrng::unbounded::UnboundedResponder<
Result<cooperative_xmr_redeem_after_punish::Response, OutboundFailure>,
>,
>, >,
/// The sender we will use to relay incoming transfer proofs to the EventLoopHandle /// The future representing the successful handling of an incoming transfer proof (by the state machine)
/// The corresponding receiver is stored in the EventLoopHandle
transfer_proof_sender: bmrng::RequestSender<monero::TransferProof, ()>,
/// The future representing the successful handling of an incoming transfer
/// proof.
/// ///
/// Once we've sent a transfer proof to the ongoing swap, this future waits /// Once we've sent a transfer proof to the ongoing swap, a future is inserted into this set
/// until the swap took it "out" of the `EventLoopHandle`. As this future /// which will resolve once the state machine has "processed" the transfer proof.
/// resolves, we use the `ResponseChannel` returned from it to send an ACK ///
/// to Alice that we have successfully processed the transfer proof. /// The future will yield the swap_id and the response channel which are used to send an acknowledgement to Alice.
pending_transfer_proof: OptionFuture<BoxFuture<'static, ResponseChannel<()>>>, pending_transfer_proof_acks: FuturesUnordered<BoxFuture<'static, (Uuid, ResponseChannel<()>)>>,
} }
impl EventLoop { impl EventLoop {
fn swap_peer_id(&self, swap_id: &Uuid) -> Option<PeerId> {
self.registered_swap_handlers
.get(swap_id)
.map(|(peer_id, _)| *peer_id)
}
pub fn new( pub fn new(
swap_id: Uuid,
swarm: Swarm<Behaviour>, swarm: Swarm<Behaviour>,
alice_peer_id: PeerId,
db: Arc<dyn Database + Send + Sync>, db: Arc<dyn Database + Send + Sync>,
) -> Result<(Self, EventLoopHandle)> { ) -> Result<(Self, EventLoopHandle)> {
// We still use a timeout here, because this protocol does not dial Alice itself // We still use a timeout here because we trust our own implementation of the swap setup protocol less than the libp2p library
// and we want to fail if we cannot reach Alice
let (execution_setup_sender, execution_setup_receiver) = let (execution_setup_sender, execution_setup_receiver) =
bmrng::channel_with_timeout(1, EXECUTION_SETUP_PROTOCOL_TIMEOUT); bmrng::unbounded::channel_with_timeout(EXECUTION_SETUP_PROTOCOL_TIMEOUT);
// It is okay to not have a timeout here, as timeouts are enforced by the request-response protocol // It is okay to not have a timeout here, as timeouts are enforced by the request-response protocol
let (transfer_proof_sender, transfer_proof_receiver) = bmrng::channel(1); let (encrypted_signature_sender, encrypted_signature_receiver) =
let (encrypted_signature_sender, encrypted_signature_receiver) = bmrng::channel(1); bmrng::unbounded::channel();
let (quote_sender, quote_receiver) = bmrng::channel(1); let (quote_sender, quote_receiver) = bmrng::unbounded::channel();
let (cooperative_xmr_redeem_sender, cooperative_xmr_redeem_receiver) = bmrng::channel(1); let (cooperative_xmr_redeem_sender, cooperative_xmr_redeem_receiver) =
bmrng::unbounded::channel();
let (queued_transfer_proof_sender, queued_transfer_proof_receiver) =
bmrng::unbounded::channel();
let event_loop = EventLoop { let event_loop = EventLoop {
swap_id,
swarm, swarm,
alice_peer_id, db,
queued_swap_handlers: queued_transfer_proof_receiver.into(),
registered_swap_handlers: HashMap::default(),
execution_setup_requests: execution_setup_receiver.into(), execution_setup_requests: execution_setup_receiver.into(),
transfer_proof_sender,
encrypted_signatures_requests: encrypted_signature_receiver.into(), encrypted_signatures_requests: encrypted_signature_receiver.into(),
cooperative_xmr_redeem_requests: cooperative_xmr_redeem_receiver.into(), cooperative_xmr_redeem_requests: cooperative_xmr_redeem_receiver.into(),
quote_requests: quote_receiver.into(), quote_requests: quote_receiver.into(),
inflight_quote_requests: HashMap::default(), inflight_quote_requests: HashMap::default(),
inflight_swap_setup: None, inflight_swap_setup: HashMap::default(),
inflight_encrypted_signature_requests: HashMap::default(), inflight_encrypted_signature_requests: HashMap::default(),
inflight_cooperative_xmr_redeem_requests: HashMap::default(), inflight_cooperative_xmr_redeem_requests: HashMap::default(),
pending_transfer_proof: OptionFuture::from(None), pending_transfer_proof_acks: FuturesUnordered::new(),
db,
}; };
let handle = EventLoopHandle { let handle = EventLoopHandle {
execution_setup_sender, execution_setup_sender,
transfer_proof_receiver,
encrypted_signature_sender, encrypted_signature_sender,
cooperative_xmr_redeem_sender, cooperative_xmr_redeem_sender,
quote_sender, quote_sender,
queued_transfer_proof_sender,
}; };
Ok((event_loop, handle)) Ok((event_loop, handle))
} }
pub async fn run(mut self) { pub async fn run(mut self) {
match self.swarm.dial(DialOpts::from(self.alice_peer_id)) {
Ok(()) => {}
Err(e) => {
tracing::error!("Failed to initiate dial to Alice: {:?}", e);
return;
}
}
loop { loop {
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html // Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
tokio::select! { tokio::select! {
swarm_event = self.swarm.select_next_some() => { swarm_event = self.swarm.select_next_some() => {
match swarm_event { match swarm_event {
SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => { SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => {
tracing::trace!(
%id,
"Received quote"
);
if let Some(responder) = self.inflight_quote_requests.remove(&id) { if let Some(responder) = self.inflight_quote_requests.remove(&id) {
let _ = responder.respond(Ok(response)); let _ = responder.respond(Ok(response));
} }
} }
SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted(response)) => { SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted { peer, swap_id, result }) => {
if let Some(responder) = self.inflight_swap_setup.take() { tracing::trace!(
let _ = responder.respond(*response); %peer,
"Processing swap setup completion"
);
if let Some(responder) = self.inflight_swap_setup.remove(&(peer, swap_id)) {
let _ = responder.respond(*result);
} }
} }
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => { SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => {
tracing::trace!(
%peer,
%msg.swap_id,
"Received transfer proof"
);
let swap_id = msg.swap_id; let swap_id = msg.swap_id;
if swap_id == self.swap_id { // Check if we have a registered handler for this swap
if peer != self.alice_peer_id { if let Some((expected_peer_id, sender)) = self.registered_swap_handlers.get(&swap_id) {
// Ensure the transfer proof is coming from the expected peer
if peer != *expected_peer_id {
tracing::warn!( tracing::warn!(
%swap_id, %swap_id,
"Ignoring malicious transfer proof from {}, expected to receive it from {}", "Ignoring malicious transfer proof from {}, expected to receive it from {}",
peer, peer,
self.alice_peer_id); expected_peer_id);
continue;
}
// Send the transfer proof to the registered handler
match sender.send(msg.tx_lock_proof) {
Ok(mut responder) => {
// Insert a future that will resolve when the handle "takes the transfer proof out"
self.pending_transfer_proof_acks.push(async move {
let _ = responder.recv().await;
(swap_id, channel)
}.boxed());
}
Err(e) => {
tracing::warn!(
%swap_id,
%peer,
error = ?e,
"Failed to pass transfer proof to registered handler"
);
}
}
continue; continue;
} }
// Immediately acknowledge if we've already processed this transfer proof // Immediately acknowledge if we've already processed this transfer proof
// This handles the case where Alice didn't receive our previous acknowledgment // This handles the case where Alice didn't receive our previous acknowledgment
// and is retrying sending the transfer proof // and is retrying sending the transfer proof
if let Ok(state) = self.db.get_state(swap_id).await { match should_acknowledge_transfer_proof(self.db.clone(), swap_id, peer).await {
let state: BobState = state.try_into() Ok(true) => {
.expect("Bobs database only contains Bob states");
if has_already_processed_transfer_proof(&state) {
tracing::warn!("Received transfer proof for swap {} but we are already in state {}. Acknowledging immediately. Alice most likely did not receive the acknowledgment when we sent it before", swap_id, state);
// We set this to a future that will resolve immediately, and returns the channel // We set this to a future that will resolve immediately, and returns the channel
// This will be resolved in the next iteration of the event loop, and a response will be sent to Alice // This will be resolved in the next iteration of the event loop, and a response will be sent to Alice
self.pending_transfer_proof = OptionFuture::from(Some(async move { self.pending_transfer_proof_acks.push(async move {
channel (swap_id, channel)
}.boxed())); }.boxed());
// Skip evaluation of whether we should buffer the transfer proof
// if we already acknowledged the transfer proof
continue; continue;
} }
} // TODO: Maybe we should log here?
Ok(false) => {}
let mut responder = match self.transfer_proof_sender.send(msg.tx_lock_proof).await { Err(error) => {
Ok(responder) => responder,
Err(e) => {
tracing::warn!("Failed to pass on transfer proof: {:#}", e);
continue;
}
};
self.pending_transfer_proof = OptionFuture::from(Some(async move {
let _ = responder.recv().await;
channel
}.boxed()));
}else {
// Check if the transfer proof is sent from the correct peer and if we have a record of the swap
match self.db.get_peer_id(swap_id).await {
// We have a record of the swap
Ok(buffer_swap_alice_peer_id) => {
if buffer_swap_alice_peer_id == self.alice_peer_id {
// Save transfer proof in the database such that we can process it later when we resume the swap
match self.db.insert_buffered_transfer_proof(swap_id, msg.tx_lock_proof).await {
Ok(_) => {
tracing::info!("Received transfer proof for swap {} while running swap {}. Buffering this transfer proof in the database for later retrieval", swap_id, self.swap_id);
let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ());
}
Err(e) => {
tracing::error!("Failed to buffer transfer proof for swap {}: {:#}", swap_id, e);
}
};
}else {
tracing::warn!( tracing::warn!(
%swap_id, %swap_id,
"Ignoring malicious transfer proof from {}, expected to receive it from {}", %peer,
self.swap_id, error = ?error,
buffer_swap_alice_peer_id); "Failed to evaluate if we should acknowledge the transfer proof, we will not respond at all"
} );
},
// We do not have a record of the swap or an error occurred while retrieving the peer id of Alice
Err(e) => {
if let Some(sqlx::Error::RowNotFound) = e.downcast_ref::<sqlx::Error>() {
tracing::warn!("Ignoring transfer proof for swap {} while running swap {}. We do not have a record of this swap", swap_id, self.swap_id);
} else {
tracing::error!("Ignoring transfer proof for swap {} while running swap {}. Failed to retrieve the peer id of Alice for the corresponding swap: {:#}", swap_id, self.swap_id, e);
}
} }
} }
// Check if we should buffer the transfer proof
if let Err(error) = buffer_transfer_proof_if_needed(self.db.clone(), swap_id, peer, msg.tx_lock_proof).await {
tracing::warn!(
%swap_id,
%peer,
error = ?error,
"Failed to buffer transfer proof"
);
} }
} }
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => { SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => {
@ -239,30 +279,21 @@ impl EventLoop {
tracing::warn!(%peer, err = ?error, "Communication error"); tracing::warn!(%peer, err = ?error, "Communication error");
return; return;
} }
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => { SwarmEvent::ConnectionEstablished { peer_id: _, endpoint, .. } => {
tracing::info!(peer_id = %endpoint.get_remote_address(), "Connected to Alice"); tracing::info!(peer_id = %endpoint.get_remote_address(), "Connected to peer");
} }
SwarmEvent::Dialing { peer_id: Some(alice_peer_id), connection_id } if alice_peer_id == self.alice_peer_id => { SwarmEvent::Dialing { peer_id: Some(peer_id), connection_id } => {
tracing::debug!(%alice_peer_id, %connection_id, "Dialing Alice"); tracing::debug!(%peer_id, %connection_id, "Dialing peer");
} }
SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause: Some(error), connection_id } if peer_id == self.alice_peer_id && num_established == 0 => { SwarmEvent::ConnectionClosed { peer_id: _, endpoint, num_established, cause: Some(error), connection_id } if num_established == 0 => {
tracing::warn!(peer_id = %endpoint.get_remote_address(), cause = ?error, %connection_id, "Lost connection to Alice"); tracing::warn!(peer_id = %endpoint.get_remote_address(), cause = ?error, %connection_id, "Lost connection to peer");
if let Some(duration) = self.swarm.behaviour_mut().redial.until_next_redial() {
tracing::info!(seconds_until_next_redial = %duration.as_secs(), "Waiting for next redial attempt");
} }
} SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. } if num_established == 0 => {
SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. } if peer_id == self.alice_peer_id && num_established == 0 => {
// no error means the disconnection was requested // no error means the disconnection was requested
tracing::info!("Successfully closed connection to Alice"); tracing::info!(%peer_id, "Successfully closed connection to peer");
return;
}
SwarmEvent::OutgoingConnectionError { peer_id: Some(alice_peer_id), error, connection_id } if alice_peer_id == self.alice_peer_id => {
tracing::warn!(%alice_peer_id, %connection_id, ?error, "Failed to connect to Alice");
if let Some(duration) = self.swarm.behaviour_mut().redial.until_next_redial() {
tracing::info!(seconds_until_next_redial = %duration.as_secs(), "Waiting for next redial attempt");
} }
SwarmEvent::OutgoingConnectionError { peer_id: Some(peer_id), error, connection_id } => {
tracing::warn!(%peer_id, %connection_id, ?error, "Outgoing connection error to peer");
} }
SwarmEvent::Behaviour(OutEvent::OutboundRequestResponseFailure {peer, error, request_id, protocol}) => { SwarmEvent::Behaviour(OutEvent::OutboundRequestResponseFailure {peer, error, request_id, protocol}) => {
tracing::error!( tracing::error!(
@ -299,98 +330,150 @@ impl EventLoop {
%request_id, %request_id,
?error, ?error,
%protocol, %protocol,
"Failed to receive request-response request from peer"); "Failed to receive or send response for request-response request from peer");
}
SwarmEvent::Behaviour(OutEvent::Redial(redial::Event::ScheduledRedial { peer, next_dial_in })) => {
tracing::trace!(
%peer,
seconds_until_next_redial = %next_dial_in.as_secs(),
"Scheduled redial for peer"
);
} }
_ => {} _ => {}
} }
}, },
// Handle to-be-sent outgoing requests for all our network protocols. // Handle to-be-sent outgoing requests for all our network protocols.
Some(((), responder)) = self.quote_requests.next().fuse() => { Some((peer_id, responder)) = self.quote_requests.next().fuse() => {
let id = self.swarm.behaviour_mut().quote.send_request(&self.alice_peer_id, ()); let outbound_request_id = self.swarm.behaviour_mut().quote.send_request(&peer_id, ());
self.inflight_quote_requests.insert(id, responder); self.inflight_quote_requests.insert(outbound_request_id, responder);
tracing::trace!(
%peer_id,
%outbound_request_id,
"Dispatching outgoing quote request"
);
}, },
Some((tx_redeem_encsig, responder)) = self.encrypted_signatures_requests.next().fuse() => { Some(((peer_id, swap_id, tx_redeem_encsig), responder)) = self.encrypted_signatures_requests.next().fuse() => {
let request = encrypted_signature::Request { let request = encrypted_signature::Request {
swap_id: self.swap_id, swap_id,
tx_redeem_encsig tx_redeem_encsig
}; };
let id = self.swarm.behaviour_mut().encrypted_signature.send_request(&self.alice_peer_id, request); let outbound_request_id = self.swarm.behaviour_mut().encrypted_signature.send_request(&peer_id, request);
self.inflight_encrypted_signature_requests.insert(id, responder); self.inflight_encrypted_signature_requests.insert(outbound_request_id, responder);
tracing::trace!(
%peer_id,
%swap_id,
%outbound_request_id,
"Dispatching outgoing encrypted signature"
);
}, },
Some((_, responder)) = self.cooperative_xmr_redeem_requests.next().fuse() => { Some(((peer_id, swap_id), responder)) = self.cooperative_xmr_redeem_requests.next().fuse() => {
let id = self.swarm.behaviour_mut().cooperative_xmr_redeem.send_request(&self.alice_peer_id, Request { let outbound_request_id = self.swarm.behaviour_mut().cooperative_xmr_redeem.send_request(&peer_id, Request {
swap_id: self.swap_id swap_id
}); });
self.inflight_cooperative_xmr_redeem_requests.insert(id, responder); self.inflight_cooperative_xmr_redeem_requests.insert(outbound_request_id, responder);
tracing::trace!(
%peer_id,
%swap_id,
%outbound_request_id,
"Dispatching outgoing cooperative xmr redeem request"
);
}, },
// We use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected. // Instruct the swap setup behaviour to do a swap setup request
// because the protocol does not dial Alice itself // The behaviour will instruct the swarm to dial Alice, so we don't need to check if we are connected
// (unlike request-response above) Some(((alice_peer_id, swap), responder)) = self.execution_setup_requests.next().fuse() => {
Some((swap, responder)) = self.execution_setup_requests.next().fuse(), if self.is_connected_to_alice() => { let swap_id = swap.swap_id.clone();
self.swarm.behaviour_mut().swap_setup.start(self.alice_peer_id, swap).await;
self.inflight_swap_setup = Some(responder);
},
self.swarm.behaviour_mut().swap_setup.start(alice_peer_id, swap).await;
self.inflight_swap_setup.insert((alice_peer_id, swap_id), responder);
tracing::trace!(
%alice_peer_id,
"Dispatching outgoing execution setup request"
);
},
// Send an acknowledgement to Alice once the EventLoopHandle has processed a received transfer proof // Send an acknowledgement to Alice once the EventLoopHandle has processed a received transfer proof
// We use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected. Some((swap_id, response_channel)) = self.pending_transfer_proof_acks.next() => {
// tracing::trace!(
// Why do we do this here but not for the other request-response channels? %swap_id,
// This is the only request, we don't have a retry mechanism for. We lazily send this. "Dispatching outgoing transfer proof acknowledgment");
Some(response_channel) = &mut self.pending_transfer_proof, if self.is_connected_to_alice() => {
// We do not check if we are connected to Alice here because responding on a channel
// which has been dropped works even if a new connections has been established since
// will not work because because a channel is always bounded to one connection
if self.swarm.behaviour_mut().transfer_proof.send_response(response_channel, ()).is_err() { if self.swarm.behaviour_mut().transfer_proof.send_response(response_channel, ()).is_err() {
tracing::warn!("Failed to send acknowledgment to Alice that we have received the transfer proof"); tracing::warn!("Failed to send acknowledgment to Alice that we have received the transfer proof");
} else { } else {
tracing::info!("Sent acknowledgment to Alice that we have received the transfer proof"); tracing::info!("Sent acknowledgment to Alice that we have received the transfer proof");
self.pending_transfer_proof = OptionFuture::from(None);
} }
}, },
Some(((swap_id, peer_id, sender), responder)) = self.queued_swap_handlers.next().fuse() => {
tracing::trace!(%swap_id, %peer_id, "Registering swap handle for a swap internally inside the event loop");
// This registers the swap_id -> peer_id and swap_id -> transfer_proof_sender
self.registered_swap_handlers.insert(swap_id, (peer_id, sender));
// Instruct the swarm to contineously redial the peer
// TODO: We must remove it again once the swap is complete, otherwise we will redial indefinitely
self.swarm.behaviour_mut().redial.add_peer(peer_id);
// Acknowledge the registration
let _ = responder.respond(());
},
} }
} }
} }
fn is_connected_to_alice(&self) -> bool {
self.swarm.is_connected(&self.alice_peer_id)
}
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
/// When a NewSwap object is sent into this channel, the EventLoop will: /// When a (PeerId, NewSwap) tuple is sent into this channel, the EventLoop will:
/// 1. Trigger the swap setup protocol with Alice to negotiate the swap parameters /// 1. Trigger the swap setup protocol with the specified peer to negotiate the swap parameters
/// 2. Return the resulting State2 if successful /// 2. Return the resulting State2 if successful
/// 3. Return an anyhow error if the request fails /// 3. Return an anyhow error if the request fails
execution_setup_sender: bmrng::RequestSender<NewSwap, Result<State2>>, execution_setup_sender:
bmrng::unbounded::UnboundedRequestSender<(PeerId, NewSwap), Result<State2>>,
/// Receiver for incoming Monero transfer proofs from Alice. /// When a (PeerId, Uuid, EncryptedSignature) tuple is sent into this channel, the EventLoop will:
/// When a proof is received, we process it and acknowledge receipt back to the EventLoop /// 1. Send the encrypted signature to the specified peer over the network
/// The EventLoop will then send an acknowledgment back to Alice over the network /// 2. Return Ok(()) if the peer acknowledges receipt, or
transfer_proof_receiver: bmrng::RequestReceiver<monero::TransferProof, ()>,
/// When an encrypted signature is sent into this channel, the EventLoop will:
/// 1. Send the encrypted signature to Alice over the network
/// 2. Return Ok(()) if Alice acknowledges receipt, or
/// 3. Return an OutboundFailure error if the request fails /// 3. Return an OutboundFailure error if the request fails
encrypted_signature_sender: encrypted_signature_sender: bmrng::unbounded::UnboundedRequestSender<
bmrng::RequestSender<EncryptedSignature, Result<(), OutboundFailure>>, (PeerId, Uuid, EncryptedSignature),
Result<(), OutboundFailure>,
>,
/// When a () is sent into this channel, the EventLoop will: /// When a PeerId is sent into this channel, the EventLoop will:
/// 1. Request a price quote from Alice /// 1. Request a price quote from the specified peer
/// 2. Return the quote if successful /// 2. Return the quote if successful
/// 3. Return an OutboundFailure error if the request fails /// 3. Return an OutboundFailure error if the request fails
quote_sender: bmrng::RequestSender<(), Result<BidQuote, OutboundFailure>>, quote_sender:
bmrng::unbounded::UnboundedRequestSender<PeerId, Result<BidQuote, OutboundFailure>>,
/// When a () is sent into this channel, the EventLoop will: /// When a (PeerId, Uuid) tuple is sent into this channel, the EventLoop will:
/// 1. Request Alice's cooperation in redeeming the Monero /// 1. Request the specified peer's cooperation in redeeming the Monero for the given swap
/// 2. Return the a response object (Fullfilled or Rejected), if the network request is successful /// 2. Return a response object (Fullfilled or Rejected), if the network request is successful
/// The Fullfilled object contains the keys required to redeem the Monero /// The Fullfilled object contains the keys required to redeem the Monero
/// 3. Return an OutboundFailure error if the network request fails /// 3. Return an OutboundFailure error if the network request fails
cooperative_xmr_redeem_sender: bmrng::RequestSender< cooperative_xmr_redeem_sender: bmrng::unbounded::UnboundedRequestSender<
(), (PeerId, Uuid),
Result<cooperative_xmr_redeem_after_punish::Response, OutboundFailure>, Result<cooperative_xmr_redeem_after_punish::Response, OutboundFailure>,
>, >,
queued_transfer_proof_sender: bmrng::unbounded::UnboundedRequestSender<
(
Uuid,
PeerId,
bmrng::unbounded::UnboundedRequestSender<monero::TransferProof, ()>,
),
(),
>,
} }
impl EventLoopHandle { impl EventLoopHandle {
@ -401,14 +484,44 @@ impl EventLoopHandle {
.build() .build()
} }
pub async fn setup_swap(&mut self, swap: NewSwap) -> Result<State2> { /// Creates a SwapEventLoopHandle for a specific swap
tracing::debug!(swap = ?swap, "Sending swap setup request"); /// This registers the swap's transfer proof receiver with the event loop
pub async fn swap_handle(
&mut self,
peer_id: PeerId,
swap_id: Uuid,
) -> Result<SwapEventLoopHandle> {
// Create a channel for sending transfer proofs from the `EventLoop` to the `SwapEventLoopHandle`
//
// The sender is stored in the `EventLoop`. The receiver is stored in the `SwapEventLoopHandle`.
let (transfer_proof_sender, transfer_proof_receiver) = bmrng::unbounded_channel();
// Register this sender in the `EventLoop`
// It is put into the queue and then later moved into `registered_transfer_proof_senders`
//
// We use `send(...) instead of send_receive(...)` because the event loop needs to be running for this to respond
self.queued_transfer_proof_sender
.send((swap_id, peer_id, transfer_proof_sender))
.context("Failed to register transfer proof sender with event loop")?;
Ok(SwapEventLoopHandle {
handle: self.clone(),
peer_id,
swap_id,
transfer_proof_receiver: Some(transfer_proof_receiver),
})
}
pub async fn setup_swap(&mut self, peer_id: PeerId, swap: NewSwap) -> Result<State2> {
tracing::debug!(swap = ?swap, %peer_id, "Sending swap setup request");
let backoff = Self::create_retry_config(EXECUTION_SETUP_PROTOCOL_TIMEOUT); let backoff = Self::create_retry_config(EXECUTION_SETUP_PROTOCOL_TIMEOUT);
backoff::future::retry_notify(backoff, || async { backoff::future::retry_notify(backoff, || async {
match self.execution_setup_sender.send_receive(swap.clone()).await { match self.execution_setup_sender.send_receive((peer_id, swap.clone())).await {
Ok(Ok(state2)) => Ok(state2), Ok(Ok(state2)) => {
Ok(state2)
}
// These are errors thrown by the swap_setup/bob behaviour // These are errors thrown by the swap_setup/bob behaviour
Ok(Err(err)) => { Ok(Err(err)) => {
Err(backoff::Error::transient(err.context("A network error occurred while setting up the swap"))) Err(backoff::Error::transient(err.context("A network error occurred while setting up the swap")))
@ -428,33 +541,19 @@ impl EventLoopHandle {
error = ?err, error = ?err,
"Failed to setup swap. We will retry in {} seconds", "Failed to setup swap. We will retry in {} seconds",
wait_time.as_secs() wait_time.as_secs()
) );
}) })
.await .await
.context("Failed to setup swap after retries") .context("Failed to setup swap after retries")
} }
pub async fn recv_transfer_proof(&mut self) -> Result<monero::TransferProof> { pub async fn request_quote(&mut self, peer_id: PeerId) -> Result<BidQuote> {
let (transfer_proof, responder) = self tracing::debug!(%peer_id, "Requesting quote");
.transfer_proof_receiver
.recv()
.await
.context("Failed to receive transfer proof")?;
responder
.respond(())
.context("Failed to acknowledge receipt of transfer proof")?;
Ok(transfer_proof)
}
pub async fn request_quote(&mut self) -> Result<BidQuote> {
tracing::debug!("Requesting quote");
let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT); let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT);
backoff::future::retry_notify(backoff, || async { backoff::future::retry_notify(backoff, || async {
match self.quote_sender.send_receive(()).await { match self.quote_sender.send_receive(peer_id).await {
Ok(Ok(quote)) => Ok(quote), Ok(Ok(quote)) => Ok(quote),
Ok(Err(err)) => { Ok(Err(err)) => {
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting a quote"))) Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting a quote")))
@ -474,13 +573,17 @@ impl EventLoopHandle {
.context("Failed to request quote after retries") .context("Failed to request quote after retries")
} }
pub async fn request_cooperative_xmr_redeem(&mut self) -> Result<Response> { pub async fn request_cooperative_xmr_redeem(
tracing::debug!("Requesting cooperative XMR redeem"); &mut self,
peer_id: PeerId,
swap_id: Uuid,
) -> Result<Response> {
tracing::debug!(%peer_id, %swap_id, "Requesting cooperative XMR redeem");
let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT); let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT);
backoff::future::retry_notify(backoff, || async { backoff::future::retry_notify(backoff, || async {
match self.cooperative_xmr_redeem_sender.send_receive(()).await { match self.cooperative_xmr_redeem_sender.send_receive((peer_id, swap_id)).await {
Ok(Ok(response)) => Ok(response), Ok(Ok(response)) => Ok(response),
Ok(Err(err)) => { Ok(Err(err)) => {
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting cooperative XMR redeem"))) Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting cooperative XMR redeem")))
@ -500,8 +603,13 @@ impl EventLoopHandle {
.context("Failed to request cooperative XMR redeem after retries") .context("Failed to request cooperative XMR redeem after retries")
} }
pub async fn send_encrypted_signature(&mut self, tx_redeem_encsig: EncryptedSignature) { pub async fn send_encrypted_signature(
tracing::debug!("Sending encrypted signature"); &mut self,
peer_id: PeerId,
swap_id: Uuid,
tx_redeem_encsig: EncryptedSignature,
) -> () {
tracing::debug!(%peer_id, %swap_id, "Sending encrypted signature");
// We will retry indefinitely until we succeed // We will retry indefinitely until we succeed
let backoff = backoff::ExponentialBackoffBuilder::new() let backoff = backoff::ExponentialBackoffBuilder::new()
@ -510,7 +618,7 @@ impl EventLoopHandle {
.build(); .build();
backoff::future::retry_notify(backoff, || async { backoff::future::retry_notify(backoff, || async {
match self.encrypted_signature_sender.send_receive(tx_redeem_encsig.clone()).await { match self.encrypted_signature_sender.send_receive((peer_id, swap_id, tx_redeem_encsig.clone())).await {
Ok(Ok(_)) => Ok(()), Ok(Ok(_)) => Ok(()),
Ok(Err(err)) => { Ok(Err(err)) => {
Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while sending the encrypted signature"))) Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while sending the encrypted signature")))
@ -530,3 +638,101 @@ impl EventLoopHandle {
.expect("we should never run out of retries when sending an encrypted signature") .expect("we should never run out of retries when sending an encrypted signature")
} }
} }
#[derive(Debug)]
pub struct SwapEventLoopHandle {
handle: EventLoopHandle,
peer_id: PeerId,
swap_id: Uuid,
transfer_proof_receiver:
Option<bmrng::unbounded::UnboundedRequestReceiver<monero::TransferProof, ()>>,
}
impl SwapEventLoopHandle {
pub async fn recv_transfer_proof(&mut self) -> Result<monero::TransferProof> {
let receiver = self
.transfer_proof_receiver
.as_mut()
.context("Transfer proof receiver not available")?;
let (transfer_proof, responder) = receiver
.recv()
.await
.context("Failed to receive transfer proof")?;
responder
.respond(())
.context("Failed to acknowledge receipt of transfer proof")?;
Ok(transfer_proof)
}
pub async fn send_encrypted_signature(&mut self, tx_redeem_encsig: EncryptedSignature) -> () {
self.handle
.send_encrypted_signature(self.peer_id, self.swap_id, tx_redeem_encsig)
.await
}
pub async fn request_cooperative_xmr_redeem(&mut self) -> Result<Response> {
self.handle
.request_cooperative_xmr_redeem(self.peer_id, self.swap_id)
.await
}
pub async fn setup_swap(&mut self, swap: NewSwap) -> Result<State2> {
self.handle.setup_swap(self.peer_id, swap).await
}
pub async fn request_quote(&mut self) -> Result<BidQuote> {
self.handle.request_quote(self.peer_id).await
}
}
/// Returns Ok(true) if we should acknowledge the transfer proof
///
/// - Checks if the peer id is the expected peer id
/// - Checks if the state indicates that we have already processed the transfer proof
async fn should_acknowledge_transfer_proof(
db: Arc<dyn Database + Send + Sync>,
swap_id: Uuid,
peer_id: PeerId,
) -> Result<bool> {
let expected_peer_id = db.get_peer_id(swap_id).await.context(
"Failed to get peer id for swap to check if we should acknowledge the transfer proof",
)?;
// If the peer id is not the expected peer id, we should not acknowledge the transfer proof
// This is to prevent malicious requests
if expected_peer_id != peer_id {
bail!("Expected peer id {} but got {}", expected_peer_id, peer_id);
}
let state = db.get_state(swap_id).await.context(
"Failed to get state for swap to check if we should acknowledge the transfer proof",
)?;
let state: BobState = state.try_into().context(
"Failed to convert state to BobState to check if we should acknowledge the transfer proof",
)?;
Ok(has_already_processed_transfer_proof(&state))
}
/// Buffers the transfer proof in the database if its from the expected peer
async fn buffer_transfer_proof_if_needed(
db: Arc<dyn Database + Send + Sync>,
swap_id: Uuid,
peer_id: PeerId,
transfer_proof: monero::TransferProof,
) -> Result<()> {
let expected_peer_id = db.get_peer_id(swap_id).await.context(
"Failed to get peer id for swap to check if we should buffer the transfer proof",
)?;
if expected_peer_id != peer_id {
bail!("Expected peer id {} but got {}", expected_peer_id, peer_id);
}
db.insert_buffered_transfer_proof(swap_id, transfer_proof)
.await
.context("Failed to buffer transfer proof in database")
}

View file

@ -220,13 +220,16 @@ mod crates {
]; ];
pub const OUR_CRATES: &[&str] = &[ pub const OUR_CRATES: &[&str] = &[
"swap", // Library crates
"swap_p2p", "swap_p2p",
"asb",
"swap_env", "swap_env",
"swap_core",
"swap_fs", "swap_fs",
"swap_serde", "swap_serde",
"monero_sys", "monero_sys",
// Binary crates
"swap",
"asb",
"unstoppableswap_gui_rs", "unstoppableswap_gui_rs",
]; ];

View file

@ -1,11 +1,11 @@
pub use swap_p2p::protocols::cooperative_xmr_redeem_after_punish; pub use swap_p2p::protocols::cooperative_xmr_redeem_after_punish;
pub use swap_p2p::protocols::encrypted_signature; pub use swap_p2p::protocols::encrypted_signature;
pub use swap_p2p::protocols::quote; pub use swap_p2p::protocols::quote;
pub use swap_p2p::protocols::redial;
pub use swap_p2p::protocols::rendezvous; pub use swap_p2p::protocols::rendezvous;
pub use swap_p2p::protocols::swap_setup; pub use swap_p2p::protocols::swap_setup;
pub use swap_p2p::protocols::transfer_proof; pub use swap_p2p::protocols::transfer_proof;
pub mod redial;
pub mod swarm; pub mod swarm;
pub mod transport; pub mod transport;

View file

@ -1,135 +0,0 @@
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use futures::future::FutureExt;
use libp2p::core::Multiaddr;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::{NetworkBehaviour, ToSwarm};
use libp2p::PeerId;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{Instant, Sleep};
use void::Void;
/// A [`NetworkBehaviour`] that tracks whether we are connected to the given
/// peer and attempts to re-establish a connection with an exponential backoff
/// if we lose the connection.
pub struct Behaviour {
/// The peer we are interested in.
peer: PeerId,
/// If present, tracks for how long we need to sleep until we dial again.
sleep: Option<Pin<Box<Sleep>>>,
/// Tracks the current backoff state.
backoff: ExponentialBackoff,
}
impl Behaviour {
pub fn new(peer: PeerId, interval: Duration, max_interval: Duration) -> Self {
Self {
peer,
sleep: None,
backoff: ExponentialBackoff {
initial_interval: interval,
current_interval: interval,
max_interval,
max_elapsed_time: None, // We never give up on re-dialling
..ExponentialBackoff::default()
},
}
}
pub fn until_next_redial(&self) -> Option<Duration> {
let until_next_redial = self
.sleep
.as_ref()?
.deadline()
.checked_duration_since(Instant::now())?;
Some(until_next_redial)
}
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler;
type ToSwarm = ();
fn handle_established_inbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
// We establish an inbound connection to the peer we are interested in.
// We stop re-dialling.
// Reset the backoff state to start with the initial interval again once we disconnect again
if peer == self.peer {
self.backoff.reset();
self.sleep = None;
}
Ok(Self::ConnectionHandler {})
}
fn handle_established_outbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
_addr: &Multiaddr,
_role_override: libp2p::core::Endpoint,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
// We establish an outbound connection to the peer we are interested in.
// We stop re-dialling.
// Reset the backoff state to start with the initial interval again once we disconnect again
if peer == self.peer {
self.backoff.reset();
self.sleep = None;
}
Ok(Self::ConnectionHandler {})
}
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<'_>) {
let redial = match event {
libp2p::swarm::FromSwarm::ConnectionClosed(e) if e.peer_id == self.peer => true,
libp2p::swarm::FromSwarm::DialFailure(e) if e.peer_id == Some(self.peer) => true,
_ => false,
};
if redial && self.sleep.is_none() {
self.sleep = Some(Box::pin(tokio::time::sleep(self.backoff.initial_interval)));
tracing::info!(seconds_until_next_redial = %self.until_next_redial().expect("We initialize the backoff without max_elapsed_time").as_secs(), "Waiting for next redial attempt");
}
}
fn poll(&mut self, cx: &mut Context<'_>) -> std::task::Poll<ToSwarm<Self::ToSwarm, Void>> {
let sleep = match self.sleep.as_mut() {
None => return Poll::Pending, // early exit if we shouldn't be re-dialling
Some(future) => future,
};
futures::ready!(sleep.poll_unpin(cx));
let next_dial_in = match self.backoff.next_backoff() {
Some(next_dial_in) => next_dial_in,
None => {
unreachable!("The backoff should never run out of attempts");
}
};
self.sleep = Some(Box::pin(tokio::time::sleep(next_dial_in)));
Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(self.peer)
.condition(PeerCondition::Disconnected)
.build(),
})
}
fn on_connection_handler_event(
&mut self,
_peer_id: PeerId,
_connection_id: libp2p::swarm::ConnectionId,
_event: libp2p::swarm::THandlerOutEvent<Self>,
) {
unreachable!("The re-dial dummy connection handler does not produce any events");
}
}

View file

@ -20,7 +20,7 @@ pub mod swap;
pub struct Swap { pub struct Swap {
pub state: BobState, pub state: BobState,
pub event_loop_handle: cli::EventLoopHandle, pub event_loop_handle: cli::SwapEventLoopHandle,
pub db: Arc<dyn Database + Send + Sync>, pub db: Arc<dyn Database + Send + Sync>,
pub bitcoin_wallet: Arc<dyn BitcoinWallet>, pub bitcoin_wallet: Arc<dyn BitcoinWallet>,
pub monero_wallet: Arc<monero::Wallets>, pub monero_wallet: Arc<monero::Wallets>,
@ -38,7 +38,7 @@ impl Swap {
bitcoin_wallet: Arc<dyn BitcoinWallet>, bitcoin_wallet: Arc<dyn BitcoinWallet>,
monero_wallet: Arc<monero::Wallets>, monero_wallet: Arc<monero::Wallets>,
env_config: env::Config, env_config: env::Config,
event_loop_handle: cli::EventLoopHandle, event_loop_handle: cli::SwapEventLoopHandle,
monero_receive_pool: MoneroAddressPool, monero_receive_pool: MoneroAddressPool,
bitcoin_change_address: bitcoin::Address, bitcoin_change_address: bitcoin::Address,
btc_amount: bitcoin::Amount, btc_amount: bitcoin::Amount,
@ -68,7 +68,7 @@ impl Swap {
bitcoin_wallet: Arc<dyn BitcoinWallet>, bitcoin_wallet: Arc<dyn BitcoinWallet>,
monero_wallet: Arc<monero::Wallets>, monero_wallet: Arc<monero::Wallets>,
env_config: env::Config, env_config: env::Config,
event_loop_handle: cli::EventLoopHandle, event_loop_handle: cli::SwapEventLoopHandle,
monero_receive_pool: MoneroAddressPool, monero_receive_pool: MoneroAddressPool,
) -> Result<Self> { ) -> Result<Self> {
let state = db.get_state(id).await?.try_into()?; let state = db.get_state(id).await?.try_into()?;

View file

@ -1,6 +1,6 @@
use crate::cli::api::tauri_bindings::LockBitcoinDetails; use crate::cli::api::tauri_bindings::LockBitcoinDetails;
use crate::cli::api::tauri_bindings::{TauriEmitter, TauriHandle, TauriSwapProgressEvent}; use crate::cli::api::tauri_bindings::{TauriEmitter, TauriHandle, TauriSwapProgressEvent};
use crate::cli::EventLoopHandle; use crate::cli::SwapEventLoopHandle;
use crate::common::retry; use crate::common::retry;
use crate::monero; use crate::monero;
use crate::monero::MoneroAddressPool; use crate::monero::MoneroAddressPool;
@ -8,7 +8,7 @@ use crate::network::cooperative_xmr_redeem_after_punish::Response::{Fullfilled,
use crate::network::swap_setup::bob::NewSwap; use crate::network::swap_setup::bob::NewSwap;
use crate::protocol::bob::*; use crate::protocol::bob::*;
use crate::protocol::{bob, Database}; use crate::protocol::{bob, Database};
use anyhow::{bail, Context as AnyContext, Result}; use anyhow::{Context as AnyContext, Result};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use swap_core::bitcoin::{ExpiredTimelocks, TxCancel, TxRefund}; use swap_core::bitcoin::{ExpiredTimelocks, TxCancel, TxRefund};
@ -90,7 +90,7 @@ pub async fn run_until(
async fn next_state( async fn next_state(
swap_id: Uuid, swap_id: Uuid,
state: BobState, state: BobState,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut SwapEventLoopHandle,
db: Arc<dyn Database + Send + Sync>, db: Arc<dyn Database + Send + Sync>,
bitcoin_wallet: Arc<dyn BitcoinWallet>, bitcoin_wallet: Arc<dyn BitcoinWallet>,
monero_wallet: Arc<monero::Wallets>, monero_wallet: Arc<monero::Wallets>,

View file

@ -57,7 +57,7 @@ where
let cli = Cli::default(); let cli = Cli::default();
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_env_filter("info,swap=debug,monero_harness=debug,monero_rpc=debug,bitcoin_harness=info,testcontainers=info,monero_cpp=info,monero_sys=debug") // add `reqwest::connect::verbose=trace` if you want to logs of the RPC clients .with_env_filter("info,swap=trace,swap_p2p=trace,monero_harness=debug,monero_rpc=debug,bitcoin_harness=info,testcontainers=info,monero_cpp=info,monero_sys=debug") // add `reqwest::connect::verbose=trace` if you want to logs of the RPC clients
.with_test_writer() .with_test_writer()
.init(); .init();
@ -525,7 +525,9 @@ impl BobParams {
} }
let db = Arc::new(SqliteDatabase::open(&self.db_path, AccessMode::ReadWrite).await?); let db = Arc::new(SqliteDatabase::open(&self.db_path, AccessMode::ReadWrite).await?);
let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?; let (event_loop, mut handle) = self.new_eventloop(db.clone()).await?;
let swap_handle = handle.swap_handle(self.alice_peer_id, swap_id).await?;
let swap = bob::Swap::from_db( let swap = bob::Swap::from_db(
db.clone(), db.clone(),
@ -533,7 +535,7 @@ impl BobParams {
self.bitcoin_wallet.clone(), self.bitcoin_wallet.clone(),
self.monero_wallet.clone(), self.monero_wallet.clone(),
self.env_config, self.env_config,
handle, swap_handle,
self.monero_wallet self.monero_wallet
.main_wallet() .main_wallet()
.await .await
@ -560,17 +562,19 @@ impl BobParams {
} }
let db = Arc::new(SqliteDatabase::open(&self.db_path, AccessMode::ReadWrite).await?); let db = Arc::new(SqliteDatabase::open(&self.db_path, AccessMode::ReadWrite).await?);
let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?; let (event_loop, mut handle) = self.new_eventloop(db.clone()).await?;
db.insert_peer_id(swap_id, self.alice_peer_id).await?; db.insert_peer_id(swap_id, self.alice_peer_id).await?;
let swap_handle = handle.swap_handle(self.alice_peer_id, swap_id).await?;
let swap = bob::Swap::new( let swap = bob::Swap::new(
db, db,
swap_id, swap_id,
self.bitcoin_wallet.clone(), self.bitcoin_wallet.clone(),
self.monero_wallet.clone(), self.monero_wallet.clone(),
self.env_config, self.env_config,
handle, swap_handle,
self.monero_wallet self.monero_wallet
.main_wallet() .main_wallet()
.await .await
@ -587,13 +591,11 @@ impl BobParams {
pub async fn new_eventloop( pub async fn new_eventloop(
&self, &self,
swap_id: Uuid,
db: Arc<dyn Database + Send + Sync>, db: Arc<dyn Database + Send + Sync>,
) -> Result<(cli::EventLoop, cli::EventLoopHandle)> { ) -> Result<(cli::EventLoop, cli::EventLoopHandle)> {
let identity = self.seed.derive_libp2p_identity(); let identity = self.seed.derive_libp2p_identity();
let behaviour = cli::Behaviour::new( let behaviour = cli::Behaviour::new(
self.alice_peer_id,
self.env_config, self.env_config,
self.bitcoin_wallet.clone(), self.bitcoin_wallet.clone(),
(identity.clone(), XmrBtcNamespace::Testnet), (identity.clone(), XmrBtcNamespace::Testnet),
@ -601,7 +603,7 @@ impl BobParams {
let mut swarm = swarm::cli(identity.clone(), None, behaviour).await?; let mut swarm = swarm::cli(identity.clone(), None, behaviour).await?;
swarm.add_peer_address(self.alice_peer_id, self.alice_address.clone()); swarm.add_peer_address(self.alice_peer_id, self.alice_address.clone());
cli::EventLoop::new(swap_id, swarm, self.alice_peer_id, db.clone()) cli::EventLoop::new(swarm, db.clone())
} }
} }