Add swap/

Add a binary crate `swap` that implements two nodes (Alice and Bob). With this
applied we can start up a node for each role and do:

- Bob: Requests current amounts using BTC is input
- Alice: Responds with amounts
- Bob: (mock) get user input to Ok the amounts

... continue with swap (TODO)
This commit is contained in:
Tobin C. Harding 2020-10-16 09:14:39 +11:00
parent 4723626fc0
commit 05766d3146
14 changed files with 1141 additions and 1 deletions

View File

@ -1,2 +1,2 @@
[workspace] [workspace]
members = ["monero-harness", "xmr-btc"] members = ["monero-harness", "xmr-btc", "swap"]

32
swap/Cargo.toml Normal file
View File

@ -0,0 +1,32 @@
[package]
name = "swap"
version = "0.1.0"
authors = ["CoBloX developers <team@coblox.tech>"]
edition = "2018"
description = "XMR/BTC trustless atomic swaps."
[dependencies]
anyhow = "1"
async-trait = "0.1"
atty = "0.2"
bitcoin = "0.25" # TODO: Upgrade other crates in this repo to use this version.
derivative = "2"
futures = { version = "0.3", default-features = false }
libp2p = { version = "0.28", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] }
libp2p-tokio-socks5 = "0.3"
log = { version = "0.4", features = ["serde"] }
monero = "0.9"
rand = "0.7"
serde = { version = "1", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1"
structopt = "0.3"
time = "0.2"
tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "sync"] }
tracing = { version = "0.1", features = ["attributes"] }
tracing-core = "0.1"
tracing-futures = { version = "0.2", features = ["std-future", "futures-03"] }
tracing-log = "0.1"
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter"] }
void = "1"
xmr-btc = { path = "../xmr-btc" }

141
swap/src/alice.rs Normal file
View File

@ -0,0 +1,141 @@
//! Run an XMR/BTC swap in the role of Alice.
//! Alice holds XMR and wishes receive BTC.
use anyhow::Result;
use libp2p::{
core::{identity::Keypair, Multiaddr},
request_response::ResponseChannel,
NetworkBehaviour, PeerId,
};
use std::{thread, time::Duration};
use tracing::{debug, warn};
mod messenger;
use self::messenger::*;
use crate::{
monero,
network::{
peer_tracker::{self, PeerTracker},
request_response::{AliceToBob, TIMEOUT},
transport, TokioExecutor,
},
Never, SwapParams,
};
pub type Swarm = libp2p::Swarm<Alice>;
pub async fn swap(listen: Multiaddr) -> Result<()> {
let mut swarm = new_swarm(listen)?;
match swarm.next().await {
BehaviourOutEvent::Request(messenger::BehaviourOutEvent::Btc { btc, channel }) => {
debug!("Got request from Bob");
let params = SwapParams {
btc,
// TODO: Do a real calculation.
xmr: monero::Amount::from_piconero(10),
};
let msg = AliceToBob::Amounts(params);
swarm.send(channel, msg);
}
other => panic!("unexpected event: {:?}", other),
}
warn!("parking thread ...");
thread::park();
Ok(())
}
fn new_swarm(listen: Multiaddr) -> Result<Swarm> {
use anyhow::Context as _;
let behaviour = Alice::default();
let local_key_pair = behaviour.identity();
let local_peer_id = behaviour.peer_id();
let transport = transport::build(local_key_pair)?;
let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.executor(Box::new(TokioExecutor {
handle: tokio::runtime::Handle::current(),
}))
.build();
Swarm::listen_on(&mut swarm, listen.clone())
.with_context(|| format!("Address is not supported: {:#}", listen))?;
tracing::info!("Initialized swarm: {}", local_peer_id);
Ok(swarm)
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum BehaviourOutEvent {
Request(messenger::BehaviourOutEvent),
ConnectionEstablished(PeerId),
Never, // FIXME: Why do we need this?
}
impl From<Never> for BehaviourOutEvent {
fn from(_: Never) -> Self {
BehaviourOutEvent::Never
}
}
impl From<messenger::BehaviourOutEvent> for BehaviourOutEvent {
fn from(event: messenger::BehaviourOutEvent) -> Self {
BehaviourOutEvent::Request(event)
}
}
impl From<peer_tracker::BehaviourOutEvent> for BehaviourOutEvent {
fn from(event: peer_tracker::BehaviourOutEvent) -> Self {
match event {
peer_tracker::BehaviourOutEvent::ConnectionEstablished(id) => {
BehaviourOutEvent::ConnectionEstablished(id)
}
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Alice {
net: Messenger,
pt: PeerTracker,
#[behaviour(ignore)]
identity: Keypair,
}
impl Alice {
pub fn identity(&self) -> Keypair {
self.identity.clone()
}
pub fn peer_id(&self) -> PeerId {
PeerId::from(self.identity.public())
}
/// Alice always sends her messages as a response to a request from Bob.
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: AliceToBob) {
self.net.send(channel, msg);
}
}
impl Default for Alice {
fn default() -> Self {
let identity = Keypair::generate_ed25519();
let timeout = Duration::from_secs(TIMEOUT);
Self {
net: Messenger::new(timeout),
pt: PeerTracker::default(),
identity,
}
}
}

135
swap/src/alice/messenger.rs Normal file
View File

@ -0,0 +1,135 @@
use anyhow::Result;
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
use crate::{
bitcoin,
network::request_response::{AliceToBob, BobToAlice, Codec, Protocol},
Never,
};
#[derive(Debug)]
pub enum BehaviourOutEvent {
Btc {
btc: bitcoin::Amount,
channel: ResponseChannel<AliceToBob>,
},
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Messenger {
rr: RequestResponse<Codec>,
#[behaviour(ignore)]
events: VecDeque<BehaviourOutEvent>,
}
impl Messenger {
pub fn new(timeout: Duration) -> Self {
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
/// Alice always sends her messages as a response to a request from Bob.
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: AliceToBob) {
self.rr.send_response(channel, msg);
}
pub async fn request_amounts(
&mut self,
alice: PeerId,
btc: bitcoin::Amount,
) -> Result<RequestId> {
let msg = BobToAlice::AmountsFromBtc(btc);
let id = self.rr.send_request(&alice, msg);
debug!("Request sent to: {}", alice);
Ok(id)
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, BehaviourOutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Messenger {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
peer: _,
message:
RequestResponseMessage::Request {
request,
request_id: _,
channel,
},
} => match request {
BobToAlice::AmountsFromBtc(btc) => self
.events
.push_back(BehaviourOutEvent::Btc { btc, channel }),
_ => panic!("unexpected request"),
},
RequestResponseEvent::Message {
peer: _,
message:
RequestResponseMessage::Response {
response: _,
request_id: _,
},
} => panic!("unexpected response"),
RequestResponseEvent::InboundFailure {
peer: _,
request_id: _,
error,
} => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure {
peer: _,
request_id: _,
error,
} => {
error!("Outbound failure: {:?}", error);
}
}
}
}
impl libp2p::swarm::NetworkBehaviourEventProcess<()> for Messenger {
fn inject_event(&mut self, _event: ()) {}
}
impl libp2p::swarm::NetworkBehaviourEventProcess<Never> for Messenger {
fn inject_event(&mut self, _: Never) {}
}

157
swap/src/bob.rs Normal file
View File

@ -0,0 +1,157 @@
//! Run an XMR/BTC swap in the role of Bob.
//! Bob holds BTC and wishes receive XMR.
use anyhow::Result;
use futures::{
channel::mpsc::{Receiver, Sender},
StreamExt,
};
use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId};
use std::{process, thread, time::Duration};
use tracing::{debug, info, warn};
mod messenger;
use self::messenger::*;
use crate::{
bitcoin,
network::{
peer_tracker::{self, PeerTracker},
request_response::TIMEOUT,
transport, TokioExecutor,
},
Cmd, Never, Rsp,
};
pub async fn swap(
btc: u64,
addr: Multiaddr,
mut cmd_tx: Sender<Cmd>,
mut rsp_rx: Receiver<Rsp>,
) -> Result<()> {
let mut swarm = new_swarm()?;
libp2p::Swarm::dial_addr(&mut swarm, addr)?;
let id = match swarm.next().await {
BehaviourOutEvent::ConnectionEstablished(id) => id,
other => panic!("unexpected event: {:?}", other),
};
info!("Connection established.");
swarm.request_amounts(id, btc).await;
match swarm.next().await {
BehaviourOutEvent::Response(messenger::BehaviourOutEvent::Amounts(p)) => {
debug!("Got response from Alice: {:?}", p);
let cmd = Cmd::VerifyAmounts(p);
cmd_tx.try_send(cmd)?;
let response = rsp_rx.next().await;
if response == Some(Rsp::Abort) {
info!("Amounts no good, aborting ...");
process::exit(0);
}
info!("User verified amounts, continuing with swap ...");
}
other => panic!("unexpected event: {:?}", other),
}
warn!("parking thread ...");
thread::park();
Ok(())
}
pub type Swarm = libp2p::Swarm<Bob>;
fn new_swarm() -> Result<Swarm> {
let behaviour = Bob::default();
let local_key_pair = behaviour.identity();
let local_peer_id = behaviour.peer_id();
let transport = transport::build(local_key_pair)?;
let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.executor(Box::new(TokioExecutor {
handle: tokio::runtime::Handle::current(),
}))
.build();
info!("Initialized swarm with identity {}", local_peer_id);
Ok(swarm)
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum BehaviourOutEvent {
Response(messenger::BehaviourOutEvent),
ConnectionEstablished(PeerId),
Never, // FIXME: Why do we need this?
}
impl From<Never> for BehaviourOutEvent {
fn from(_: Never) -> Self {
BehaviourOutEvent::Never
}
}
impl From<messenger::BehaviourOutEvent> for BehaviourOutEvent {
fn from(event: messenger::BehaviourOutEvent) -> Self {
BehaviourOutEvent::Response(event)
}
}
impl From<peer_tracker::BehaviourOutEvent> for BehaviourOutEvent {
fn from(event: peer_tracker::BehaviourOutEvent) -> Self {
match event {
peer_tracker::BehaviourOutEvent::ConnectionEstablished(id) => {
BehaviourOutEvent::ConnectionEstablished(id)
}
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOutEvent", event_process = false)]
#[allow(missing_debug_implementations)]
pub struct Bob {
net: Messenger,
pt: PeerTracker,
#[behaviour(ignore)]
identity: Keypair,
}
impl Bob {
pub fn identity(&self) -> Keypair {
self.identity.clone()
}
pub fn peer_id(&self) -> PeerId {
PeerId::from(self.identity.public())
}
/// Sends a message to Alice to get current amounts based on `btc`.
pub async fn request_amounts(&mut self, alice: PeerId, btc: u64) {
let btc = bitcoin::Amount::from_sat(btc);
let _id = self.net.request_amounts(alice.clone(), btc).await;
debug!("Requesting amounts from: {}", alice);
}
/// Returns Alice's peer id if we are connected.
pub fn peer_id_of_alice(&self) -> Option<PeerId> {
self.pt.counterparty()
}
}
impl Default for Bob {
fn default() -> Bob {
let identity = Keypair::generate_ed25519();
let timeout = Duration::from_secs(TIMEOUT);
Self {
net: Messenger::new(timeout),
pt: PeerTracker::default(),
identity,
}
}
}

117
swap/src/bob/messenger.rs Normal file
View File

@ -0,0 +1,117 @@
use anyhow::Result;
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::{debug, error};
use crate::{
bitcoin,
network::request_response::{AliceToBob, BobToAlice, Codec, Protocol},
Never, SwapParams,
};
#[derive(Debug)]
pub enum BehaviourOutEvent {
Amounts(SwapParams),
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Messenger {
rr: RequestResponse<Codec>,
#[behaviour(ignore)]
events: VecDeque<BehaviourOutEvent>,
}
impl Messenger {
pub fn new(timeout: Duration) -> Self {
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
pub async fn request_amounts(
&mut self,
alice: PeerId,
btc: bitcoin::Amount,
) -> Result<RequestId> {
debug!("Sending request ...");
let msg = BobToAlice::AmountsFromBtc(btc);
let id = self.rr.send_request(&alice, msg);
debug!("Sent.");
Ok(id)
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, BehaviourOutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Messenger {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
peer: _,
message: RequestResponseMessage::Request { .. },
} => panic!("Bob should never get a request from Alice"),
RequestResponseEvent::Message {
peer: _,
message:
RequestResponseMessage::Response {
response,
request_id: _,
},
} => match response {
AliceToBob::Amounts(p) => self.events.push_back(BehaviourOutEvent::Amounts(p)),
},
RequestResponseEvent::InboundFailure { .. } => {
panic!("Bob should never get a request from Alice, so should never get an InboundFailure");
}
RequestResponseEvent::OutboundFailure {
peer: _,
request_id: _,
error,
} => {
error!("Outbound failure: {:?}", error);
}
}
}
}
impl libp2p::swarm::NetworkBehaviourEventProcess<()> for Messenger {
fn inject_event(&mut self, _event: ()) {}
}
impl libp2p::swarm::NetworkBehaviourEventProcess<Never> for Messenger {
fn inject_event(&mut self, _: Never) {}
}

14
swap/src/cli.rs Normal file
View File

@ -0,0 +1,14 @@
#[derive(structopt::StructOpt, Debug)]
pub struct Options {
/// Run the swap as Alice.
#[structopt(long = "as-alice")]
pub as_alice: bool,
/// Run the swap as Bob and try to swap this many XMR (in piconero).
#[structopt(long = "picos")]
pub piconeros: Option<u64>,
/// Run the swap as Bob and try to swap this many BTC (in satoshi).
#[structopt(long = "sats")]
pub satoshis: Option<u64>,
}

95
swap/src/lib.rs Normal file
View File

@ -0,0 +1,95 @@
use serde::{Deserialize, Serialize};
pub mod alice;
pub mod bob;
pub mod network;
pub const ONE_BTC: u64 = 100_000_000;
pub type Never = std::convert::Infallible;
/// Commands sent from Bob to the main task.
#[derive(Debug)]
pub enum Cmd {
VerifyAmounts(SwapParams),
}
/// Responses send from the main task back to Bob.
#[derive(Debug, PartialEq)]
pub enum Rsp {
Verified,
Abort,
}
/// XMR/BTC swap parameters.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct SwapParams {
/// Amount of BTC to swap.
pub btc: bitcoin::Amount,
/// Amount of XMR to swap.
pub xmr: monero::Amount,
}
// FIXME: Amount modules are a quick hack so we can derive serde.
pub mod monero {
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct Amount(u64);
impl Amount {
/// Create an [Amount] with piconero precision and the given number of
/// piconeros.
///
/// A piconero (a.k.a atomic unit) is equal to 1e-12 XMR.
pub fn from_piconero(amount: u64) -> Self {
Amount(amount)
}
pub fn as_piconero(&self) -> u64 {
self.0
}
}
impl From<Amount> for u64 {
fn from(from: Amount) -> u64 {
from.0
}
}
impl fmt::Display for Amount {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} piconeros", self.0)
}
}
}
pub mod bitcoin {
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct Amount(u64);
impl Amount {
/// The zero amount.
pub const ZERO: Amount = Amount(0);
/// Exactly one satoshi.
pub const ONE_SAT: Amount = Amount(1);
/// Exactly one bitcoin.
pub const ONE_BTC: Amount = Amount(100_000_000);
/// Create an [Amount] with satoshi precision and the given number of
/// satoshis.
pub fn from_sat(satoshi: u64) -> Amount {
Amount(satoshi)
}
}
impl fmt::Display for Amount {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} satoshis", self.0)
}
}
}

96
swap/src/main.rs Normal file
View File

@ -0,0 +1,96 @@
#![warn(
unused_extern_crates,
missing_debug_implementations,
missing_copy_implementations,
rust_2018_idioms,
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::fallible_impl_from,
clippy::cast_precision_loss,
clippy::cast_possible_wrap,
clippy::dbg_macro
)]
#![forbid(unsafe_code)]
use anyhow::{bail, Result};
use futures::{channel::mpsc, StreamExt};
use libp2p::Multiaddr;
use log::LevelFilter;
use structopt::StructOpt;
use tracing::info;
mod cli;
mod trace;
use cli::Options;
use swap::{alice, bob, Cmd, Rsp, SwapParams};
// TODO: Add root seed file instead of generating new seed each run.
// Alice's address and port until we have a config file.
pub const PORT: u16 = 9876; // Arbitrarily chosen.
pub const ADDR: &str = "127.0.0.1";
#[tokio::main]
async fn main() -> Result<()> {
let opt = Options::from_args();
trace::init_tracing(LevelFilter::Debug)?;
let addr = format!("/ip4/{}/tcp/{}", ADDR, PORT);
let alice_addr: Multiaddr = addr.parse().expect("failed to parse Alice's address");
if opt.as_alice {
info!("running swap node as Alice ...");
if opt.piconeros.is_some() || opt.satoshis.is_some() {
bail!("Alice cannot set the amount to swap via the cli");
}
swap_as_alice(alice_addr).await?;
} else {
info!("running swap node as Bob ...");
match (opt.piconeros, opt.satoshis) {
(Some(_), Some(_)) => bail!("Please supply only a single amount to swap"),
(None, None) => bail!("Please supply an amount to swap"),
(Some(_picos), _) => todo!("support starting with picos"),
(None, Some(sats)) => {
swap_as_bob(sats, alice_addr).await?;
}
};
}
Ok(())
}
async fn swap_as_alice(addr: Multiaddr) -> Result<()> {
alice::swap(addr).await
}
async fn swap_as_bob(sats: u64, addr: Multiaddr) -> Result<()> {
let (cmd_tx, mut cmd_rx) = mpsc::channel(1);
let (mut rsp_tx, rsp_rx) = mpsc::channel(1);
tokio::spawn(bob::swap(sats, addr, cmd_tx, rsp_rx));
loop {
let read = cmd_rx.next().await;
match read {
Some(cmd) => match cmd {
Cmd::VerifyAmounts(p) => {
if verified(p) {
rsp_tx.try_send(Rsp::Verified)?;
}
}
},
None => {
info!("Channel closed from other end");
return Ok(());
}
}
}
}
fn verified(_p: SwapParams) -> bool {
// TODO: Read input from the shell.
true
}

18
swap/src/network.rs Normal file
View File

@ -0,0 +1,18 @@
use futures::prelude::*;
use libp2p::core::Executor;
use std::pin::Pin;
use tokio::runtime::Handle;
pub mod peer_tracker;
pub mod request_response;
pub mod transport;
pub struct TokioExecutor {
pub handle: Handle,
}
impl Executor for TokioExecutor {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = self.handle.spawn(future);
}
}

View File

@ -0,0 +1,148 @@
use futures::task::Context;
use libp2p::{
core::{connection::ConnectionId, ConnectedPoint},
swarm::{
protocols_handler::DummyProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
},
Multiaddr, PeerId,
};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
task::Poll,
};
#[derive(Debug)]
pub enum BehaviourOutEvent {
ConnectionEstablished(PeerId),
}
/// A NetworkBehaviour that tracks connections to other peers.
#[derive(Default, Debug)]
pub struct PeerTracker {
connected_peers: HashMap<PeerId, Vec<Multiaddr>>,
address_hints: HashMap<PeerId, VecDeque<Multiaddr>>,
events: VecDeque<BehaviourOutEvent>,
}
impl PeerTracker {
/// Returns an arbitrary connected counterparty.
/// This is useful if we are connected to a single other node.
pub fn counterparty(&self) -> Option<PeerId> {
// TODO: Refactor to use combinators.
if let Some((id, _)) = self.connected_peers().next() {
return Some(id);
}
None
}
pub fn connected_peers(&self) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> {
self.connected_peers.clone().into_iter()
}
/// Adds an address hint for the given peer id. The added address is
/// considered most recent and hence is added at the start of the list
/// because libp2p tries to connect with the first address first.
pub fn add_recent_address_hint(&mut self, id: PeerId, addr: Multiaddr) {
let old_addresses = self.address_hints.get_mut(&id);
match old_addresses {
None => {
let mut hints = VecDeque::new();
hints.push_back(addr);
self.address_hints.insert(id, hints);
}
Some(hints) => {
hints.push_front(addr);
}
}
}
}
impl NetworkBehaviour for PeerTracker {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = BehaviourOutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
DummyProtocolsHandler::default()
}
/// Note (from libp2p doc):
/// The addresses will be tried in the order returned by this function,
/// which means that they should be ordered by decreasing likelihood of
/// reachability. In other words, the first address should be the most
/// likely to be reachable.
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
let mut addresses: Vec<Multiaddr> = vec![];
// If we are connected then this address is most likely to be valid
if let Some(connected) = self.connected_peers.get(peer) {
for addr in connected.iter() {
addresses.push(addr.clone())
}
}
if let Some(hints) = self.address_hints.get(peer) {
for hint in hints {
addresses.push(hint.clone());
}
}
addresses
}
fn inject_connected(&mut self, _: &PeerId) {}
fn inject_disconnected(&mut self, _: &PeerId) {}
fn inject_connection_established(
&mut self,
peer: &PeerId,
_: &ConnectionId,
point: &ConnectedPoint,
) {
if let ConnectedPoint::Dialer { address } = point {
self.connected_peers
.entry(peer.clone())
.or_default()
.push(address.clone());
self.events
.push_back(BehaviourOutEvent::ConnectionEstablished(peer.clone()));
}
}
fn inject_connection_closed(
&mut self,
peer: &PeerId,
_: &ConnectionId,
point: &ConnectedPoint,
) {
if let ConnectedPoint::Dialer { address } = point {
match self.connected_peers.entry(peer.clone()) {
Entry::Vacant(_) => {}
Entry::Occupied(mut entry) => {
let addresses = entry.get_mut();
if let Some(pos) = addresses.iter().position(|a| a == address) {
addresses.remove(pos);
}
}
}
}
}
fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<void::Void, Self::OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}

View File

@ -0,0 +1,109 @@
use async_trait::async_trait;
use futures::prelude::*;
use libp2p::{
core::upgrade,
request_response::{ProtocolName, RequestResponseCodec},
};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io};
use crate::{bitcoin, monero, SwapParams};
/// Time to wait for a response back once we send a request.
pub const TIMEOUT: u64 = 3600; // One hour.
/// Messages Bob sends to Alice.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BobToAlice {
AmountsFromBtc(bitcoin::Amount),
AmountsFromXmr(monero::Amount),
/* TODO: How are we going to do this when the messages are not Clone?
* Msg(bob::Message), */
}
/// Messages Alice sends to Bob.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AliceToBob {
Amounts(SwapParams),
/* TODO: How are we going to do this when the messages are not Clone?
* Msg(alice::Message) */
}
#[derive(Debug, Clone, Copy, Default)]
pub struct Protocol;
impl ProtocolName for Protocol {
fn protocol_name(&self) -> &[u8] {
b"/xmr/btc/1.0.0"
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct Codec;
#[async_trait]
impl RequestResponseCodec for Codec {
type Protocol = Protocol;
type Request = BobToAlice;
type Response = AliceToBob;
async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, 1024)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_json::Deserializer::from_slice(&message);
let msg = BobToAlice::deserialize(&mut de)?;
Ok(msg)
}
async fn read_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, 1024)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_json::Deserializer::from_slice(&message);
let msg = AliceToBob::deserialize(&mut de)?;
Ok(msg)
}
async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let bytes = serde_json::to_vec(&req)?;
upgrade::write_one(io, &bytes).await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let bytes = serde_json::to_vec(&res)?;
upgrade::write_one(io, &bytes).await?;
Ok(())
}
}

View File

@ -0,0 +1,53 @@
use anyhow::Result;
use libp2p::{
core::{
either::EitherError,
identity,
muxing::StreamMuxerBox,
transport::{boxed::Boxed, timeout::TransportTimeoutError},
upgrade::{SelectUpgrade, Version},
Transport, UpgradeError,
},
dns::{DnsConfig, DnsErr},
mplex::MplexConfig,
noise::{self, NoiseConfig, NoiseError, X25519Spec},
tcp::TokioTcpConfig,
yamux, PeerId,
};
use std::{io, time::Duration};
/// Builds a libp2p transport with the following features:
/// - TcpConnection
/// - DNS name resolution
/// - authentication via noise
/// - multiplexing via yamux or mplex
pub fn build(id_keys: identity::Keypair) -> Result<SwapTransport> {
let dh_keys = noise::Keypair::<X25519Spec>::new().into_authentic(&id_keys)?;
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
let tcp = TokioTcpConfig::new().nodelay(true);
let dns = DnsConfig::new(tcp)?;
let transport = dns
.upgrade(Version::V1)
.authenticate(noise)
.multiplex(SelectUpgrade::new(
yamux::Config::default(),
MplexConfig::new(),
))
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.boxed();
Ok(transport)
}
pub type SwapTransport = Boxed<
(PeerId, StreamMuxerBox),
TransportTimeoutError<
EitherError<
EitherError<DnsErr<io::Error>, UpgradeError<NoiseError>>,
UpgradeError<EitherError<io::Error, io::Error>>,
>,
>,
>;

25
swap/src/trace.rs Normal file
View File

@ -0,0 +1,25 @@
use atty::{self, Stream};
use log::LevelFilter;
use tracing::{info, subscriber};
use tracing_log::LogTracer;
use tracing_subscriber::FmtSubscriber;
pub fn init_tracing(level: log::LevelFilter) -> anyhow::Result<()> {
if level == LevelFilter::Off {
return Ok(());
}
// Upstream log filter.
LogTracer::init_with_filter(LevelFilter::Debug)?;
let is_terminal = atty::is(Stream::Stdout);
let subscriber = FmtSubscriber::builder()
.with_env_filter(format!("swap={}", level))
.with_ansi(is_terminal)
.finish();
subscriber::set_global_default(subscriber)?;
info!("Initialized tracing with level: {}", level);
Ok(())
}