From cc8b8551177499cf8c34812c0a39258c9d00687c Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Mon, 8 Feb 2021 17:03:16 +1100 Subject: [PATCH] Make it possible to clone a handle This will be used for new swaps. --- swap/src/protocol/alice/event_loop.rs | 78 +++++++++++++++++++-------- 1 file changed, 57 insertions(+), 21 deletions(-) diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index b9bc6998..42d717d2 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -9,34 +9,53 @@ use crate::{ bob::{EncryptedSignature, SwapRequest}, }, }; -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use libp2p::{ core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, }; use tokio::{ - sync::mpsc::{Receiver, Sender}, + sync::{broadcast, mpsc}, time::timeout, }; use tracing::{debug, error, trace}; #[allow(missing_debug_implementations)] -pub struct Channels { - sender: Sender, - receiver: Receiver, +pub struct MpscChannels { + sender: mpsc::Sender, + receiver: mpsc::Receiver, } -impl Default for Channels { +impl Default for MpscChannels { fn default() -> Self { - let (sender, receiver) = tokio::sync::mpsc::channel(100); - Channels { sender, receiver } + let (sender, receiver) = mpsc::channel(100); + MpscChannels { sender, receiver } + } +} + +#[allow(missing_debug_implementations)] +pub struct BroadcastChannels +where + T: Clone, +{ + sender: broadcast::Sender, + receiver: broadcast::Receiver, +} + +impl Default for BroadcastChannels +where + T: Clone, +{ + fn default() -> Self { + let (sender, receiver) = broadcast::channel(100); + BroadcastChannels { sender, receiver } } } #[derive(Debug)] pub struct EventLoopHandle { - recv_encrypted_signature: Receiver, - send_transfer_proof: Sender<(PeerId, TransferProof)>, - recv_transfer_proof_ack: Receiver<()>, + recv_encrypted_signature: broadcast::Receiver, + send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>, + recv_transfer_proof_ack: broadcast::Receiver<()>, } impl EventLoopHandle { @@ -44,9 +63,8 @@ impl EventLoopHandle { self.recv_encrypted_signature .recv() .await - .ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob")) + .context("Failed to receive Bitcoin encrypted signature from Bob") } - pub async fn send_transfer_proof( &mut self, bob: PeerId, @@ -75,9 +93,12 @@ impl EventLoopHandle { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, - recv_encrypted_signature: Sender, - send_transfer_proof: Receiver<(PeerId, TransferProof)>, - recv_transfer_proof_ack: Sender<()>, + recv_encrypted_signature: broadcast::Sender, + send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>, + recv_transfer_proof_ack: broadcast::Sender<()>, + + // Only used to clone further handles + handle: EventLoopHandle, } impl EventLoop { @@ -98,15 +119,22 @@ impl EventLoop { Swarm::listen_on(&mut swarm, listen.clone()) .with_context(|| format!("Address is not supported: {:#}", listen))?; - let recv_encrypted_signature = Channels::default(); - let send_transfer_proof = Channels::default(); - let recv_transfer_proof_ack = Channels::default(); + let recv_encrypted_signature = BroadcastChannels::default(); + let send_transfer_proof = MpscChannels::default(); + let recv_transfer_proof_ack = BroadcastChannels::default(); + + let handle_clone = EventLoopHandle { + recv_encrypted_signature: recv_encrypted_signature.sender.subscribe(), + send_transfer_proof: send_transfer_proof.sender.clone(), + recv_transfer_proof_ack: recv_transfer_proof_ack.sender.subscribe(), + }; let driver = EventLoop { swarm, recv_encrypted_signature: recv_encrypted_signature.sender, send_transfer_proof: send_transfer_proof.receiver, recv_transfer_proof_ack: recv_transfer_proof_ack.sender, + handle: handle_clone, }; let handle = EventLoopHandle { @@ -118,6 +146,14 @@ impl EventLoop { Ok((driver, handle)) } + pub fn clone_handle(&self) -> EventLoopHandle { + EventLoopHandle { + recv_encrypted_signature: self.recv_encrypted_signature.subscribe(), + send_transfer_proof: self.handle.send_transfer_proof.clone(), + recv_transfer_proof_ack: self.recv_transfer_proof_ack.subscribe(), + } + } + pub async fn run(&mut self) { loop { tokio::select! { @@ -134,10 +170,10 @@ impl EventLoop { } OutEvent::TransferProofAcknowledged => { trace!("Bob acknowledged transfer proof"); - let _ = self.recv_transfer_proof_ack.send(()).await; + let _ = self.recv_transfer_proof_ack.send(()); } OutEvent::EncryptedSignature{ msg, channel } => { - let _ = self.recv_encrypted_signature.send(*msg).await; + let _ = self.recv_encrypted_signature.send(*msg); // Send back empty response so that the request/response protocol completes. if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) { error!("Failed to send Encrypted Signature ack: {:?}", error);