Add address at the start

Do not carry peer id around as we currently only expect one peer.
This commit is contained in:
Franck Royer 2020-12-22 13:47:05 +11:00
parent 7d3b2faedd
commit 49b84d84b9
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
10 changed files with 99 additions and 180 deletions

View File

@ -142,11 +142,7 @@ async fn main() -> Result<()> {
xmr: receive_monero,
};
let bob_state = BobState::Started {
state0,
amounts,
alice_peer_id: alice_peer_id.clone(),
};
let bob_state = BobState::Started { state0, amounts };
let swap_id = Uuid::new_v4();
info!(
@ -304,7 +300,8 @@ async fn bob_swap(
let bob_behaviour = bob::Behaviour::default();
let bob_transport = build(bob_behaviour.identity())?;
let (event_loop, handle) = bob::event_loop::EventLoop::new(bob_transport, bob_behaviour)?;
let (event_loop, handle) =
bob::event_loop::EventLoop::new(bob_transport, bob_behaviour, alice_peer_id, alice_addr)?;
let swap = bob::swap::swap(
state,
@ -314,8 +311,6 @@ async fn bob_swap(
monero_wallet.clone(),
OsRng,
swap_id,
alice_peer_id,
alice_addr,
);
tokio::spawn(async move { event_loop.run().await });

View File

@ -34,14 +34,13 @@ pub struct EventLoopHandle {
msg0: Receiver<alice::Message0>,
msg1: Receiver<alice::Message1>,
msg2: Receiver<alice::Message2>,
request_amounts: Sender<(PeerId, ::bitcoin::Amount)>,
request_amounts: Sender<::bitcoin::Amount>,
conn_established: Receiver<PeerId>,
dial_alice: Sender<PeerId>,
add_address: Sender<(PeerId, Multiaddr)>,
send_msg0: Sender<(PeerId, bob::Message0)>,
send_msg1: Sender<(PeerId, bob::Message1)>,
send_msg2: Sender<(PeerId, bob::Message2)>,
send_msg3: Sender<(PeerId, EncryptedSignature)>,
dial_alice: Sender<()>,
send_msg0: Sender<bob::Message0>,
send_msg1: Sender<bob::Message1>,
send_msg2: Sender<bob::Message2>,
send_msg3: Sender<EncryptedSignature>,
}
impl EventLoopHandle {
@ -68,9 +67,9 @@ impl EventLoopHandle {
/// Dials other party and wait for the connection to be established.
/// Do nothing if we are already connected
pub async fn dial(&mut self, peer_id: PeerId) -> Result<()> {
debug!("Attempt to dial Alice {}", peer_id);
let _ = self.dial_alice.send(peer_id).await?;
pub async fn dial(&mut self) -> Result<()> {
debug!("Attempt to dial Alice");
let _ = self.dial_alice.send(()).await?;
self.conn_established
.recv()
@ -80,92 +79,84 @@ impl EventLoopHandle {
Ok(())
}
pub async fn add_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> Result<()> {
debug!("Attempt to add address {} for peer id {}", addr, peer_id);
self.add_address.send((peer_id, addr)).await?;
pub async fn request_amounts(&mut self, btc_amount: ::bitcoin::Amount) -> Result<()> {
let _ = self.request_amounts.send(btc_amount).await?;
Ok(())
}
pub async fn request_amounts(
&mut self,
peer_id: PeerId,
btc_amount: ::bitcoin::Amount,
) -> Result<()> {
let _ = self.request_amounts.send((peer_id, btc_amount)).await?;
pub async fn send_message0(&mut self, msg: bob::Message0) -> Result<()> {
let _ = self.send_msg0.send(msg).await?;
Ok(())
}
pub async fn send_message0(&mut self, peer_id: PeerId, msg: bob::Message0) -> Result<()> {
let _ = self.send_msg0.send((peer_id, msg)).await?;
pub async fn send_message1(&mut self, msg: bob::Message1) -> Result<()> {
let _ = self.send_msg1.send(msg).await?;
Ok(())
}
pub async fn send_message1(&mut self, peer_id: PeerId, msg: bob::Message1) -> Result<()> {
let _ = self.send_msg1.send((peer_id, msg)).await?;
pub async fn send_message2(&mut self, msg: bob::Message2) -> Result<()> {
let _ = self.send_msg2.send(msg).await?;
Ok(())
}
pub async fn send_message2(&mut self, peer_id: PeerId, msg: bob::Message2) -> Result<()> {
let _ = self.send_msg2.send((peer_id, msg)).await?;
Ok(())
}
pub async fn send_message3(
&mut self,
peer_id: PeerId,
tx_redeem_encsig: EncryptedSignature,
) -> Result<()> {
let _ = self.send_msg3.send((peer_id, tx_redeem_encsig)).await?;
pub async fn send_message3(&mut self, tx_redeem_encsig: EncryptedSignature) -> Result<()> {
let _ = self.send_msg3.send(tx_redeem_encsig).await?;
Ok(())
}
}
pub struct EventLoop {
swarm: libp2p::Swarm<Behaviour>,
alice_peer_id: PeerId,
msg0: Sender<alice::Message0>,
msg1: Sender<alice::Message1>,
msg2: Sender<alice::Message2>,
conn_established: Sender<PeerId>,
request_amounts: Receiver<(PeerId, ::bitcoin::Amount)>,
dial_alice: Receiver<PeerId>,
add_address: Receiver<(PeerId, Multiaddr)>,
send_msg0: Receiver<(PeerId, bob::Message0)>,
send_msg1: Receiver<(PeerId, bob::Message1)>,
send_msg2: Receiver<(PeerId, bob::Message2)>,
send_msg3: Receiver<(PeerId, EncryptedSignature)>,
request_amounts: Receiver<::bitcoin::Amount>,
dial_alice: Receiver<()>,
send_msg0: Receiver<bob::Message0>,
send_msg1: Receiver<bob::Message1>,
send_msg2: Receiver<bob::Message2>,
send_msg3: Receiver<EncryptedSignature>,
}
impl EventLoop {
pub fn new(transport: SwapTransport, behaviour: Behaviour) -> Result<(Self, EventLoopHandle)> {
pub fn new(
transport: SwapTransport,
behaviour: Behaviour,
alice_peer_id: PeerId,
alice_addr: Multiaddr,
) -> Result<(Self, EventLoopHandle)> {
let local_peer_id = behaviour.peer_id();
let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id)
let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, local_peer_id)
.executor(Box::new(TokioExecutor {
handle: tokio::runtime::Handle::current(),
}))
.build();
swarm.add_address(alice_peer_id.clone(), alice_addr);
let amounts = Channels::new();
let msg0 = Channels::new();
let msg1 = Channels::new();
let msg2 = Channels::new();
let conn_established = Channels::new();
let dial_alice = Channels::new();
let add_address = Channels::new();
let send_msg0 = Channels::new();
let send_msg1 = Channels::new();
let send_msg2 = Channels::new();
let send_msg3 = Channels::new();
let driver = EventLoop {
let event_loop = EventLoop {
swarm,
alice_peer_id,
request_amounts: amounts.receiver,
msg0: msg0.sender,
msg1: msg1.sender,
msg2: msg2.sender,
conn_established: conn_established.sender,
dial_alice: dial_alice.receiver,
add_address: add_address.receiver,
send_msg0: send_msg0.receiver,
send_msg1: send_msg1.receiver,
send_msg2: send_msg2.receiver,
@ -179,14 +170,13 @@ impl EventLoop {
msg2: msg2.receiver,
conn_established: conn_established.receiver,
dial_alice: dial_alice.sender,
add_address: add_address.sender,
send_msg0: send_msg0.sender,
send_msg1: send_msg1.sender,
send_msg2: send_msg2.sender,
send_msg3: send_msg3.sender,
};
Ok((driver, handle))
Ok((event_loop, handle))
}
pub async fn run(mut self) {
@ -210,14 +200,9 @@ impl EventLoop {
OutEvent::Message3 => info!("Alice acknowledged message 3 received"),
}
},
peer_id_addr = self.add_address.next().fuse() => {
if let Some((peer_id, addr)) = peer_id_addr {
debug!("Add address for {}: {}", peer_id, addr);
self.swarm.add_address(peer_id, addr);
}
},
peer_id = self.dial_alice.next().fuse() => {
if let Some(peer_id) = peer_id {
null = self.dial_alice.next().fuse() => {
if let Some(_) = null {
let peer_id = self.alice_peer_id.clone();
if self.swarm.pt.is_connected(&peer_id) {
debug!("Already connected to Alice: {}", peer_id);
let _ = self.conn_established.send(peer_id).await;
@ -232,30 +217,30 @@ impl EventLoop {
}
},
amounts = self.request_amounts.next().fuse() => {
if let Some((peer_id, btc_amount)) = amounts {
self.swarm.request_amounts(peer_id, btc_amount.as_sat());
if let Some(btc_amount) = amounts {
self.swarm.request_amounts(self.alice_peer_id.clone(), btc_amount.as_sat());
}
},
msg0 = self.send_msg0.next().fuse() => {
if let Some((peer_id, msg)) = msg0 {
self.swarm.send_message0(peer_id, msg);
if let Some(msg) = msg0 {
self.swarm.send_message0(self.alice_peer_id.clone(), msg);
}
}
msg1 = self.send_msg1.next().fuse() => {
if let Some((peer_id, msg)) = msg1 {
self.swarm.send_message1(peer_id, msg);
if let Some(msg) = msg1 {
self.swarm.send_message1(self.alice_peer_id.clone(), msg);
}
},
msg2 = self.send_msg2.next().fuse() => {
if let Some((peer_id, msg)) = msg2 {
self.swarm.send_message2(peer_id, msg);
if let Some(msg) = msg2 {
self.swarm.send_message2(self.alice_peer_id.clone(), msg);
}
},
msg3 = self.send_msg3.next().fuse() => {
if let Some((peer_id, tx_redeem_encsig)) = msg3 {
self.swarm.send_message3(peer_id, tx_redeem_encsig);
if let Some(tx_redeem_encsig) = msg3 {
self.swarm.send_message3(self.alice_peer_id.clone(), tx_redeem_encsig);
}
}
}

View File

@ -7,7 +7,6 @@ use crate::{
};
use anyhow::{bail, Result};
use async_recursion::async_recursion;
use libp2p::{core::Multiaddr, PeerId};
use rand::{CryptoRng, RngCore};
use std::{convert::TryFrom, fmt, sync::Arc};
use tokio::select;
@ -23,11 +22,10 @@ pub enum BobState {
Started {
state0: bob::State0,
amounts: SwapAmounts,
alice_peer_id: PeerId,
},
Negotiated(bob::State2, PeerId),
BtcLocked(bob::State3, PeerId),
XmrLocked(bob::State4, PeerId),
Negotiated(bob::State2),
BtcLocked(bob::State3),
XmrLocked(bob::State4),
EncSigSent(bob::State4),
BtcRedeemed(bob::State5),
T1Expired(bob::State4),
@ -64,9 +62,9 @@ impl From<BobState> for state::Bob {
// TODO: Do we want to resume just started swaps
unimplemented!("Cannot save a swap that has just started")
}
BobState::Negotiated(state2, peer_id) => Bob::Negotiated { state2, peer_id },
BobState::BtcLocked(state3, peer_id) => Bob::BtcLocked { state3, peer_id },
BobState::XmrLocked(state4, peer_id) => Bob::XmrLocked { state4, peer_id },
BobState::Negotiated(state2) => Bob::Negotiated { state2 },
BobState::BtcLocked(state3) => Bob::BtcLocked { state3 },
BobState::XmrLocked(state4) => Bob::XmrLocked { state4 },
BobState::EncSigSent(state4) => Bob::EncSigSent { state4 },
BobState::BtcRedeemed(state5) => Bob::BtcRedeemed(state5),
BobState::T1Expired(state4) => Bob::T1Expired(state4),
@ -85,9 +83,9 @@ impl TryFrom<state::Swap> for BobState {
fn try_from(db_state: state::Swap) -> Result<Self, Self::Error> {
if let Swap::Bob(state) = db_state {
let bob_State = match state {
Bob::Negotiated { state2, peer_id } => BobState::Negotiated(state2, peer_id),
Bob::BtcLocked { state3, peer_id } => BobState::BtcLocked(state3, peer_id),
Bob::XmrLocked { state4, peer_id } => BobState::XmrLocked(state4, peer_id),
Bob::Negotiated { state2 } => BobState::Negotiated(state2),
Bob::BtcLocked { state3 } => BobState::BtcLocked(state3),
Bob::XmrLocked { state4 } => BobState::XmrLocked(state4),
Bob::EncSigSent { state4 } => BobState::EncSigSent(state4),
Bob::BtcRedeemed(state5) => BobState::BtcRedeemed(state5),
Bob::T1Expired(state4) => BobState::T1Expired(state4),
@ -106,22 +104,16 @@ impl TryFrom<state::Swap> for BobState {
#[allow(clippy::too_many_arguments)]
pub async fn swap<R>(
state: BobState,
mut event_loop_handle: EventLoopHandle,
event_loop_handle: EventLoopHandle,
db: Database,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
monero_wallet: Arc<crate::monero::Wallet>,
rng: R,
swap_id: Uuid,
alice_peer_id: PeerId,
alice_addr: Multiaddr,
) -> Result<BobState>
where
R: RngCore + CryptoRng + Send,
{
event_loop_handle
.add_address(alice_peer_id, alice_addr)
.await?;
run_until(
state,
is_complete,
@ -178,24 +170,19 @@ where
Ok(state)
} else {
match state {
BobState::Started {
state0,
amounts,
alice_peer_id,
} => {
event_loop_handle.dial(alice_peer_id.clone()).await?;
BobState::Started { state0, amounts } => {
event_loop_handle.dial().await?;
let (state2, alice_peer_id) = negotiate(
let state2 = negotiate(
state0,
amounts,
&mut event_loop_handle,
alice_peer_id.clone(),
&mut rng,
bitcoin_wallet.clone(),
)
.await?;
let state = BobState::Negotiated(state2, alice_peer_id);
let state = BobState::Negotiated(state2);
let db_state = state.clone().into();
db.insert_latest_state(swap_id, state::Swap::Bob(db_state))
.await?;
@ -211,13 +198,13 @@ where
)
.await
}
BobState::Negotiated(state2, alice_peer_id) => {
BobState::Negotiated(state2) => {
// Do not lock Bitcoin if not connected to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
event_loop_handle.dial().await?;
// Alice and Bob have exchanged info
let state3 = state2.lock_btc(bitcoin_wallet.as_ref()).await?;
let state = BobState::BtcLocked(state3, alice_peer_id);
let state = BobState::BtcLocked(state3);
let db_state = state.clone().into();
db.insert_latest_state(swap_id, state::Swap::Bob(db_state))
.await?;
@ -235,9 +222,9 @@ where
}
// Bob has locked Btc
// Watch for Alice to Lock Xmr or for t1 to elapse
BobState::BtcLocked(state3, alice_peer_id) => {
BobState::BtcLocked(state3) => {
// TODO(Franck): Refund if cannot connect to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
event_loop_handle.dial().await?;
// todo: watch until t1, not indefinitely
let msg2 = event_loop_handle.recv_message2().await?;
@ -245,7 +232,7 @@ where
.watch_for_lock_xmr(monero_wallet.as_ref(), msg2)
.await?;
let state = BobState::XmrLocked(state4, alice_peer_id);
let state = BobState::XmrLocked(state4);
let db_state = state.clone().into();
db.insert_latest_state(swap_id, state::Swap::Bob(db_state))
.await?;
@ -261,9 +248,9 @@ where
)
.await
}
BobState::XmrLocked(state, alice_peer_id) => {
BobState::XmrLocked(state) => {
// TODO(Franck): Refund if cannot connect to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
event_loop_handle.dial().await?;
let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? {
// Alice has locked Xmr
@ -272,8 +259,7 @@ where
let state4_clone = state.clone();
// TODO(Franck): Refund if message cannot be sent.
let enc_sig_sent_watcher =
event_loop_handle.send_message3(alice_peer_id.clone(), tx_redeem_encsig);
let enc_sig_sent_watcher = event_loop_handle.send_message3(tx_redeem_encsig);
let bitcoin_wallet = bitcoin_wallet.clone();
let t1_timeout = state4_clone.wait_for_t1(bitcoin_wallet.as_ref());
@ -420,33 +406,24 @@ pub async fn negotiate<R>(
state0: xmr_btc::bob::State0,
amounts: SwapAmounts,
swarm: &mut EventLoopHandle,
alice_peer_id: PeerId,
mut rng: R,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
) -> Result<(State2, PeerId)>
) -> Result<State2>
where
R: RngCore + CryptoRng + Send,
{
tracing::trace!("Starting negotiate");
swarm
.request_amounts(alice_peer_id.clone(), amounts.btc)
.await?;
swarm.request_amounts(amounts.btc).await?;
swarm
.send_message0(alice_peer_id.clone(), state0.next_message(&mut rng))
.await?;
swarm.send_message0(state0.next_message(&mut rng)).await?;
let msg0 = swarm.recv_message0().await?;
let state1 = state0.receive(bitcoin_wallet.as_ref(), msg0).await?;
swarm
.send_message1(alice_peer_id.clone(), state1.next_message())
.await?;
swarm.send_message1(state1.next_message()).await?;
let msg1 = swarm.recv_message1().await?;
let state2 = state1.receive(msg1)?;
swarm
.send_message2(alice_peer_id.clone(), state2.next_message())
.await?;
swarm.send_message2(state2.next_message()).await?;
Ok((state2, alice_peer_id))
Ok(state2)
}

View File

@ -1,4 +1,3 @@
use libp2p::PeerId;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use xmr_btc::{alice, bitcoin::EncryptedSignature, bob, monero, serde::monero_private_key};
@ -39,24 +38,10 @@ pub enum Alice {
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum Bob {
Negotiated {
state2: bob::State2,
#[serde(with = "crate::serde::peer_id")]
peer_id: PeerId,
},
BtcLocked {
state3: bob::State3,
#[serde(with = "crate::serde::peer_id")]
peer_id: PeerId,
},
XmrLocked {
state4: bob::State4,
#[serde(with = "crate::serde::peer_id")]
peer_id: PeerId,
},
EncSigSent {
state4: bob::State4,
},
Negotiated { state2: bob::State2 },
BtcLocked { state3: bob::State3 },
XmrLocked { state4: bob::State4 },
EncSigSent { state4: bob::State4 },
BtcRedeemed(bob::State5),
T1Expired(bob::State4),
BtcCancelled(bob::State4),

View File

@ -88,7 +88,6 @@ async fn happy_path() {
)
.boxed();
let alice_peer_id = alice_event_loop.peer_id();
let alice_fut = select(alice_swap_fut, alice_event_loop.run().boxed());
let bob_swap_fut = bob::swap::swap(
@ -99,8 +98,6 @@ async fn happy_path() {
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
alice_peer_id,
alice_multiaddr,
)
.boxed();

View File

@ -84,8 +84,6 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
alice_peer_id,
alice_multiaddr.clone(),
);
let alice_db_datadir = tempdir().unwrap();

View File

@ -124,7 +124,7 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
}
let (event_loop_after_restart, event_loop_handle_after_restart) =
testutils::init_bob_event_loop();
testutils::init_bob_event_loop(alice_peer_id, alice_multiaddr);
let _bob_swarm_fut = tokio::spawn(async move { event_loop_after_restart.run().await });
let db_swap = bob_db.get_state(bob_swap_id).unwrap();
@ -138,8 +138,6 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
bob_xmr_wallet,
OsRng,
bob_swap_id,
alice_peer_id,
alice_multiaddr,
)
.await
.unwrap();

View File

@ -109,7 +109,8 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
_ = bob_event_loop_1.run() => panic!("The event loop should never finish")
};
let (bob_event_loop_2, bob_event_loop_handle_2) = testutils::init_bob_event_loop();
let (bob_event_loop_2, bob_event_loop_handle_2) =
testutils::init_bob_event_loop(alice_peer_id, alice_multiaddr);
let bob_fut = bob::swap::swap(
bob_restart_state,
@ -119,8 +120,6 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
bob_xmr_wallet.clone(),
OsRng,
bob_swap_id,
alice_peer_id,
alice_multiaddr,
);
let bob_final_state = select! {

View File

@ -81,8 +81,6 @@ async fn given_alice_restarts_after_xmr_is_locked_abort_swap() {
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
alice_event_loop_1.peer_id(),
alice_multiaddr.clone(),
);
let alice_swap_id = Uuid::new_v4();

View File

@ -155,7 +155,6 @@ pub async fn init_alice(
}
pub async fn init_bob_state(
alice_peer_id: PeerId,
btc_to_swap: bitcoin::Amount,
xmr_to_swap: xmr_btc::monero::Amount,
bob_btc_wallet: Arc<bitcoin::Wallet>,
@ -176,17 +175,17 @@ pub async fn init_bob_state(
refund_address,
);
BobState::Started {
state0,
amounts,
alice_peer_id,
}
BobState::Started { state0, amounts }
}
pub fn init_bob_event_loop() -> (bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle) {
pub fn init_bob_event_loop(
alice_peer_id: PeerId,
alice_addr: Multiaddr,
) -> (bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle) {
let bob_behaviour = bob::Behaviour::default();
let bob_transport = build(bob_behaviour.identity()).unwrap();
bob::event_loop::EventLoop::new(bob_transport, bob_behaviour).unwrap()
bob::event_loop::EventLoop::new(bob_transport, bob_behaviour, alice_peer_id, alice_addr)
.unwrap()
}
#[allow(clippy::too_many_arguments)]
@ -217,21 +216,9 @@ pub async fn init_bob(
)
.await;
let bob_state = init_bob_state(
alice_peer_id.clone(),
btc_to_swap,
xmr_to_swap,
bob_btc_wallet.clone(),
config,
)
.await;
let bob_state = init_bob_state(btc_to_swap, xmr_to_swap, bob_btc_wallet.clone(), config).await;
let (event_loop, mut event_loop_handle) = init_bob_event_loop();
event_loop_handle
.add_address(alice_peer_id, alice_multiaddr)
.await
.unwrap();
let (event_loop, event_loop_handle) = init_bob_event_loop(alice_peer_id, alice_multiaddr);
let bob_db_dir = tempdir().unwrap();
let bob_db = Database::open(bob_db_dir.path()).unwrap();