Add message1

This commit is contained in:
Tobin C. Harding 2020-10-22 13:23:12 +11:00
parent a0987ee2b8
commit 0f17ec076c
10 changed files with 296 additions and 16 deletions

View File

@ -12,8 +12,9 @@ use tracing::debug;
mod amounts; mod amounts;
mod message0; mod message0;
mod message1;
use self::{amounts::*, message0::*}; use self::{amounts::*, message0::*, message1::*};
use crate::{ use crate::{
network::{ network::{
peer_tracker::{self, PeerTracker}, peer_tracker::{self, PeerTracker},
@ -47,7 +48,7 @@ pub async fn swap<R: RngCore + CryptoRng>(
debug!("Got request from Bob to swap {}", btc); debug!("Got request from Bob to swap {}", btc);
let p = calculate_amounts(btc); let p = calculate_amounts(btc);
last_amounts = Some(p); last_amounts = Some(p);
swarm.send(channel, AliceToBob::Amounts(p)); swarm.send_amounts(channel, p);
} }
OutEvent::Message0(msg) => { OutEvent::Message0(msg) => {
debug!("Got message0 from Bob"); debug!("Got message0 from Bob");
@ -55,6 +56,7 @@ pub async fn swap<R: RngCore + CryptoRng>(
message0 = Some(msg); message0 = Some(msg);
break; break;
} }
other => panic!("Unexpected event: {:?}", other),
}; };
} }
@ -78,11 +80,25 @@ pub async fn swap<R: RngCore + CryptoRng>(
); );
swarm.set_state0(state0.clone()); swarm.set_state0(state0.clone());
let _state1 = match message0 { let state1 = match message0 {
Some(msg) => state0.receive(msg), Some(msg) => state0.receive(msg).expect("failed to receive msg 0"),
None => panic!("should have the message by here"), None => panic!("should have the message by here"),
}; };
let (state2, channel) = match swarm.next().await {
OutEvent::Message1 { msg, channel } => {
debug!("Got message1 from Bob");
let state2 = state1.receive(msg);
(state2, channel)
}
other => panic!("Unexpected event: {:?}", other),
};
let msg = state2.next_message();
swarm.send_message1(channel, msg);
tracing::info!("handshake complete, we now have State2 for Alice.");
tracing::warn!("parking thread ..."); tracing::warn!("parking thread ...");
thread::park(); thread::park();
Ok(()) Ok(())
@ -118,6 +134,10 @@ pub enum OutEvent {
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
Request(amounts::OutEvent), Request(amounts::OutEvent),
Message0(bob::Message0), Message0(bob::Message0),
Message1 {
msg: bob::Message1,
channel: ResponseChannel<AliceToBob>,
},
} }
impl From<peer_tracker::OutEvent> for OutEvent { impl From<peer_tracker::OutEvent> for OutEvent {
@ -144,6 +164,14 @@ impl From<message0::OutEvent> for OutEvent {
} }
} }
impl From<message1::OutEvent> for OutEvent {
fn from(event: message1::OutEvent) -> Self {
match event {
message1::OutEvent::Msg { msg, channel } => OutEvent::Message1 { msg, channel },
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
@ -152,6 +180,7 @@ pub struct Alice {
pt: PeerTracker, pt: PeerTracker,
amounts: Amounts, amounts: Amounts,
message0: Message0, message0: Message0,
message1: Message1,
#[behaviour(ignore)] #[behaviour(ignore)]
identity: Keypair, identity: Keypair,
} }
@ -166,13 +195,22 @@ impl Alice {
} }
/// Alice always sends her messages as a response to a request from Bob. /// Alice always sends her messages as a response to a request from Bob.
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: AliceToBob) { pub fn send_amounts(&mut self, channel: ResponseChannel<AliceToBob>, p: SwapParams) {
let msg = AliceToBob::Amounts(p);
self.amounts.send(channel, msg); self.amounts.send(channel, msg);
} }
pub fn set_state0(&mut self, state: State0) { pub fn set_state0(&mut self, state: State0) {
let _ = self.message0.set_state(state); let _ = self.message0.set_state(state);
} }
pub fn send_message1(
&mut self,
channel: ResponseChannel<AliceToBob>,
msg: xmr_btc::alice::Message1,
) {
self.message1.send(channel, msg)
}
} }
impl Default for Alice { impl Default for Alice {
@ -184,6 +222,7 @@ impl Default for Alice {
pt: PeerTracker::default(), pt: PeerTracker::default(),
amounts: Amounts::new(timeout), amounts: Amounts::new(timeout),
message0: Message0::new(timeout), message0: Message0::new(timeout),
message1: Message1::new(timeout),
identity, identity,
} }
} }

View File

@ -23,7 +23,7 @@ pub enum OutEvent {
Msg(bob::Message0), Msg(bob::Message0),
} }
/// A `NetworkBehaviour` that represents getting the amounts of an XMR/BTC swap. /// A `NetworkBehaviour` that represents send/recv of message 0.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")] #[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]

113
swap/src/alice/message1.rs Normal file
View File

@ -0,0 +1,113 @@
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage, ResponseChannel,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::error;
use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol};
use xmr_btc::bob;
#[derive(Debug)]
pub enum OutEvent {
Msg {
/// Received message from Bob.
msg: bob::Message1,
/// Channel to send back Alice's message 1.
channel: ResponseChannel<AliceToBob>,
},
}
/// A `NetworkBehaviour` that represents send/recv of message 1.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Message1 {
rr: RequestResponse<Codec>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Message1 {
pub fn new(timeout: Duration) -> Self {
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: xmr_btc::alice::Message1) {
let msg = AliceToBob::Message1(msg);
self.rr.send_response(channel, msg);
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Message1 {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
peer: _,
message:
RequestResponseMessage::Request {
request,
request_id: _,
channel,
},
} => match request {
BobToAlice::Message1(msg) => {
self.events.push_back(OutEvent::Msg { msg, channel });
}
other => panic!("unexpected request: {:?}", other),
},
RequestResponseEvent::Message {
peer: _,
message:
RequestResponseMessage::Response {
response: _,
request_id: _,
},
} => panic!("unexpected response"),
RequestResponseEvent::InboundFailure {
peer: _,
request_id: _,
error,
} => {
error!("Inbound failure: {:?}", error);
}
RequestResponseEvent::OutboundFailure {
peer: _,
request_id: _,
error,
} => {
error!("Outbound failure: {:?}", error);
}
}
}
}

View File

@ -8,12 +8,13 @@ use futures::{
use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::{process, thread, time::Duration}; use std::{process, thread, time::Duration};
use tracing::{debug, info, warn}; use tracing::{debug, info};
mod amounts; mod amounts;
mod message0; mod message0;
mod message1;
use self::{amounts::*, message0::*}; use self::{amounts::*, message0::*, message1::*};
use crate::{ use crate::{
network::{ network::{
peer_tracker::{self, PeerTracker}, peer_tracker::{self, PeerTracker},
@ -80,15 +81,26 @@ where
PUNISH_TIMELOCK, PUNISH_TIMELOCK,
refund_address, refund_address,
); );
swarm.send_message0(alice.clone(), state0.next_message(rng)); swarm.send_message0(alice.clone(), state0.next_message(rng));
let _state1 = match swarm.next().await { let state1 = match swarm.next().await {
OutEvent::Message0(msg) => { OutEvent::Message0(msg) => {
state0.receive(&wallet, msg) // TODO: More graceful error handling. state0.receive(&wallet, msg).await? // TODO: More graceful error
// handling.
} }
other => panic!("unexpected event: {:?}", other), other => panic!("unexpected event: {:?}", other),
}; };
warn!("parking thread ..."); swarm.send_message1(alice.clone(), state1.next_message());
let _state2 = match swarm.next().await {
OutEvent::Message1(msg) => {
state1.receive(msg) // TODO: More graceful error handling.
}
other => panic!("unexpected event: {:?}", other),
};
info!("handshake complete, we now have State2 for Bob.");
thread::park(); thread::park();
Ok(()) Ok(())
} }
@ -120,6 +132,7 @@ pub enum OutEvent {
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
Amounts(amounts::OutEvent), Amounts(amounts::OutEvent),
Message0(alice::Message0), Message0(alice::Message0),
Message1(alice::Message1),
} }
impl From<peer_tracker::OutEvent> for OutEvent { impl From<peer_tracker::OutEvent> for OutEvent {
@ -146,6 +159,14 @@ impl From<message0::OutEvent> for OutEvent {
} }
} }
impl From<message1::OutEvent> for OutEvent {
fn from(event: message1::OutEvent) -> Self {
match event {
message1::OutEvent::Msg(msg) => OutEvent::Message1(msg),
}
}
}
/// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
@ -154,6 +175,7 @@ pub struct Bob {
pt: PeerTracker, pt: PeerTracker,
amounts: Amounts, amounts: Amounts,
message0: Message0, message0: Message0,
message1: Message1,
#[behaviour(ignore)] #[behaviour(ignore)]
identity: Keypair, identity: Keypair,
} }
@ -174,11 +196,16 @@ impl Bob {
debug!("Requesting amounts from: {}", alice); debug!("Requesting amounts from: {}", alice);
} }
/// Sends Bob's first state message to Alice. /// Sends Bob's first message to Alice.
pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) { pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) {
self.message0.send(alice, msg) self.message0.send(alice, msg)
} }
/// Sends Bob's second message to Alice.
pub fn send_message1(&mut self, alice: PeerId, msg: bob::Message1) {
self.message1.send(alice, msg)
}
/// Returns Alice's peer id if we are connected. /// Returns Alice's peer id if we are connected.
pub fn peer_id_of_alice(&self) -> Option<PeerId> { pub fn peer_id_of_alice(&self) -> Option<PeerId> {
self.pt.counterparty_peer_id() self.pt.counterparty_peer_id()
@ -194,6 +221,7 @@ impl Default for Bob {
pt: PeerTracker::default(), pt: PeerTracker::default(),
amounts: Amounts::new(timeout), amounts: Amounts::new(timeout),
message0: Message0::new(timeout), message0: Message0::new(timeout),
message1: Message1::new(timeout),
identity, identity,
} }
} }

View File

@ -85,7 +85,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
}, },
} => match response { } => match response {
AliceToBob::Amounts(p) => self.events.push_back(OutEvent::Amounts(p)), AliceToBob::Amounts(p) => self.events.push_back(OutEvent::Amounts(p)),
AliceToBob::Message0(_) => panic!("shouldn't get message0 here"), other => panic!("unexpected response: {:?}", other),
}, },
RequestResponseEvent::InboundFailure { .. } => { RequestResponseEvent::InboundFailure { .. } => {

View File

@ -80,7 +80,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
}, },
} => match response { } => match response {
AliceToBob::Message0(msg) => self.events.push_back(OutEvent::Msg(msg)), AliceToBob::Message0(msg) => self.events.push_back(OutEvent::Msg(msg)),
AliceToBob::Amounts(_) => panic!("shouldn't get amounts here"), other => panic!("unexpected response: {:?}", other),
}, },
RequestResponseEvent::InboundFailure { .. } => { RequestResponseEvent::InboundFailure { .. } => {

98
swap/src/bob/message1.rs Normal file
View File

@ -0,0 +1,98 @@
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
NetworkBehaviour, PeerId,
};
use std::{
collections::VecDeque,
task::{Context, Poll},
time::Duration,
};
use tracing::error;
use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Protocol};
use xmr_btc::{alice, bob};
#[derive(Debug)]
pub enum OutEvent {
Msg(alice::Message1),
}
/// A `NetworkBehaviour` that represents send/recv of message 1.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", poll_method = "poll")]
#[allow(missing_debug_implementations)]
pub struct Message1 {
rr: RequestResponse<Codec>,
#[behaviour(ignore)]
events: VecDeque<OutEvent>,
}
impl Message1 {
pub fn new(timeout: Duration) -> Self {
let mut config = RequestResponseConfig::default();
config.set_request_timeout(timeout);
Self {
rr: RequestResponse::new(
Codec::default(),
vec![(Protocol, ProtocolSupport::Full)],
config,
),
events: Default::default(),
}
}
pub fn send(&mut self, alice: PeerId, msg: bob::Message1) {
let msg = BobToAlice::Message1(msg);
let _id = self.rr.send_request(&alice, msg);
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RequestProtocol<Codec>, OutEvent>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>> for Message1 {
fn inject_event(&mut self, event: RequestResponseEvent<BobToAlice, AliceToBob>) {
match event {
RequestResponseEvent::Message {
peer: _,
message: RequestResponseMessage::Request { .. },
} => panic!("Bob should never get a request from Alice"),
RequestResponseEvent::Message {
peer: _,
message:
RequestResponseMessage::Response {
response,
request_id: _,
},
} => match response {
AliceToBob::Message1(msg) => self.events.push_back(OutEvent::Msg(msg)),
other => panic!("unexpected response: {:?}", other),
},
RequestResponseEvent::InboundFailure { .. } => {
panic!("Bob should never get a request from Alice, so should never get an InboundFailure");
}
RequestResponseEvent::OutboundFailure {
peer: _,
request_id: _,
error,
} => {
error!("Outbound failure: {:?}", error);
}
}
}
}

View File

@ -21,6 +21,7 @@ pub enum BobToAlice {
AmountsFromBtc(::bitcoin::Amount), AmountsFromBtc(::bitcoin::Amount),
AmountsFromXmr(monero::Amount), AmountsFromXmr(monero::Amount),
Message0(bob::Message0), Message0(bob::Message0),
Message1(bob::Message1),
} }
/// Messages Alice sends to Bob. /// Messages Alice sends to Bob.
@ -29,6 +30,7 @@ pub enum BobToAlice {
pub enum AliceToBob { pub enum AliceToBob {
Amounts(SwapParams), Amounts(SwapParams),
Message0(alice::Message0), Message0(alice::Message0),
Message1(alice::Message1),
} }
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]

View File

@ -23,7 +23,7 @@ pub struct Message0 {
pub(crate) punish_address: bitcoin::Address, pub(crate) punish_address: bitcoin::Address,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message1 { pub struct Message1 {
pub(crate) tx_cancel_sig: Signature, pub(crate) tx_cancel_sig: Signature,
pub(crate) tx_refund_encsig: EncryptedSignature, pub(crate) tx_refund_encsig: EncryptedSignature,

View File

@ -22,7 +22,7 @@ pub struct Message0 {
pub(crate) refund_address: bitcoin::Address, pub(crate) refund_address: bitcoin::Address,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message1 { pub struct Message1 {
pub(crate) tx_lock: bitcoin::TxLock, pub(crate) tx_lock: bitcoin::TxLock,
} }