2021-03-03 19:28:58 -05:00
use crate ::bitcoin ::EncryptedSignature ;
2021-03-03 00:54:47 -05:00
use crate ::network ::quote ::BidQuote ;
2021-04-08 04:56:26 -04:00
use crate ::network ::{ encrypted_signature , spot_price } ;
2021-03-03 19:28:58 -05:00
use crate ::protocol ::bob ::{ Behaviour , OutEvent , State0 , State2 } ;
use crate ::{ bitcoin , monero } ;
2021-03-29 18:59:51 -04:00
use anyhow ::{ Context , Result } ;
use futures ::future ::{ BoxFuture , OptionFuture } ;
use futures ::{ FutureExt , StreamExt } ;
use libp2p ::request_response ::{ RequestId , ResponseChannel } ;
2021-03-18 00:54:33 -04:00
use libp2p ::swarm ::SwarmEvent ;
2021-03-23 01:56:04 -04:00
use libp2p ::{ PeerId , Swarm } ;
2021-03-29 18:59:51 -04:00
use std ::collections ::HashMap ;
2021-03-03 19:28:58 -05:00
use std ::sync ::Arc ;
2021-03-29 18:59:51 -04:00
use std ::time ::Duration ;
2021-04-08 04:56:26 -04:00
use uuid ::Uuid ;
2021-01-04 22:08:36 -05:00
2020-12-22 22:33:29 -05:00
#[ allow(missing_debug_implementations) ]
2020-12-09 21:19:18 -05:00
pub struct EventLoop {
2021-04-08 04:56:26 -04:00
swap_id : Uuid ,
2020-12-09 21:46:48 -05:00
swarm : libp2p ::Swarm < Behaviour > ,
2021-02-03 00:45:43 -05:00
bitcoin_wallet : Arc < bitcoin ::Wallet > ,
2020-12-21 21:47:05 -05:00
alice_peer_id : PeerId ,
2021-03-29 18:59:51 -04:00
// these streams represents outgoing requests that we have to make
quote_requests : bmrng ::RequestReceiverStream < ( ) , BidQuote > ,
spot_price_requests : bmrng ::RequestReceiverStream < spot_price ::Request , spot_price ::Response > ,
2021-04-08 04:56:26 -04:00
encrypted_signatures : bmrng ::RequestReceiverStream < EncryptedSignature , ( ) > ,
2021-03-29 18:59:51 -04:00
execution_setup_requests : bmrng ::RequestReceiverStream < State0 , Result < State2 > > ,
// these represents requests that are currently in-flight.
// once we get a response to a matching [`RequestId`], we will use the responder to relay the
// response.
inflight_spot_price_requests : HashMap < RequestId , bmrng ::Responder < spot_price ::Response > > ,
inflight_quote_requests : HashMap < RequestId , bmrng ::Responder < BidQuote > > ,
inflight_encrypted_signature_requests : HashMap < RequestId , bmrng ::Responder < ( ) > > ,
inflight_execution_setup : Option < bmrng ::Responder < Result < State2 > > > ,
/// The sender we will use to relay incoming transfer proofs.
2021-04-08 04:56:26 -04:00
transfer_proof : bmrng ::RequestSender < monero ::TransferProof , ( ) > ,
2021-03-29 18:59:51 -04:00
/// The future representing the successful handling of an incoming transfer
/// proof.
///
/// Once we've sent a transfer proof to the ongoing swap, this future waits
/// until the swap took it "out" of the `EventLoopHandle`. As this future
/// resolves, we use the `ResponseChannel` returned from it to send an ACK
/// to Alice that we have successfully processed the transfer proof.
pending_transfer_proof : OptionFuture < BoxFuture < 'static , ResponseChannel < ( ) > > > ,
2020-12-06 21:31:14 -05:00
}
2020-12-09 21:19:18 -05:00
impl EventLoop {
2020-12-21 21:47:05 -05:00
pub fn new (
2021-04-08 04:56:26 -04:00
swap_id : Uuid ,
2021-03-23 01:56:04 -04:00
swarm : Swarm < Behaviour > ,
2020-12-21 21:47:05 -05:00
alice_peer_id : PeerId ,
2021-02-03 00:45:43 -05:00
bitcoin_wallet : Arc < bitcoin ::Wallet > ,
2020-12-21 21:47:05 -05:00
) -> Result < ( Self , EventLoopHandle ) > {
2021-03-29 18:59:51 -04:00
let execution_setup = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let transfer_proof = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let encrypted_signature = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let spot_price = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
let quote = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 30 ) ) ;
2020-12-08 22:10:24 -05:00
2020-12-21 21:47:05 -05:00
let event_loop = EventLoop {
2021-04-08 04:56:26 -04:00
swap_id ,
2020-12-06 21:31:14 -05:00
swarm ,
2020-12-21 21:47:05 -05:00
alice_peer_id ,
2021-02-03 00:45:43 -05:00
bitcoin_wallet ,
2021-03-29 18:59:51 -04:00
execution_setup_requests : execution_setup . 1. into ( ) ,
transfer_proof : transfer_proof . 0 ,
2021-04-08 04:56:26 -04:00
encrypted_signatures : encrypted_signature . 1. into ( ) ,
2021-03-29 18:59:51 -04:00
spot_price_requests : spot_price . 1. into ( ) ,
quote_requests : quote . 1. into ( ) ,
inflight_spot_price_requests : HashMap ::default ( ) ,
inflight_quote_requests : HashMap ::default ( ) ,
inflight_execution_setup : None ,
inflight_encrypted_signature_requests : HashMap ::default ( ) ,
pending_transfer_proof : OptionFuture ::from ( None ) ,
2020-12-08 22:10:24 -05:00
} ;
2020-12-09 21:19:18 -05:00
let handle = EventLoopHandle {
2021-03-29 18:59:51 -04:00
execution_setup : execution_setup . 0 ,
transfer_proof : transfer_proof . 1 ,
encrypted_signature : encrypted_signature . 0 ,
spot_price : spot_price . 0 ,
quote : quote . 0 ,
2020-12-08 22:10:24 -05:00
} ;
2020-12-21 21:47:05 -05:00
Ok ( ( event_loop , handle ) )
2020-12-06 21:31:14 -05:00
}
2021-03-18 03:00:02 -04:00
pub async fn run ( mut self ) {
let _ = Swarm ::dial ( & mut self . swarm , & self . alice_peer_id ) ;
2020-12-06 21:31:14 -05:00
loop {
2021-03-30 23:21:49 -04:00
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
2020-12-09 02:08:26 -05:00
tokio ::select! {
2021-03-18 00:54:33 -04:00
swarm_event = self . swarm . next_event ( ) . fuse ( ) = > {
2020-12-09 02:08:26 -05:00
match swarm_event {
2021-03-29 18:59:51 -04:00
SwarmEvent ::Behaviour ( OutEvent ::SpotPriceReceived { id , response } ) = > {
if let Some ( responder ) = self . inflight_spot_price_requests . remove ( & id ) {
let _ = responder . respond ( response ) ;
}
2021-03-18 03:00:02 -04:00
}
2021-03-29 18:59:51 -04:00
SwarmEvent ::Behaviour ( OutEvent ::QuoteReceived { id , response } ) = > {
if let Some ( responder ) = self . inflight_quote_requests . remove ( & id ) {
let _ = responder . respond ( response ) ;
}
2021-02-03 00:45:43 -05:00
}
2021-03-29 18:59:51 -04:00
SwarmEvent ::Behaviour ( OutEvent ::ExecutionSetupDone ( response ) ) = > {
if let Some ( responder ) = self . inflight_execution_setup . take ( ) {
let _ = responder . respond ( * response ) ;
2021-02-05 00:30:43 -05:00
}
2020-12-09 02:08:26 -05:00
}
2021-03-29 18:59:51 -04:00
SwarmEvent ::Behaviour ( OutEvent ::TransferProofReceived { msg , channel } ) = > {
2021-04-08 04:56:26 -04:00
if msg . swap_id ! = self . swap_id {
// TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps
tracing ::warn! ( " Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored. " , msg . swap_id , self . swap_id ) ;
// When receiving a transfer proof that is unexpected we still have to acknowledge that it was received
let _ = self . swarm . transfer_proof . send_response ( channel , ( ) ) ;
continue ;
}
let mut responder = match self . transfer_proof . send ( msg . tx_lock_proof ) . await {
2021-03-29 18:59:51 -04:00
Ok ( responder ) = > responder ,
2021-04-08 04:56:26 -04:00
Err ( e ) = > {
tracing ::warn! ( " Failed to pass on transfer proof: {:#} " , e ) ;
2021-03-29 18:59:51 -04:00
continue ;
}
} ;
self . pending_transfer_proof = OptionFuture ::from ( Some ( async move {
let _ = responder . recv ( ) . await ;
channel
} . boxed ( ) ) ) ;
}
SwarmEvent ::Behaviour ( OutEvent ::EncryptedSignatureAcknowledged { id } ) = > {
if let Some ( responder ) = self . inflight_encrypted_signature_requests . remove ( & id ) {
let _ = responder . respond ( ( ) ) ;
}
2021-01-28 23:13:40 -05:00
}
2021-03-18 00:54:33 -04:00
SwarmEvent ::Behaviour ( OutEvent ::ResponseSent ) = > {
}
2021-03-18 03:00:02 -04:00
SwarmEvent ::Behaviour ( OutEvent ::CommunicationError ( error ) ) = > {
tracing ::warn! ( " Communication error: {:#} " , error ) ;
return ;
2021-02-05 00:30:43 -05:00
}
2021-03-18 03:00:02 -04:00
SwarmEvent ::ConnectionEstablished { peer_id , endpoint , .. } if peer_id = = self . alice_peer_id = > {
tracing ::debug! ( " Connected to Alice at {} " , endpoint . get_remote_address ( ) ) ;
}
SwarmEvent ::Dialing ( peer_id ) if peer_id = = self . alice_peer_id = > {
tracing ::debug! ( " Dialling Alice at {} " , peer_id ) ;
2020-12-18 01:39:04 -05:00
}
2021-03-18 03:00:02 -04:00
SwarmEvent ::ConnectionClosed { peer_id , endpoint , num_established , cause } if peer_id = = self . alice_peer_id & & num_established = = 0 = > {
match cause {
Some ( error ) = > {
tracing ::warn! ( " Lost connection to Alice at {}, cause: {} " , endpoint . get_remote_address ( ) , error ) ;
} ,
None = > {
// no error means the disconnection was requested
tracing ::info! ( " Successfully closed connection to Alice " ) ;
return ;
}
}
match libp2p ::Swarm ::dial ( & mut self . swarm , & self . alice_peer_id ) {
Ok ( ( ) ) = > { } ,
Err ( e ) = > {
tracing ::warn! ( " Failed to re-dial Alice: {} " , e ) ;
return ;
}
}
}
SwarmEvent ::UnreachableAddr { peer_id , address , attempts_remaining , error } if peer_id = = self . alice_peer_id & & attempts_remaining = = 0 = > {
tracing ::warn! ( " Failed to dial Alice at {}: {} " , address , error ) ;
}
_ = > { }
2020-12-09 02:08:26 -05:00
}
} ,
2021-03-30 23:21:49 -04:00
// Handle to-be-sent requests for all our network protocols.
// Use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected.
Some ( ( request , responder ) ) = self . spot_price_requests . next ( ) . fuse ( ) , if self . is_connected_to_alice ( ) = > {
2021-03-29 18:59:51 -04:00
let id = self . swarm . spot_price . send_request ( & self . alice_peer_id , request ) ;
self . inflight_spot_price_requests . insert ( id , responder ) ;
2020-12-09 02:08:26 -05:00
} ,
2021-03-30 23:21:49 -04:00
Some ( ( ( ) , responder ) ) = self . quote_requests . next ( ) . fuse ( ) , if self . is_connected_to_alice ( ) = > {
2021-03-29 18:59:51 -04:00
let id = self . swarm . quote . send_request ( & self . alice_peer_id , ( ) ) ;
self . inflight_quote_requests . insert ( id , responder ) ;
2021-03-03 00:54:47 -05:00
} ,
2021-03-30 23:21:49 -04:00
Some ( ( request , responder ) ) = self . execution_setup_requests . next ( ) . fuse ( ) , if self . is_connected_to_alice ( ) = > {
2021-03-29 18:59:51 -04:00
self . swarm . execution_setup . run ( self . alice_peer_id , request , self . bitcoin_wallet . clone ( ) ) ;
self . inflight_execution_setup = Some ( responder ) ;
2021-02-03 00:45:43 -05:00
} ,
2021-04-08 04:56:26 -04:00
Some ( ( tx_redeem_encsig , responder ) ) = self . encrypted_signatures . next ( ) . fuse ( ) , if self . is_connected_to_alice ( ) = > {
let request = encrypted_signature ::Request {
swap_id : self . swap_id ,
tx_redeem_encsig
} ;
2021-03-29 18:59:51 -04:00
let id = self . swarm . encrypted_signature . send_request ( & self . alice_peer_id , request ) ;
self . inflight_encrypted_signature_requests . insert ( id , responder ) ;
} ,
2021-03-30 23:21:49 -04:00
2021-03-29 18:59:51 -04:00
Some ( response_channel ) = & mut self . pending_transfer_proof = > {
let _ = self . swarm . transfer_proof . send_response ( response_channel , ( ) ) ;
self . pending_transfer_proof = OptionFuture ::from ( None ) ;
2020-12-09 02:08:26 -05:00
}
2020-12-08 22:10:24 -05:00
}
}
2020-12-06 21:31:14 -05:00
}
2021-03-30 23:21:49 -04:00
fn is_connected_to_alice ( & self ) -> bool {
Swarm ::is_connected ( & self . swarm , & self . alice_peer_id )
}
2020-12-06 21:31:14 -05:00
}
2021-03-23 02:26:13 -04:00
#[ derive(Debug) ]
pub struct EventLoopHandle {
2021-03-29 18:59:51 -04:00
execution_setup : bmrng ::RequestSender < State0 , Result < State2 > > ,
2021-04-08 04:56:26 -04:00
transfer_proof : bmrng ::RequestReceiver < monero ::TransferProof , ( ) > ,
encrypted_signature : bmrng ::RequestSender < EncryptedSignature , ( ) > ,
2021-03-29 18:59:51 -04:00
spot_price : bmrng ::RequestSender < spot_price ::Request , spot_price ::Response > ,
quote : bmrng ::RequestSender < ( ) , BidQuote > ,
2021-03-23 02:26:13 -04:00
}
impl EventLoopHandle {
pub async fn execution_setup ( & mut self , state0 : State0 ) -> Result < State2 > {
2021-03-29 18:59:51 -04:00
self . execution_setup . send_receive ( state0 ) . await ?
2021-03-23 02:26:13 -04:00
}
2021-04-08 04:56:26 -04:00
pub async fn recv_transfer_proof ( & mut self ) -> Result < monero ::TransferProof > {
let ( transfer_proof , responder ) = self
2021-03-29 18:59:51 -04:00
. transfer_proof
2021-03-23 02:26:13 -04:00
. recv ( )
. await
2021-03-29 18:59:51 -04:00
. context ( " Failed to receive transfer proof " ) ? ;
responder
. respond ( ( ) )
. context ( " Failed to acknowledge receipt of transfer proof " ) ? ;
2021-04-08 04:56:26 -04:00
Ok ( transfer_proof )
2021-03-23 02:26:13 -04:00
}
pub async fn request_spot_price ( & mut self , btc : bitcoin ::Amount ) -> Result < monero ::Amount > {
2021-03-29 18:59:51 -04:00
Ok ( self
. spot_price
. send_receive ( spot_price ::Request { btc } )
. await ?
. xmr )
2021-03-23 02:26:13 -04:00
}
pub async fn request_quote ( & mut self ) -> Result < BidQuote > {
2021-03-29 18:59:51 -04:00
Ok ( self . quote . send_receive ( ( ) ) . await ? )
2021-03-23 02:26:13 -04:00
}
pub async fn send_encrypted_signature (
& mut self ,
tx_redeem_encsig : EncryptedSignature ,
) -> Result < ( ) > {
2021-03-29 18:59:51 -04:00
Ok ( self
. encrypted_signature
2021-04-08 04:56:26 -04:00
. send_receive ( tx_redeem_encsig )
2021-03-29 18:59:51 -04:00
. await ? )
2021-03-23 02:26:13 -04:00
}
}