From aaf1363c05fa26119956a06b17bc71440b048488 Mon Sep 17 00:00:00 2001 From: "Tobin C. Harding" Date: Tue, 20 Oct 2020 10:10:28 +1100 Subject: [PATCH] Refine peer tracker and amounts protocol We model the getting of amounts as a network behaviour even though conceptually it is a protocol. Refine/refactor the code a bit to make this more obvious. - Use `Amounts` instead of `Messenger` We only ever connect to a single peer, update peer tracker to reflect this. This is a single patch because the handling of the two network behaviours is a intertwined. - Only track one peer connection - Track the peer id and the multiaddr of the counterparty - Emit an event for connection established on Alice's side as well as Bob's side --- swap/src/alice.rs | 36 ++++--- swap/src/alice/{messenger.rs => amounts.rs} | 29 ++--- swap/src/bob.rs | 36 +++---- swap/src/bob/{messenger.rs => amounts.rs} | 26 ++--- swap/src/network/peer_tracker.rs | 114 +++++++------------- 5 files changed, 93 insertions(+), 148 deletions(-) rename swap/src/alice/{messenger.rs => amounts.rs} (82%) rename swap/src/bob/{messenger.rs => amounts.rs} (83%) diff --git a/swap/src/alice.rs b/swap/src/alice.rs index b61c6e58..fde9daa3 100644 --- a/swap/src/alice.rs +++ b/swap/src/alice.rs @@ -9,9 +9,9 @@ use libp2p::{ use std::time::Duration; use tracing::debug; -mod messenger; +mod amounts; -use self::messenger::*; +use self::amounts::*; use crate::{ bitcoin, monero, network::{ @@ -29,12 +29,14 @@ pub async fn swap(listen: Multiaddr) -> Result<()> { loop { match swarm.next().await { - BehaviourOutEvent::Request(messenger::BehaviourOutEvent::Btc { btc, channel }) => { + OutEvent::ConnectionEstablished(id) => { + tracing::info!("Connection established with: {}", id); + } + OutEvent::Request(amounts::OutEvent::Btc { btc, channel }) => { debug!("Got request from Bob to swap {}", btc); let p = calculate_amounts(btc); swarm.send(channel, AliceToBob::Amounts(p)); } - other => panic!("unexpected event: {:?}", other), } } } @@ -65,22 +67,22 @@ fn new_swarm(listen: Multiaddr) -> Result { #[allow(clippy::large_enum_variant)] #[derive(Debug)] -pub enum BehaviourOutEvent { - Request(messenger::BehaviourOutEvent), +pub enum OutEvent { + Request(amounts::OutEvent), ConnectionEstablished(PeerId), } -impl From for BehaviourOutEvent { - fn from(event: messenger::BehaviourOutEvent) -> Self { - BehaviourOutEvent::Request(event) +impl From for OutEvent { + fn from(event: amounts::OutEvent) -> Self { + OutEvent::Request(event) } } -impl From for BehaviourOutEvent { - fn from(event: peer_tracker::BehaviourOutEvent) -> Self { +impl From for OutEvent { + fn from(event: peer_tracker::OutEvent) -> Self { match event { - peer_tracker::BehaviourOutEvent::ConnectionEstablished(id) => { - BehaviourOutEvent::ConnectionEstablished(id) + peer_tracker::OutEvent::ConnectionEstablished(id) => { + OutEvent::ConnectionEstablished(id) } } } @@ -88,10 +90,10 @@ impl From for BehaviourOutEvent { /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOutEvent", event_process = false)] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Alice { - net: Messenger, + amounts: Amounts, pt: PeerTracker, #[behaviour(ignore)] identity: Keypair, @@ -108,7 +110,7 @@ impl Alice { /// Alice always sends her messages as a response to a request from Bob. pub fn send(&mut self, channel: ResponseChannel, msg: AliceToBob) { - self.net.send(channel, msg); + self.amounts.send(channel, msg); } } @@ -118,7 +120,7 @@ impl Default for Alice { let timeout = Duration::from_secs(TIMEOUT); Self { - net: Messenger::new(timeout), + amounts: Amounts::new(timeout), pt: PeerTracker::default(), identity, } diff --git a/swap/src/alice/messenger.rs b/swap/src/alice/amounts.rs similarity index 82% rename from swap/src/alice/messenger.rs rename to swap/src/alice/amounts.rs index 62355006..dbe100b7 100644 --- a/swap/src/alice/messenger.rs +++ b/swap/src/alice/amounts.rs @@ -17,11 +17,10 @@ use tracing::{debug, error}; use crate::{ bitcoin, network::request_response::{AliceToBob, BobToAlice, Codec, Protocol}, - Never, }; #[derive(Debug)] -pub enum BehaviourOutEvent { +pub enum OutEvent { Btc { btc: bitcoin::Amount, channel: ResponseChannel, @@ -30,15 +29,15 @@ pub enum BehaviourOutEvent { /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] -pub struct Messenger { +pub struct Amounts { rr: RequestResponse, #[behaviour(ignore)] - events: VecDeque, + events: VecDeque, } -impl Messenger { +impl Amounts { pub fn new(timeout: Duration) -> Self { let mut config = RequestResponseConfig::default(); config.set_request_timeout(timeout); @@ -74,7 +73,7 @@ impl Messenger { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, BehaviourOutEvent>> { + ) -> Poll, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -83,7 +82,7 @@ impl Messenger { } } -impl NetworkBehaviourEventProcess> for Messenger { +impl NetworkBehaviourEventProcess> for Amounts { fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { @@ -95,9 +94,9 @@ impl NetworkBehaviourEventProcess> channel, }, } => match request { - BobToAlice::AmountsFromBtc(btc) => self - .events - .push_back(BehaviourOutEvent::Btc { btc, channel }), + BobToAlice::AmountsFromBtc(btc) => { + self.events.push_back(OutEvent::Btc { btc, channel }) + } _ => panic!("unexpected request"), }, RequestResponseEvent::Message { @@ -125,11 +124,3 @@ impl NetworkBehaviourEventProcess> } } } - -impl libp2p::swarm::NetworkBehaviourEventProcess<()> for Messenger { - fn inject_event(&mut self, _event: ()) {} -} - -impl libp2p::swarm::NetworkBehaviourEventProcess for Messenger { - fn inject_event(&mut self, _: Never) {} -} diff --git a/swap/src/bob.rs b/swap/src/bob.rs index 8bef0b71..da1403bc 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -9,9 +9,9 @@ use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; use std::{process, thread, time::Duration}; use tracing::{debug, info, warn}; -mod messenger; +mod amounts; -use self::messenger::*; +use self::amounts::*; use crate::{ bitcoin, network::{ @@ -32,7 +32,7 @@ pub async fn swap( libp2p::Swarm::dial_addr(&mut swarm, addr)?; let id = match swarm.next().await { - BehaviourOutEvent::ConnectionEstablished(id) => id, + OutEvent::ConnectionEstablished(id) => id, other => panic!("unexpected event: {:?}", other), }; info!("Connection established."); @@ -40,7 +40,7 @@ pub async fn swap( swarm.request_amounts(id, btc).await; match swarm.next().await { - BehaviourOutEvent::Response(messenger::BehaviourOutEvent::Amounts(p)) => { + OutEvent::Response(amounts::OutEvent::Amounts(p)) => { debug!("Got response from Alice: {:?}", p); let cmd = Cmd::VerifyAmounts(p); cmd_tx.try_send(cmd)?; @@ -82,22 +82,22 @@ fn new_swarm() -> Result { #[allow(clippy::large_enum_variant)] #[derive(Debug)] -pub enum BehaviourOutEvent { - Response(messenger::BehaviourOutEvent), +pub enum OutEvent { + Response(amounts::OutEvent), ConnectionEstablished(PeerId), } -impl From for BehaviourOutEvent { - fn from(event: messenger::BehaviourOutEvent) -> Self { - BehaviourOutEvent::Response(event) +impl From for OutEvent { + fn from(event: amounts::OutEvent) -> Self { + OutEvent::Response(event) } } -impl From for BehaviourOutEvent { - fn from(event: peer_tracker::BehaviourOutEvent) -> Self { +impl From for OutEvent { + fn from(event: peer_tracker::OutEvent) -> Self { match event { - peer_tracker::BehaviourOutEvent::ConnectionEstablished(id) => { - BehaviourOutEvent::ConnectionEstablished(id) + peer_tracker::OutEvent::ConnectionEstablished(id) => { + OutEvent::ConnectionEstablished(id) } } } @@ -105,10 +105,10 @@ impl From for BehaviourOutEvent { /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOutEvent", event_process = false)] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Bob { - net: Messenger, + amounts: Amounts, pt: PeerTracker, #[behaviour(ignore)] identity: Keypair, @@ -126,13 +126,13 @@ impl Bob { /// Sends a message to Alice to get current amounts based on `btc`. pub async fn request_amounts(&mut self, alice: PeerId, btc: u64) { let btc = bitcoin::Amount::from_sat(btc); - let _id = self.net.request_amounts(alice.clone(), btc).await; + let _id = self.amounts.request_amounts(alice.clone(), btc).await; debug!("Requesting amounts from: {}", alice); } /// Returns Alice's peer id if we are connected. pub fn peer_id_of_alice(&self) -> Option { - self.pt.counterparty() + self.pt.counterparty_peer_id() } } @@ -142,7 +142,7 @@ impl Default for Bob { let timeout = Duration::from_secs(TIMEOUT); Self { - net: Messenger::new(timeout), + amounts: Amounts::new(timeout), pt: PeerTracker::default(), identity, } diff --git a/swap/src/bob/messenger.rs b/swap/src/bob/amounts.rs similarity index 83% rename from swap/src/bob/messenger.rs rename to swap/src/bob/amounts.rs index 6d57804e..527fdb32 100644 --- a/swap/src/bob/messenger.rs +++ b/swap/src/bob/amounts.rs @@ -17,25 +17,25 @@ use tracing::error; use crate::{ bitcoin, network::request_response::{AliceToBob, BobToAlice, Codec, Protocol}, - Never, SwapParams, + SwapParams, }; #[derive(Debug)] -pub enum BehaviourOutEvent { +pub enum OutEvent { Amounts(SwapParams), } /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] -pub struct Messenger { +pub struct Amounts { rr: RequestResponse, #[behaviour(ignore)] - events: VecDeque, + events: VecDeque, } -impl Messenger { +impl Amounts { pub fn new(timeout: Duration) -> Self { let mut config = RequestResponseConfig::default(); config.set_request_timeout(timeout); @@ -65,7 +65,7 @@ impl Messenger { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll, BehaviourOutEvent>> { + ) -> Poll, OutEvent>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -74,7 +74,7 @@ impl Messenger { } } -impl NetworkBehaviourEventProcess> for Messenger { +impl NetworkBehaviourEventProcess> for Amounts { fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { @@ -89,7 +89,7 @@ impl NetworkBehaviourEventProcess> request_id: _, }, } => match response { - AliceToBob::Amounts(p) => self.events.push_back(BehaviourOutEvent::Amounts(p)), + AliceToBob::Amounts(p) => self.events.push_back(OutEvent::Amounts(p)), }, RequestResponseEvent::InboundFailure { .. } => { @@ -105,11 +105,3 @@ impl NetworkBehaviourEventProcess> } } } - -impl libp2p::swarm::NetworkBehaviourEventProcess<()> for Messenger { - fn inject_event(&mut self, _event: ()) {} -} - -impl libp2p::swarm::NetworkBehaviourEventProcess for Messenger { - fn inject_event(&mut self, _: Never) {} -} diff --git a/swap/src/network/peer_tracker.rs b/swap/src/network/peer_tracker.rs index 7b76563f..ca3a67e5 100644 --- a/swap/src/network/peer_tracker.rs +++ b/swap/src/network/peer_tracker.rs @@ -7,85 +7,56 @@ use libp2p::{ }, Multiaddr, PeerId, }; -use std::{ - collections::{hash_map::Entry, HashMap, VecDeque}, - task::Poll, -}; +use std::{collections::VecDeque, task::Poll}; #[derive(Debug)] -pub enum BehaviourOutEvent { +pub enum OutEvent { ConnectionEstablished(PeerId), } -/// A NetworkBehaviour that tracks connections to other peers. +/// A NetworkBehaviour that tracks connections to the counterparty. Although the +/// libp2p `NetworkBehaviour` abstraction encompasses connections to multiple +/// peers we only ever connect to a single counterparty. Peer Tracker tracks +/// that connection. #[derive(Default, Debug)] pub struct PeerTracker { - connected_peers: HashMap>, - address_hints: HashMap>, - events: VecDeque, + connected: Option<(PeerId, Multiaddr)>, + events: VecDeque, } impl PeerTracker { /// Returns an arbitrary connected counterparty. /// This is useful if we are connected to a single other node. - pub fn counterparty(&self) -> Option { - // TODO: Refactor to use combinators. - if let Some((id, _)) = self.connected_peers().next() { - return Some(id); + pub fn counterparty_peer_id(&self) -> Option { + if let Some((id, _)) = &self.connected { + return Some(id.clone()); } None } - pub fn connected_peers(&self) -> impl Iterator)> { - self.connected_peers.clone().into_iter() - } - - /// Adds an address hint for the given peer id. The added address is - /// considered most recent and hence is added at the start of the list - /// because libp2p tries to connect with the first address first. - pub fn add_recent_address_hint(&mut self, id: PeerId, addr: Multiaddr) { - let old_addresses = self.address_hints.get_mut(&id); - - match old_addresses { - None => { - let mut hints = VecDeque::new(); - hints.push_back(addr); - self.address_hints.insert(id, hints); - } - Some(hints) => { - hints.push_front(addr); - } + /// Returns an arbitrary connected counterparty. + /// This is useful if we are connected to a single other node. + pub fn counterparty_addr(&self) -> Option { + if let Some((_, addr)) = &self.connected { + return Some(addr.clone()); } + None } } impl NetworkBehaviour for PeerTracker { type ProtocolsHandler = DummyProtocolsHandler; - type OutEvent = BehaviourOutEvent; + type OutEvent = OutEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { DummyProtocolsHandler::default() } - /// Note (from libp2p doc): - /// The addresses will be tried in the order returned by this function, - /// which means that they should be ordered by decreasing likelihood of - /// reachability. In other words, the first address should be the most - /// likely to be reachable. - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { let mut addresses: Vec = vec![]; - // If we are connected then this address is most likely to be valid - if let Some(connected) = self.connected_peers.get(peer) { - for addr in connected.iter() { - addresses.push(addr.clone()) - } - } - - if let Some(hints) = self.address_hints.get(peer) { - for hint in hints { - addresses.push(hint.clone()); - } + if let Some(addr) = self.counterparty_addr() { + addresses.push(addr) } addresses @@ -101,35 +72,24 @@ impl NetworkBehaviour for PeerTracker { _: &ConnectionId, point: &ConnectedPoint, ) { - if let ConnectedPoint::Dialer { address } = point { - self.connected_peers - .entry(peer.clone()) - .or_default() - .push(address.clone()); - - self.events - .push_back(BehaviourOutEvent::ConnectionEstablished(peer.clone())); - } - } - - fn inject_connection_closed( - &mut self, - peer: &PeerId, - _: &ConnectionId, - point: &ConnectedPoint, - ) { - if let ConnectedPoint::Dialer { address } = point { - match self.connected_peers.entry(peer.clone()) { - Entry::Vacant(_) => {} - Entry::Occupied(mut entry) => { - let addresses = entry.get_mut(); - - if let Some(pos) = addresses.iter().position(|a| a == address) { - addresses.remove(pos); - } - } + match point { + ConnectedPoint::Dialer { address } => { + self.connected = Some((peer.clone(), address.clone())); + } + ConnectedPoint::Listener { + local_addr: _, + send_back_addr, + } => { + self.connected = Some((peer.clone(), send_back_addr.clone())); } } + + self.events + .push_back(OutEvent::ConnectionEstablished(peer.clone())); + } + + fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) { + self.connected = None; } fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {}