Refine peer tracker and amounts protocol

We model the getting of amounts as a network behaviour even though conceptually
it is a protocol. Refine/refactor the code a bit to make this more obvious.

- Use `Amounts` instead of `Messenger`

We only ever connect to a single peer, update peer tracker to reflect this. This
is a single patch because the handling of the two network behaviours is a
intertwined.

- Only track one peer connection
- Track the peer id and the multiaddr of the counterparty
- Emit an event for connection established on Alice's side as well as Bob's side
This commit is contained in:
Tobin C. Harding 2020-10-20 10:10:28 +11:00
parent 47eaa44f76
commit aaf1363c05
5 changed files with 93 additions and 148 deletions

View File

@ -9,9 +9,9 @@ use libp2p::{
use std::time::Duration; use std::time::Duration;
use tracing::debug; use tracing::debug;
mod messenger; mod amounts;
use self::messenger::*; use self::amounts::*;
use crate::{ use crate::{
bitcoin, monero, bitcoin, monero,
network::{ network::{
@ -29,12 +29,14 @@ pub async fn swap(listen: Multiaddr) -> Result<()> {
loop { loop {
match swarm.next().await { match swarm.next().await {
BehaviourOutEvent::Request(messenger::BehaviourOutEvent::Btc { btc, channel }) => { OutEvent::ConnectionEstablished(id) => {
tracing::info!("Connection established with: {}", id);
}
OutEvent::Request(amounts::OutEvent::Btc { btc, channel }) => {
debug!("Got request from Bob to swap {}", btc); debug!("Got request from Bob to swap {}", btc);
let p = calculate_amounts(btc); let p = calculate_amounts(btc);
swarm.send(channel, AliceToBob::Amounts(p)); swarm.send(channel, AliceToBob::Amounts(p));
} }
other => panic!("unexpected event: {:?}", other),
} }
} }
} }
@ -65,22 +67,22 @@ fn new_swarm(listen: Multiaddr) -> Result<Swarm> {
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
#[derive(Debug)] #[derive(Debug)]
pub enum BehaviourOutEvent { pub enum OutEvent {
Request(messenger::BehaviourOutEvent), Request(amounts::OutEvent),
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
} }
impl From<messenger::BehaviourOutEvent> for BehaviourOutEvent { impl From<amounts::OutEvent> for OutEvent {
fn from(event: messenger::BehaviourOutEvent) -> Self { fn from(event: amounts::OutEvent) -> Self {
BehaviourOutEvent::Request(event) OutEvent::Request(event)
} }
} }
impl From<peer_tracker::BehaviourOutEvent> for BehaviourOutEvent { impl From<peer_tracker::OutEvent> for OutEvent {
fn from(event: peer_tracker::BehaviourOutEvent) -> Self { fn from(event: peer_tracker::OutEvent) -> Self {
match event { match event {
peer_tracker::BehaviourOutEvent::ConnectionEstablished(id) => { peer_tracker::OutEvent::ConnectionEstablished(id) => {
BehaviourOutEvent::ConnectionEstablished(id) OutEvent::ConnectionEstablished(id)
} }
} }
} }
@ -88,10 +90,10 @@ impl From<peer_tracker::BehaviourOutEvent> for BehaviourOutEvent {
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Alice { pub struct Alice {
net: Messenger, amounts: Amounts,
pt: PeerTracker, pt: PeerTracker,
#[behaviour(ignore)] #[behaviour(ignore)]
identity: Keypair, identity: Keypair,
@ -108,7 +110,7 @@ impl Alice {
/// Alice always sends her messages as a response to a request from Bob. /// Alice always sends her messages as a response to a request from Bob.
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: AliceToBob) { pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: AliceToBob) {
self.net.send(channel, msg); self.amounts.send(channel, msg);
} }
} }
@ -118,7 +120,7 @@ impl Default for Alice {
let timeout = Duration::from_secs(TIMEOUT); let timeout = Duration::from_secs(TIMEOUT);
Self { Self {
net: Messenger::new(timeout), amounts: Amounts::new(timeout),
pt: PeerTracker::default(), pt: PeerTracker::default(),
identity, identity,
} }

View File

@ -17,11 +17,10 @@ use tracing::{debug, error};
use crate::{ use crate::{
bitcoin, bitcoin,
network::request_response::{AliceToBob, BobToAlice, Codec, Protocol}, network::request_response::{AliceToBob, BobToAlice, Codec, Protocol},
Never,
}; };
#[derive(Debug)] #[derive(Debug)]
pub enum BehaviourOutEvent { pub enum OutEvent {
Btc { Btc {
btc: bitcoin::Amount, btc: bitcoin::Amount,
channel: ResponseChannel<AliceToBob>, channel: ResponseChannel<AliceToBob>,
@ -30,15 +29,15 @@ pub enum BehaviourOutEvent {
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Messenger { pub struct Amounts {
rr: RequestResponse<Codec>, rr: RequestResponse<Codec>,
#[behaviour(ignore)] #[behaviour(ignore)]
events: VecDeque<BehaviourOutEvent>, events: VecDeque<OutEvent>,
} }
impl Messenger { impl Amounts {
pub fn new(timeout: Duration) -> Self { pub fn new(timeout: Duration) -> Self {
let mut config = RequestResponseConfig::default(); let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout); config.set_request_timeout(timeout);
@ -74,7 +73,7 @@ impl Messenger {
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, BehaviourOutEvent>> { ) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, OutEvent>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
} }
@ -83,7 +82,7 @@ impl Messenger {
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Messenger { impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Amounts {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) { fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
@ -95,9 +94,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
channel, channel,
}, },
} => match request { } => match request {
BobToAlice::AmountsFromBtc(btc) => self BobToAlice::AmountsFromBtc(btc) => {
.events self.events.push_back(OutEvent::Btc { btc, channel })
.push_back(BehaviourOutEvent::Btc { btc, channel }), }
_ => panic!("unexpected request"), _ => panic!("unexpected request"),
}, },
RequestResponseEvent::Message { RequestResponseEvent::Message {
@ -125,11 +124,3 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
} }
} }
} }
impl libp2p::swarm::NetworkBehaviourEventProcess<()> for Messenger {
fn inject_event(&mut self, _event: ()) {}
}
impl libp2p::swarm::NetworkBehaviourEventProcess<Never> for Messenger {
fn inject_event(&mut self, _: Never) {}
}

View File

@ -9,9 +9,9 @@ use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId};
use std::{process, thread, time::Duration}; use std::{process, thread, time::Duration};
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
mod messenger; mod amounts;
use self::messenger::*; use self::amounts::*;
use crate::{ use crate::{
bitcoin, bitcoin,
network::{ network::{
@ -32,7 +32,7 @@ pub async fn swap(
libp2p::Swarm::dial_addr(&mut swarm, addr)?; libp2p::Swarm::dial_addr(&mut swarm, addr)?;
let id = match swarm.next().await { let id = match swarm.next().await {
BehaviourOutEvent::ConnectionEstablished(id) => id, OutEvent::ConnectionEstablished(id) => id,
other => panic!("unexpected event: {:?}", other), other => panic!("unexpected event: {:?}", other),
}; };
info!("Connection established."); info!("Connection established.");
@ -40,7 +40,7 @@ pub async fn swap(
swarm.request_amounts(id, btc).await; swarm.request_amounts(id, btc).await;
match swarm.next().await { match swarm.next().await {
BehaviourOutEvent::Response(messenger::BehaviourOutEvent::Amounts(p)) => { OutEvent::Response(amounts::OutEvent::Amounts(p)) => {
debug!("Got response from Alice: {:?}", p); debug!("Got response from Alice: {:?}", p);
let cmd = Cmd::VerifyAmounts(p); let cmd = Cmd::VerifyAmounts(p);
cmd_tx.try_send(cmd)?; cmd_tx.try_send(cmd)?;
@ -82,22 +82,22 @@ fn new_swarm() -> Result<Swarm> {
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
#[derive(Debug)] #[derive(Debug)]
pub enum BehaviourOutEvent { pub enum OutEvent {
Response(messenger::BehaviourOutEvent), Response(amounts::OutEvent),
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
} }
impl From<messenger::BehaviourOutEvent> for BehaviourOutEvent { impl From<amounts::OutEvent> for OutEvent {
fn from(event: messenger::BehaviourOutEvent) -> Self { fn from(event: amounts::OutEvent) -> Self {
BehaviourOutEvent::Response(event) OutEvent::Response(event)
} }
} }
impl From<peer_tracker::BehaviourOutEvent> for BehaviourOutEvent { impl From<peer_tracker::OutEvent> for OutEvent {
fn from(event: peer_tracker::BehaviourOutEvent) -> Self { fn from(event: peer_tracker::OutEvent) -> Self {
match event { match event {
peer_tracker::BehaviourOutEvent::ConnectionEstablished(id) => { peer_tracker::OutEvent::ConnectionEstablished(id) => {
BehaviourOutEvent::ConnectionEstablished(id) OutEvent::ConnectionEstablished(id)
} }
} }
} }
@ -105,10 +105,10 @@ impl From<peer_tracker::BehaviourOutEvent> for BehaviourOutEvent {
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Bob { pub struct Bob {
net: Messenger, amounts: Amounts,
pt: PeerTracker, pt: PeerTracker,
#[behaviour(ignore)] #[behaviour(ignore)]
identity: Keypair, identity: Keypair,
@ -126,13 +126,13 @@ impl Bob {
/// Sends a message to Alice to get current amounts based on `btc`. /// Sends a message to Alice to get current amounts based on `btc`.
pub async fn request_amounts(&mut self, alice: PeerId, btc: u64) { pub async fn request_amounts(&mut self, alice: PeerId, btc: u64) {
let btc = bitcoin::Amount::from_sat(btc); let btc = bitcoin::Amount::from_sat(btc);
let _id = self.net.request_amounts(alice.clone(), btc).await; let _id = self.amounts.request_amounts(alice.clone(), btc).await;
debug!("Requesting amounts from: {}", alice); debug!("Requesting amounts from: {}", alice);
} }
/// Returns Alice's peer id if we are connected. /// Returns Alice's peer id if we are connected.
pub fn peer_id_of_alice(&self) -> Option<PeerId> { pub fn peer_id_of_alice(&self) -> Option<PeerId> {
self.pt.counterparty() self.pt.counterparty_peer_id()
} }
} }
@ -142,7 +142,7 @@ impl Default for Bob {
let timeout = Duration::from_secs(TIMEOUT); let timeout = Duration::from_secs(TIMEOUT);
Self { Self {
net: Messenger::new(timeout), amounts: Amounts::new(timeout),
pt: PeerTracker::default(), pt: PeerTracker::default(),
identity, identity,
} }

View File

@ -17,25 +17,25 @@ use tracing::error;
use crate::{ use crate::{
bitcoin, bitcoin,
network::request_response::{AliceToBob, BobToAlice, Codec, Protocol}, network::request_response::{AliceToBob, BobToAlice, Codec, Protocol},
Never, SwapParams, SwapParams,
}; };
#[derive(Debug)] #[derive(Debug)]
pub enum BehaviourOutEvent { pub enum OutEvent {
Amounts(SwapParams), Amounts(SwapParams),
} }
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Messenger { pub struct Amounts {
rr: RequestResponse<Codec>, rr: RequestResponse<Codec>,
#[behaviour(ignore)] #[behaviour(ignore)]
events: VecDeque<BehaviourOutEvent>, events: VecDeque<OutEvent>,
} }
impl Messenger { impl Amounts {
pub fn new(timeout: Duration) -> Self { pub fn new(timeout: Duration) -> Self {
let mut config = RequestResponseConfig::default(); let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout); config.set_request_timeout(timeout);
@ -65,7 +65,7 @@ impl Messenger {
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, BehaviourOutEvent>> { ) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, OutEvent>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
} }
@ -74,7 +74,7 @@ impl Messenger {
} }
} }
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Messenger { impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Amounts {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) { fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event { match event {
RequestResponseEvent::Message { RequestResponseEvent::Message {
@ -89,7 +89,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
request_id: _, request_id: _,
}, },
} => match response { } => match response {
AliceToBob::Amounts(p) => self.events.push_back(BehaviourOutEvent::Amounts(p)), AliceToBob::Amounts(p) => self.events.push_back(OutEvent::Amounts(p)),
}, },
RequestResponseEvent::InboundFailure { .. } => { RequestResponseEvent::InboundFailure { .. } => {
@ -105,11 +105,3 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
} }
} }
} }
impl libp2p::swarm::NetworkBehaviourEventProcess<()> for Messenger {
fn inject_event(&mut self, _event: ()) {}
}
impl libp2p::swarm::NetworkBehaviourEventProcess<Never> for Messenger {
fn inject_event(&mut self, _: Never) {}
}

View File

@ -7,85 +7,56 @@ use libp2p::{
}, },
Multiaddr, PeerId, Multiaddr, PeerId,
}; };
use std::{ use std::{collections::VecDeque, task::Poll};
collections::{hash_map::Entry, HashMap, VecDeque},
task::Poll,
};
#[derive(Debug)] #[derive(Debug)]
pub enum BehaviourOutEvent { pub enum OutEvent {
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
} }
/// A NetworkBehaviour that tracks connections to other peers. /// A NetworkBehaviour that tracks connections to the counterparty. Although the
/// libp2p `NetworkBehaviour` abstraction encompasses connections to multiple
/// peers we only ever connect to a single counterparty. Peer Tracker tracks
/// that connection.
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct PeerTracker { pub struct PeerTracker {
connected_peers: HashMap<PeerId, Vec<Multiaddr>>, connected: Option<(PeerId, Multiaddr)>,
address_hints: HashMap<PeerId, VecDeque<Multiaddr>>, events: VecDeque<OutEvent>,
events: VecDeque<BehaviourOutEvent>,
} }
impl PeerTracker { impl PeerTracker {
/// Returns an arbitrary connected counterparty. /// Returns an arbitrary connected counterparty.
/// This is useful if we are connected to a single other node. /// This is useful if we are connected to a single other node.
pub fn counterparty(&self) -> Option<PeerId> { pub fn counterparty_peer_id(&self) -> Option<PeerId> {
// TODO: Refactor to use combinators. if let Some((id, _)) = &self.connected {
if let Some((id, _)) = self.connected_peers().next() { return Some(id.clone());
return Some(id);
} }
None None
} }
pub fn connected_peers(&self) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> { /// Returns an arbitrary connected counterparty.
self.connected_peers.clone().into_iter() /// This is useful if we are connected to a single other node.
} pub fn counterparty_addr(&self) -> Option<Multiaddr> {
if let Some((_, addr)) = &self.connected {
/// Adds an address hint for the given peer id. The added address is return Some(addr.clone());
/// considered most recent and hence is added at the start of the list
/// because libp2p tries to connect with the first address first.
pub fn add_recent_address_hint(&mut self, id: PeerId, addr: Multiaddr) {
let old_addresses = self.address_hints.get_mut(&id);
match old_addresses {
None => {
let mut hints = VecDeque::new();
hints.push_back(addr);
self.address_hints.insert(id, hints);
}
Some(hints) => {
hints.push_front(addr);
}
} }
None
} }
} }
impl NetworkBehaviour for PeerTracker { impl NetworkBehaviour for PeerTracker {
type ProtocolsHandler = DummyProtocolsHandler; type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = BehaviourOutEvent; type OutEvent = OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
DummyProtocolsHandler::default() DummyProtocolsHandler::default()
} }
/// Note (from libp2p doc): fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
/// The addresses will be tried in the order returned by this function,
/// which means that they should be ordered by decreasing likelihood of
/// reachability. In other words, the first address should be the most
/// likely to be reachable.
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
let mut addresses: Vec<Multiaddr> = vec![]; let mut addresses: Vec<Multiaddr> = vec![];
// If we are connected then this address is most likely to be valid if let Some(addr) = self.counterparty_addr() {
if let Some(connected) = self.connected_peers.get(peer) { addresses.push(addr)
for addr in connected.iter() {
addresses.push(addr.clone())
}
}
if let Some(hints) = self.address_hints.get(peer) {
for hint in hints {
addresses.push(hint.clone());
}
} }
addresses addresses
@ -101,35 +72,24 @@ impl NetworkBehaviour for PeerTracker {
_: &ConnectionId, _: &ConnectionId,
point: &ConnectedPoint, point: &ConnectedPoint,
) { ) {
if let ConnectedPoint::Dialer { address } = point { match point {
self.connected_peers ConnectedPoint::Dialer { address } => {
.entry(peer.clone()) self.connected = Some((peer.clone(), address.clone()));
.or_default() }
.push(address.clone()); ConnectedPoint::Listener {
local_addr: _,
send_back_addr,
} => {
self.connected = Some((peer.clone(), send_back_addr.clone()));
}
}
self.events self.events
.push_back(BehaviourOutEvent::ConnectionEstablished(peer.clone())); .push_back(OutEvent::ConnectionEstablished(peer.clone()));
}
} }
fn inject_connection_closed( fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {
&mut self, self.connected = None;
peer: &PeerId,
_: &ConnectionId,
point: &ConnectedPoint,
) {
if let ConnectedPoint::Dialer { address } = point {
match self.connected_peers.entry(peer.clone()) {
Entry::Vacant(_) => {}
Entry::Occupied(mut entry) => {
let addresses = entry.get_mut();
if let Some(pos) = addresses.iter().position(|a| a == address) {
addresses.remove(pos);
}
}
}
}
} }
fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {} fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {}