Make it possible to clone a handle

This will be used for new swaps.
This commit is contained in:
Franck Royer 2021-02-08 17:03:16 +11:00
parent 1b2be804ed
commit cc8b855117
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4

View File

@ -9,34 +9,53 @@ use crate::{
bob::{EncryptedSignature, SwapRequest}, bob::{EncryptedSignature, SwapRequest},
}, },
}; };
use anyhow::{anyhow, Context, Result}; use anyhow::{Context, Result};
use libp2p::{ use libp2p::{
core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm,
}; };
use tokio::{ use tokio::{
sync::mpsc::{Receiver, Sender}, sync::{broadcast, mpsc},
time::timeout, time::timeout,
}; };
use tracing::{debug, error, trace}; use tracing::{debug, error, trace};
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Channels<T> { pub struct MpscChannels<T> {
sender: Sender<T>, sender: mpsc::Sender<T>,
receiver: Receiver<T>, receiver: mpsc::Receiver<T>,
} }
impl<T> Default for Channels<T> { impl<T> Default for MpscChannels<T> {
fn default() -> Self { fn default() -> Self {
let (sender, receiver) = tokio::sync::mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
Channels { sender, receiver } MpscChannels { sender, receiver }
}
}
#[allow(missing_debug_implementations)]
pub struct BroadcastChannels<T>
where
T: Clone,
{
sender: broadcast::Sender<T>,
receiver: broadcast::Receiver<T>,
}
impl<T> Default for BroadcastChannels<T>
where
T: Clone,
{
fn default() -> Self {
let (sender, receiver) = broadcast::channel(100);
BroadcastChannels { sender, receiver }
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
recv_encrypted_signature: Receiver<EncryptedSignature>, recv_encrypted_signature: broadcast::Receiver<EncryptedSignature>,
send_transfer_proof: Sender<(PeerId, TransferProof)>, send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>,
recv_transfer_proof_ack: Receiver<()>, recv_transfer_proof_ack: broadcast::Receiver<()>,
} }
impl EventLoopHandle { impl EventLoopHandle {
@ -44,9 +63,8 @@ impl EventLoopHandle {
self.recv_encrypted_signature self.recv_encrypted_signature
.recv() .recv()
.await .await
.ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob")) .context("Failed to receive Bitcoin encrypted signature from Bob")
} }
pub async fn send_transfer_proof( pub async fn send_transfer_proof(
&mut self, &mut self,
bob: PeerId, bob: PeerId,
@ -75,9 +93,12 @@ impl EventLoopHandle {
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop { pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
recv_encrypted_signature: Sender<EncryptedSignature>, recv_encrypted_signature: broadcast::Sender<EncryptedSignature>,
send_transfer_proof: Receiver<(PeerId, TransferProof)>, send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>,
recv_transfer_proof_ack: Sender<()>, recv_transfer_proof_ack: broadcast::Sender<()>,
// Only used to clone further handles
handle: EventLoopHandle,
} }
impl EventLoop { impl EventLoop {
@ -98,15 +119,22 @@ impl EventLoop {
Swarm::listen_on(&mut swarm, listen.clone()) Swarm::listen_on(&mut swarm, listen.clone())
.with_context(|| format!("Address is not supported: {:#}", listen))?; .with_context(|| format!("Address is not supported: {:#}", listen))?;
let recv_encrypted_signature = Channels::default(); let recv_encrypted_signature = BroadcastChannels::default();
let send_transfer_proof = Channels::default(); let send_transfer_proof = MpscChannels::default();
let recv_transfer_proof_ack = Channels::default(); let recv_transfer_proof_ack = BroadcastChannels::default();
let handle_clone = EventLoopHandle {
recv_encrypted_signature: recv_encrypted_signature.sender.subscribe(),
send_transfer_proof: send_transfer_proof.sender.clone(),
recv_transfer_proof_ack: recv_transfer_proof_ack.sender.subscribe(),
};
let driver = EventLoop { let driver = EventLoop {
swarm, swarm,
recv_encrypted_signature: recv_encrypted_signature.sender, recv_encrypted_signature: recv_encrypted_signature.sender,
send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof: send_transfer_proof.receiver,
recv_transfer_proof_ack: recv_transfer_proof_ack.sender, recv_transfer_proof_ack: recv_transfer_proof_ack.sender,
handle: handle_clone,
}; };
let handle = EventLoopHandle { let handle = EventLoopHandle {
@ -118,6 +146,14 @@ impl EventLoop {
Ok((driver, handle)) Ok((driver, handle))
} }
pub fn clone_handle(&self) -> EventLoopHandle {
EventLoopHandle {
recv_encrypted_signature: self.recv_encrypted_signature.subscribe(),
send_transfer_proof: self.handle.send_transfer_proof.clone(),
recv_transfer_proof_ack: self.recv_transfer_proof_ack.subscribe(),
}
}
pub async fn run(&mut self) { pub async fn run(&mut self) {
loop { loop {
tokio::select! { tokio::select! {
@ -134,10 +170,10 @@ impl EventLoop {
} }
OutEvent::TransferProofAcknowledged => { OutEvent::TransferProofAcknowledged => {
trace!("Bob acknowledged transfer proof"); trace!("Bob acknowledged transfer proof");
let _ = self.recv_transfer_proof_ack.send(()).await; let _ = self.recv_transfer_proof_ack.send(());
} }
OutEvent::EncryptedSignature{ msg, channel } => { OutEvent::EncryptedSignature{ msg, channel } => {
let _ = self.recv_encrypted_signature.send(*msg).await; let _ = self.recv_encrypted_signature.send(*msg);
// Send back empty response so that the request/response protocol completes. // Send back empty response so that the request/response protocol completes.
if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) { if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) {
error!("Failed to send Encrypted Signature ack: {:?}", error); error!("Failed to send Encrypted Signature ack: {:?}", error);