2021-03-04 11:28:58 +11:00
use crate ::bitcoin ::EncryptedSignature ;
2021-06-25 13:40:33 +10:00
use crate ::cli ::behaviour ::{ Behaviour , OutEvent } ;
2021-06-24 13:54:26 +10:00
use crate ::network ::encrypted_signature ;
2021-03-03 16:54:47 +11:00
use crate ::network ::quote ::BidQuote ;
2021-06-25 13:40:33 +10:00
use crate ::network ::swap_setup ::bob ::NewSwap ;
use crate ::protocol ::bob ::State2 ;
2021-06-24 13:54:26 +10:00
use crate ::{ env , monero } ;
use anyhow ::{ Context , Result } ;
2021-03-30 09:59:51 +11:00
use futures ::future ::{ BoxFuture , OptionFuture } ;
use futures ::{ FutureExt , StreamExt } ;
use libp2p ::request_response ::{ RequestId , ResponseChannel } ;
2021-03-18 15:54:33 +11:00
use libp2p ::swarm ::SwarmEvent ;
2021-03-23 16:56:04 +11:00
use libp2p ::{ PeerId , Swarm } ;
2021-03-30 09:59:51 +11:00
use std ::collections ::HashMap ;
use std ::time ::Duration ;
2021-04-08 18:56:26 +10:00
use uuid ::Uuid ;
2021-01-05 14:08:36 +11:00
2020-12-23 14:33:29 +11:00
#[ allow(missing_debug_implementations) ]
2020-12-10 13:19:18 +11:00
pub struct EventLoop {
2021-04-08 18:56:26 +10:00
swap_id : Uuid ,
2020-12-10 13:46:48 +11:00
swarm : libp2p ::Swarm < Behaviour > ,
2020-12-22 13:47:05 +11:00
alice_peer_id : PeerId ,
2021-03-30 09:59:51 +11:00
// these streams represents outgoing requests that we have to make
quote_requests : bmrng ::RequestReceiverStream < ( ) , BidQuote > ,
2021-04-08 18:56:26 +10:00
encrypted_signatures : bmrng ::RequestReceiverStream < EncryptedSignature , ( ) > ,
2021-06-24 13:54:26 +10:00
swap_setup_requests : bmrng ::RequestReceiverStream < NewSwap , Result < State2 > > ,
2021-03-30 09:59:51 +11:00
// these represents requests that are currently in-flight.
// once we get a response to a matching [`RequestId`], we will use the responder to relay the
// response.
inflight_quote_requests : HashMap < RequestId , bmrng ::Responder < BidQuote > > ,
inflight_encrypted_signature_requests : HashMap < RequestId , bmrng ::Responder < ( ) > > ,
2021-06-24 13:54:26 +10:00
inflight_swap_setup : Option < bmrng ::Responder < Result < State2 > > > ,
2021-03-30 09:59:51 +11:00
/// The sender we will use to relay incoming transfer proofs.
2021-04-08 18:56:26 +10:00
transfer_proof : bmrng ::RequestSender < monero ::TransferProof , ( ) > ,
2021-03-30 09:59:51 +11:00
/// The future representing the successful handling of an incoming transfer
/// proof.
///
/// Once we've sent a transfer proof to the ongoing swap, this future waits
/// until the swap took it "out" of the `EventLoopHandle`. As this future
/// resolves, we use the `ResponseChannel` returned from it to send an ACK
/// to Alice that we have successfully processed the transfer proof.
pending_transfer_proof : OptionFuture < BoxFuture < 'static , ResponseChannel < ( ) > > > ,
2020-12-07 13:31:14 +11:00
}
2020-12-10 13:19:18 +11:00
impl EventLoop {
2020-12-22 13:47:05 +11:00
pub fn new (
2021-04-08 18:56:26 +10:00
swap_id : Uuid ,
2021-03-23 16:56:04 +11:00
swarm : Swarm < Behaviour > ,
2020-12-22 13:47:05 +11:00
alice_peer_id : PeerId ,
2021-05-11 22:22:59 +10:00
env_config : env ::Config ,
2020-12-22 13:47:05 +11:00
) -> Result < ( Self , EventLoopHandle ) > {
2021-06-23 13:02:29 +10:00
let execution_setup = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 60 ) ) ;
let transfer_proof = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 60 ) ) ;
2021-08-31 15:21:44 +10:00
let encrypted_signature = bmrng ::channel ( 1 ) ;
2021-06-23 13:02:29 +10:00
let quote = bmrng ::channel_with_timeout ( 1 , Duration ::from_secs ( 60 ) ) ;
2020-12-09 14:10:24 +11:00
2020-12-22 13:47:05 +11:00
let event_loop = EventLoop {
2021-04-08 18:56:26 +10:00
swap_id ,
2020-12-07 13:31:14 +11:00
swarm ,
2020-12-22 13:47:05 +11:00
alice_peer_id ,
2021-06-24 13:54:26 +10:00
swap_setup_requests : execution_setup . 1. into ( ) ,
2021-03-30 09:59:51 +11:00
transfer_proof : transfer_proof . 0 ,
2021-04-08 18:56:26 +10:00
encrypted_signatures : encrypted_signature . 1. into ( ) ,
2021-03-30 09:59:51 +11:00
quote_requests : quote . 1. into ( ) ,
inflight_quote_requests : HashMap ::default ( ) ,
2021-06-24 13:54:26 +10:00
inflight_swap_setup : None ,
2021-03-30 09:59:51 +11:00
inflight_encrypted_signature_requests : HashMap ::default ( ) ,
pending_transfer_proof : OptionFuture ::from ( None ) ,
2020-12-09 14:10:24 +11:00
} ;
2020-12-10 13:19:18 +11:00
let handle = EventLoopHandle {
2021-06-24 13:54:26 +10:00
swap_setup : execution_setup . 0 ,
2021-03-30 09:59:51 +11:00
transfer_proof : transfer_proof . 1 ,
encrypted_signature : encrypted_signature . 0 ,
quote : quote . 0 ,
2021-05-11 22:22:59 +10:00
env_config ,
2020-12-09 14:10:24 +11:00
} ;
2020-12-22 13:47:05 +11:00
Ok ( ( event_loop , handle ) )
2020-12-07 13:31:14 +11:00
}
2021-03-18 18:00:02 +11:00
pub async fn run ( mut self ) {
2021-04-15 07:44:21 +00:00
match self . swarm . dial ( & self . alice_peer_id ) {
2021-04-09 11:50:46 +10:00
Ok ( ( ) ) = > { }
Err ( e ) = > {
tracing ::error! ( " Failed to initiate dial to Alice: {} " , e ) ;
return ;
}
}
2021-03-18 18:00:02 +11:00
2020-12-07 13:31:14 +11:00
loop {
2021-03-31 14:21:49 +11:00
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
2020-12-09 18:08:26 +11:00
tokio ::select! {
2021-06-28 16:45:40 +10:00
swarm_event = self . swarm . select_next_some ( ) = > {
2020-12-09 18:08:26 +11:00
match swarm_event {
2021-03-30 09:59:51 +11:00
SwarmEvent ::Behaviour ( OutEvent ::QuoteReceived { id , response } ) = > {
if let Some ( responder ) = self . inflight_quote_requests . remove ( & id ) {
let _ = responder . respond ( response ) ;
}
2021-02-03 16:45:43 +11:00
}
2021-06-24 13:54:26 +10:00
SwarmEvent ::Behaviour ( OutEvent ::SwapSetupCompleted ( response ) ) = > {
if let Some ( responder ) = self . inflight_swap_setup . take ( ) {
2021-03-30 09:59:51 +11:00
let _ = responder . respond ( * response ) ;
2021-02-05 16:30:43 +11:00
}
2020-12-09 18:08:26 +11:00
}
2021-04-26 22:07:49 +10:00
SwarmEvent ::Behaviour ( OutEvent ::TransferProofReceived { msg , channel , peer } ) = > {
let swap_id = msg . swap_id ;
if peer ! = self . alice_peer_id {
tracing ::warn! (
% swap_id ,
" Ignoring malicious transfer proof from {}, expected to receive it from {} " ,
peer ,
self . alice_peer_id ) ;
continue ;
}
if swap_id ! = self . swap_id {
2021-04-08 18:56:26 +10:00
// TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps
2021-05-05 13:49:11 +10:00
tracing ::warn! ( " Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored " , swap_id , self . swap_id ) ;
2021-04-08 18:56:26 +10:00
// When receiving a transfer proof that is unexpected we still have to acknowledge that it was received
2021-04-15 07:44:21 +00:00
let _ = self . swarm . behaviour_mut ( ) . transfer_proof . send_response ( channel , ( ) ) ;
2021-04-08 18:56:26 +10:00
continue ;
}
let mut responder = match self . transfer_proof . send ( msg . tx_lock_proof ) . await {
2021-03-30 09:59:51 +11:00
Ok ( responder ) = > responder ,
2021-04-08 18:56:26 +10:00
Err ( e ) = > {
tracing ::warn! ( " Failed to pass on transfer proof: {:#} " , e ) ;
2021-03-30 09:59:51 +11:00
continue ;
}
} ;
self . pending_transfer_proof = OptionFuture ::from ( Some ( async move {
let _ = responder . recv ( ) . await ;
channel
} . boxed ( ) ) ) ;
}
SwarmEvent ::Behaviour ( OutEvent ::EncryptedSignatureAcknowledged { id } ) = > {
if let Some ( responder ) = self . inflight_encrypted_signature_requests . remove ( & id ) {
let _ = responder . respond ( ( ) ) ;
}
2021-01-29 15:13:40 +11:00
}
2021-04-09 11:50:46 +10:00
SwarmEvent ::Behaviour ( OutEvent ::AllRedialAttemptsExhausted { peer } ) if peer = = self . alice_peer_id = > {
tracing ::error! ( " Exhausted all re-dial attempts to Alice " ) ;
return ;
}
2021-03-26 16:40:48 +11:00
SwarmEvent ::Behaviour ( OutEvent ::Failure { peer , error } ) = > {
tracing ::warn! ( % peer , " Communication error: {:#} " , error ) ;
2021-03-18 18:00:02 +11:00
return ;
2021-02-05 16:30:43 +11:00
}
2021-03-18 18:00:02 +11:00
SwarmEvent ::ConnectionEstablished { peer_id , endpoint , .. } if peer_id = = self . alice_peer_id = > {
2021-04-09 11:50:46 +10:00
tracing ::info! ( " Connected to Alice at {} " , endpoint . get_remote_address ( ) ) ;
2021-03-18 18:00:02 +11:00
}
SwarmEvent ::Dialing ( peer_id ) if peer_id = = self . alice_peer_id = > {
tracing ::debug! ( " Dialling Alice at {} " , peer_id ) ;
2020-12-18 17:39:04 +11:00
}
2021-06-21 11:38:14 +10:00
SwarmEvent ::ConnectionClosed { peer_id , endpoint , num_established , cause : Some ( error ) } if peer_id = = self . alice_peer_id & & num_established = = 0 = > {
tracing ::warn! ( " Lost connection to Alice at {}, cause: {} " , endpoint . get_remote_address ( ) , error ) ;
}
SwarmEvent ::ConnectionClosed { peer_id , num_established , cause : None , .. } if peer_id = = self . alice_peer_id & & num_established = = 0 = > {
// no error means the disconnection was requested
tracing ::info! ( " Successfully closed connection to Alice " ) ;
return ;
2021-03-18 18:00:02 +11:00
}
SwarmEvent ::UnreachableAddr { peer_id , address , attempts_remaining , error } if peer_id = = self . alice_peer_id & & attempts_remaining = = 0 = > {
2021-04-09 11:50:46 +10:00
tracing ::warn! ( % address , " Failed to dial Alice: {} " , error ) ;
2021-04-15 07:44:21 +00:00
if let Some ( duration ) = self . swarm . behaviour_mut ( ) . redial . until_next_redial ( ) {
2021-04-09 11:50:46 +10:00
tracing ::info! ( " Next redial attempt in {}s " , duration . as_secs ( ) ) ;
}
2021-03-18 18:00:02 +11:00
}
_ = > { }
2020-12-09 18:08:26 +11:00
}
} ,
2021-03-31 14:21:49 +11:00
// Handle to-be-sent requests for all our network protocols.
// Use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected.
Some ( ( ( ) , responder ) ) = self . quote_requests . next ( ) . fuse ( ) , if self . is_connected_to_alice ( ) = > {
2021-04-15 07:44:21 +00:00
let id = self . swarm . behaviour_mut ( ) . quote . send_request ( & self . alice_peer_id , ( ) ) ;
2021-03-30 09:59:51 +11:00
self . inflight_quote_requests . insert ( id , responder ) ;
2021-03-03 16:54:47 +11:00
} ,
2021-06-24 13:54:26 +10:00
Some ( ( swap , responder ) ) = self . swap_setup_requests . next ( ) . fuse ( ) , if self . is_connected_to_alice ( ) = > {
self . swarm . behaviour_mut ( ) . swap_setup . start ( self . alice_peer_id , swap ) . await ;
self . inflight_swap_setup = Some ( responder ) ;
2021-02-03 16:45:43 +11:00
} ,
2021-04-08 18:56:26 +10:00
Some ( ( tx_redeem_encsig , responder ) ) = self . encrypted_signatures . next ( ) . fuse ( ) , if self . is_connected_to_alice ( ) = > {
let request = encrypted_signature ::Request {
swap_id : self . swap_id ,
tx_redeem_encsig
} ;
2021-04-15 07:44:21 +00:00
let id = self . swarm . behaviour_mut ( ) . encrypted_signature . send_request ( & self . alice_peer_id , request ) ;
2021-03-30 09:59:51 +11:00
self . inflight_encrypted_signature_requests . insert ( id , responder ) ;
} ,
2021-03-31 14:21:49 +11:00
2021-03-30 09:59:51 +11:00
Some ( response_channel ) = & mut self . pending_transfer_proof = > {
2021-04-15 07:44:21 +00:00
let _ = self . swarm . behaviour_mut ( ) . transfer_proof . send_response ( response_channel , ( ) ) ;
2021-03-30 09:59:51 +11:00
self . pending_transfer_proof = OptionFuture ::from ( None ) ;
2020-12-09 18:08:26 +11:00
}
2020-12-09 14:10:24 +11:00
}
}
2020-12-07 13:31:14 +11:00
}
2021-03-31 14:21:49 +11:00
fn is_connected_to_alice ( & self ) -> bool {
2021-04-15 07:44:21 +00:00
self . swarm . is_connected ( & self . alice_peer_id )
2021-03-31 14:21:49 +11:00
}
2020-12-07 13:31:14 +11:00
}
2021-03-23 17:26:13 +11:00
#[ derive(Debug) ]
pub struct EventLoopHandle {
2021-06-24 13:54:26 +10:00
swap_setup : bmrng ::RequestSender < NewSwap , Result < State2 > > ,
2021-04-08 18:56:26 +10:00
transfer_proof : bmrng ::RequestReceiver < monero ::TransferProof , ( ) > ,
encrypted_signature : bmrng ::RequestSender < EncryptedSignature , ( ) > ,
2021-03-30 09:59:51 +11:00
quote : bmrng ::RequestSender < ( ) , BidQuote > ,
2021-05-11 22:22:59 +10:00
env_config : env ::Config ,
2021-03-23 17:26:13 +11:00
}
impl EventLoopHandle {
2021-06-24 13:54:26 +10:00
pub async fn setup_swap ( & mut self , swap : NewSwap ) -> Result < State2 > {
self . swap_setup . send_receive ( swap ) . await ?
2021-03-23 17:26:13 +11:00
}
2021-04-08 18:56:26 +10:00
pub async fn recv_transfer_proof ( & mut self ) -> Result < monero ::TransferProof > {
let ( transfer_proof , responder ) = self
2021-03-30 09:59:51 +11:00
. transfer_proof
2021-03-23 17:26:13 +11:00
. recv ( )
. await
2021-03-30 09:59:51 +11:00
. context ( " Failed to receive transfer proof " ) ? ;
responder
. respond ( ( ) )
. context ( " Failed to acknowledge receipt of transfer proof " ) ? ;
2021-04-08 18:56:26 +10:00
Ok ( transfer_proof )
2021-03-23 17:26:13 +11:00
}
pub async fn request_quote ( & mut self ) -> Result < BidQuote > {
2021-03-30 09:59:51 +11:00
Ok ( self . quote . send_receive ( ( ) ) . await ? )
2021-03-23 17:26:13 +11:00
}
pub async fn send_encrypted_signature (
& mut self ,
tx_redeem_encsig : EncryptedSignature ,
2021-08-31 15:21:44 +10:00
) -> Result < ( ) , bmrng ::error ::RequestError < EncryptedSignature > > {
2021-03-30 09:59:51 +11:00
Ok ( self
. encrypted_signature
2021-04-08 18:56:26 +10:00
. send_receive ( tx_redeem_encsig )
2021-03-30 09:59:51 +11:00
. await ? )
2021-03-23 17:26:13 +11:00
}
}