mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2025-01-22 21:31:10 -05:00
Use our rendezvous libp2p fork
This includes changing the network test framework (currently not in use) to reflect the latest libp2p version.
This commit is contained in:
parent
620ac569f7
commit
18eb1ab511
157
Cargo.lock
generated
157
Cargo.lock
generated
@ -1414,13 +1414,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hmac-drbg"
|
||||
version = "0.2.0"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c6e570451493f10f6581b48cdd530413b63ea9e780f544bfd3bdcaa0d89d1a7b"
|
||||
checksum = "17ea0a1394df5b6574da6e0c1ade9e78868c9fb0a4e5ef4428e32da4676b85b1"
|
||||
dependencies = [
|
||||
"digest 0.8.1",
|
||||
"generic-array 0.12.4",
|
||||
"hmac 0.7.1",
|
||||
"digest 0.9.0",
|
||||
"generic-array 0.14.4",
|
||||
"hmac 0.8.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1770,9 +1770,8 @@ checksum = "ba4aede83fc3617411dc6993bc8c70919750c1c257c6ca6a502aed6e0e2394ae"
|
||||
|
||||
[[package]]
|
||||
name = "libp2p"
|
||||
version = "0.38.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ebbb17eece4aec5bb970880c73825c16ca59ca05a4e41803751e68c7e5f0c618"
|
||||
version = "0.39.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"atomic",
|
||||
"bytes 1.0.1",
|
||||
@ -1789,7 +1788,7 @@ dependencies = [
|
||||
"libp2p-tcp",
|
||||
"libp2p-websocket",
|
||||
"libp2p-yamux",
|
||||
"parity-multiaddr",
|
||||
"multiaddr",
|
||||
"parking_lot 0.11.1",
|
||||
"pin-project 1.0.5",
|
||||
"smallvec",
|
||||
@ -1798,9 +1797,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-core"
|
||||
version = "0.28.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "554d3e7e9e65f939d66b75fd6a4c67f258fe250da61b91f46c545fc4a89b51d9"
|
||||
version = "0.29.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"asn1_der",
|
||||
"bs58",
|
||||
@ -1812,9 +1810,9 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"libsecp256k1",
|
||||
"log 0.4.14",
|
||||
"multiaddr",
|
||||
"multihash",
|
||||
"multistream-select",
|
||||
"parity-multiaddr",
|
||||
"parking_lot 0.11.1",
|
||||
"pin-project 1.0.5",
|
||||
"prost",
|
||||
@ -1832,9 +1830,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-dns"
|
||||
version = "0.28.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62e63dab8b5ff35e0c101a3e51e843ba782c07bbb1682f5fd827622e0d02b98b"
|
||||
version = "0.29.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"libp2p-core",
|
||||
@ -1845,9 +1842,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-mplex"
|
||||
version = "0.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85e9b544335d1ed30af71daa96edbefadef6f19c7a55f078b9fc92c87163105d"
|
||||
version = "0.29.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"asynchronous-codec",
|
||||
"bytes 1.0.1",
|
||||
@ -1863,9 +1859,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-noise"
|
||||
version = "0.31.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57a2aa6fc4e6855eaf9ea1941a14f7ec4df35636fb6b85951e17481df8dcecf6"
|
||||
version = "0.32.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"bytes 1.0.1",
|
||||
"curve25519-dalek",
|
||||
@ -1885,9 +1880,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-ping"
|
||||
version = "0.29.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf4bfaffac63bf3c7ec11ed9d8879d455966ddea7e78ee14737f0b6dce0d1cd1"
|
||||
version = "0.30.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"libp2p-core",
|
||||
@ -1900,9 +1894,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-request-response"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1cdbe172f08e6d0f95fa8634e273d4c4268c4063de2e33e7435194b0130c62e3"
|
||||
version = "0.12.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes 1.0.1",
|
||||
@ -1920,9 +1913,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-swarm"
|
||||
version = "0.29.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e04d8e1eef675029ec728ba14e8d0da7975d84b6679b699b4ae91a1de9c3a92"
|
||||
version = "0.30.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"either",
|
||||
"futures",
|
||||
@ -1937,8 +1929,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "libp2p-swarm-derive"
|
||||
version = "0.23.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "365b0a699fea5168676840567582a012ea297b1ca02eee467e58301b9c9c5eed"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"quote 1.0.9",
|
||||
"syn 1.0.64",
|
||||
@ -1946,9 +1937,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-tcp"
|
||||
version = "0.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b1a27d21c477951799e99d5c105d78868258502ce092988040a808d5a19bbd9"
|
||||
version = "0.29.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"futures-timer",
|
||||
@ -1963,9 +1953,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-websocket"
|
||||
version = "0.29.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cace60995ef6f637e4752cccbb2590f6bc358e8741a0d066307636c69a4b3a74"
|
||||
version = "0.30.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"either",
|
||||
"futures",
|
||||
@ -1981,9 +1970,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libp2p-yamux"
|
||||
version = "0.32.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f35da42cfc6d5cb0dcf3ad6881bc68d146cdf38f98655e09e33fbba4d13eabc4"
|
||||
version = "0.33.0"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"libp2p-core",
|
||||
@ -1994,20 +1982,52 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libsecp256k1"
|
||||
version = "0.3.5"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1fc1e2c808481a63dc6da2074752fdd4336a3c8fcc68b83db6f1fd5224ae7962"
|
||||
checksum = "bd1137239ab33b41aa9637a88a28249e5e70c40a42ccc92db7f12cc356c1fcd7"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"crunchy",
|
||||
"digest 0.8.1",
|
||||
"base64 0.12.3",
|
||||
"digest 0.9.0",
|
||||
"hmac-drbg",
|
||||
"libsecp256k1-core",
|
||||
"libsecp256k1-gen-ecmult",
|
||||
"libsecp256k1-gen-genmult",
|
||||
"rand 0.7.3",
|
||||
"sha2 0.8.2",
|
||||
"subtle 2.4.0",
|
||||
"serde",
|
||||
"sha2 0.9.5",
|
||||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libsecp256k1-core"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ee11012b293ea30093c129173cac4335513064094619f4639a25b310fd33c11"
|
||||
dependencies = [
|
||||
"crunchy",
|
||||
"digest 0.9.0",
|
||||
"subtle 2.4.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libsecp256k1-gen-ecmult"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32239626ffbb6a095b83b37a02ceb3672b2443a87a000a884fc3c4d16925c9c0"
|
||||
dependencies = [
|
||||
"libsecp256k1-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libsecp256k1-gen-genmult"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76acb433e21d10f5f9892b1962c2856c58c7f39a9e4bd68ac82b9436a0ffd5b9"
|
||||
dependencies = [
|
||||
"libsecp256k1-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libz-sys"
|
||||
version = "1.1.2"
|
||||
@ -2282,6 +2302,24 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "multiaddr"
|
||||
version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7139982f583d7e53879d9f611fe48ced18e77d684309484f2252c76bcd39f549"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"bs58",
|
||||
"byteorder",
|
||||
"data-encoding",
|
||||
"multihash",
|
||||
"percent-encoding 2.1.0",
|
||||
"serde",
|
||||
"static_assertions",
|
||||
"unsigned-varint 0.7.0",
|
||||
"url 2.2.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "multihash"
|
||||
version = "0.13.2"
|
||||
@ -2317,9 +2355,8 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
|
||||
[[package]]
|
||||
name = "multistream-select"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7d91ec0a2440aaff5f78ec35631a7027d50386c6163aa975f7caa0d5da4b6ff8"
|
||||
version = "0.10.3"
|
||||
source = "git+https://github.com/comit-network/rust-libp2p?branch=rendezvous#c3cd411cedb12eab72ba26b08c43c2467a8fd8e9"
|
||||
dependencies = [
|
||||
"bytes 1.0.1",
|
||||
"futures",
|
||||
@ -2472,24 +2509,6 @@ version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
|
||||
|
||||
[[package]]
|
||||
name = "parity-multiaddr"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "58341485071825827b7f03cf7efd1cb21e6a709bea778fb50227fd45d2f361b4"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"bs58",
|
||||
"byteorder",
|
||||
"data-encoding",
|
||||
"multihash",
|
||||
"percent-encoding 2.1.0",
|
||||
"serde",
|
||||
"static_assertions",
|
||||
"unsigned-varint 0.7.0",
|
||||
"url 2.2.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.10.2"
|
||||
|
@ -29,7 +29,7 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", default-features =
|
||||
ed25519-dalek = "1"
|
||||
futures = { version = "0.3", default-features = false }
|
||||
itertools = "0.10"
|
||||
libp2p = { version = "0.38", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping" ] }
|
||||
libp2p = { git = "https://github.com/comit-network/rust-libp2p", branch = "rendezvous", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping" ] }
|
||||
miniscript = { version = "5", features = [ "serde" ] }
|
||||
monero = { version = "0.12", features = [ "serde_support" ] }
|
||||
monero-rpc = { path = "../monero-rpc" }
|
||||
|
@ -149,9 +149,9 @@ where
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
swarm_event = self.swarm.next_event() => {
|
||||
swarm_event = self.swarm.next() => {
|
||||
match swarm_event {
|
||||
SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot })) => {
|
||||
|
||||
let (btc, responder) = match send_wallet_snapshot.recv().await {
|
||||
Ok((btc, responder)) => (btc, responder),
|
||||
@ -172,13 +172,13 @@ where
|
||||
// Ignore result, we should never hit this because the receiver will alive as long as the connection is.
|
||||
let _ = responder.respond(wallet_snapshot);
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3}) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3})) => {
|
||||
let _ = self.handle_execution_setup_done(peer_id, swap_id, *state3).await;
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::SwapDeclined { peer, error }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::SwapDeclined { peer, error })) => {
|
||||
tracing::warn!(%peer, "Ignoring spot price request because: {}", error);
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer })) => {
|
||||
let quote = match self.make_quote(self.min_buy, self.max_buy).await {
|
||||
Ok(quote) => quote,
|
||||
Err(error) => {
|
||||
@ -191,13 +191,13 @@ where
|
||||
tracing::debug!(%peer, "Failed to respond with quote");
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id })) => {
|
||||
tracing::debug!(%peer, "Bob acknowledged transfer proof");
|
||||
if let Some(responder) = self.inflight_transfer_proofs.remove(&id) {
|
||||
let _ = responder.respond(());
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer })) => {
|
||||
let swap_id = msg.swap_id;
|
||||
let swap_peer = self.db.get_peer_id(swap_id);
|
||||
|
||||
@ -246,12 +246,12 @@ where
|
||||
channel
|
||||
}.boxed());
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::Failure {peer, error})) => {
|
||||
tracing::error!(
|
||||
%peer,
|
||||
"Communication error. Error {:#}", error);
|
||||
}
|
||||
SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => {
|
||||
Some(SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. }) => {
|
||||
tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established");
|
||||
|
||||
if let Some(transfer_proofs) = self.buffered_transfer_proofs.remove(&peer) {
|
||||
@ -263,16 +263,16 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => {
|
||||
Some(SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. }) => {
|
||||
tracing::warn!(%address, "Failed to set up connection with peer. Error {:#}", error);
|
||||
}
|
||||
SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: Some(error) } if num_established == 0 => {
|
||||
Some(SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: Some(error) }) if num_established == 0 => {
|
||||
tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection. Error {:#}", error);
|
||||
}
|
||||
SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: None } if num_established == 0 => {
|
||||
Some(SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: None }) if num_established == 0 => {
|
||||
tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection");
|
||||
}
|
||||
SwarmEvent::NewListenAddr(address) => {
|
||||
Some(SwarmEvent::NewListenAddr(address)) => {
|
||||
tracing::info!(%address, "New listen address detected");
|
||||
}
|
||||
_ => {}
|
||||
|
@ -94,19 +94,19 @@ impl EventLoop {
|
||||
loop {
|
||||
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
|
||||
tokio::select! {
|
||||
swarm_event = self.swarm.next_event().fuse() => {
|
||||
swarm_event = self.swarm.next().fuse() => {
|
||||
match swarm_event {
|
||||
SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response })) => {
|
||||
if let Some(responder) = self.inflight_quote_requests.remove(&id) {
|
||||
let _ = responder.respond(response);
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted(response)) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted(response))) => {
|
||||
if let Some(responder) = self.inflight_swap_setup.take() {
|
||||
let _ = responder.respond(*response);
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer })) => {
|
||||
let swap_id = msg.swap_id;
|
||||
|
||||
if peer != self.alice_peer_id {
|
||||
@ -142,34 +142,34 @@ impl EventLoop {
|
||||
channel
|
||||
}.boxed()));
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id })) => {
|
||||
if let Some(responder) = self.inflight_encrypted_signature_requests.remove(&id) {
|
||||
let _ = responder.respond(());
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::AllRedialAttemptsExhausted { peer }) if peer == self.alice_peer_id => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::AllRedialAttemptsExhausted { peer })) if peer == self.alice_peer_id => {
|
||||
tracing::error!("Exhausted all re-dial attempts to Alice");
|
||||
return;
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::Failure { peer, error }) => {
|
||||
Some(SwarmEvent::Behaviour(OutEvent::Failure { peer, error })) => {
|
||||
tracing::warn!(%peer, "Communication error: {:#}", error);
|
||||
return;
|
||||
}
|
||||
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => {
|
||||
Some(SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. }) if peer_id == self.alice_peer_id => {
|
||||
tracing::info!("Connected to Alice at {}", endpoint.get_remote_address());
|
||||
}
|
||||
SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => {
|
||||
Some(SwarmEvent::Dialing(peer_id)) if peer_id == self.alice_peer_id => {
|
||||
tracing::debug!("Dialling Alice at {}", peer_id);
|
||||
}
|
||||
SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause: Some(error) } if peer_id == self.alice_peer_id && num_established == 0 => {
|
||||
Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause: Some(error) }) if peer_id == self.alice_peer_id && num_established == 0 => {
|
||||
tracing::warn!("Lost connection to Alice at {}, cause: {}", endpoint.get_remote_address(), error);
|
||||
}
|
||||
SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. } if peer_id == self.alice_peer_id && num_established == 0 => {
|
||||
Some(SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. }) if peer_id == self.alice_peer_id && num_established == 0 => {
|
||||
// no error means the disconnection was requested
|
||||
tracing::info!("Successfully closed connection to Alice");
|
||||
return;
|
||||
}
|
||||
SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => {
|
||||
Some(SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error }) if peer_id == self.alice_peer_id && attempts_remaining == 0 => {
|
||||
tracing::warn!(%address, "Failed to dial Alice: {}", error);
|
||||
|
||||
if let Some(duration) = self.swarm.behaviour_mut().redial.until_next_redial() {
|
||||
|
@ -1,19 +1,18 @@
|
||||
use futures::future;
|
||||
use async_trait::async_trait;
|
||||
use futures::stream::FusedStream;
|
||||
use futures::{future, Future, Stream, StreamExt};
|
||||
use libp2p::core::muxing::StreamMuxerBox;
|
||||
use libp2p::core::transport::memory::MemoryTransport;
|
||||
use libp2p::core::upgrade::{SelectUpgrade, Version};
|
||||
use libp2p::core::{Executor, Multiaddr};
|
||||
use libp2p::core::transport::upgrade::Version;
|
||||
use libp2p::core::transport::MemoryTransport;
|
||||
use libp2p::core::upgrade::SelectUpgrade;
|
||||
use libp2p::core::{identity, Executor, Multiaddr, PeerId, Transport};
|
||||
use libp2p::mplex::MplexConfig;
|
||||
use libp2p::noise::{self, NoiseConfig, X25519Spec};
|
||||
use libp2p::swarm::{
|
||||
IntoProtocolsHandler, NetworkBehaviour, ProtocolsHandler, SwarmBuilder, SwarmEvent,
|
||||
};
|
||||
use libp2p::{identity, yamux, PeerId, Swarm, Transport};
|
||||
use libp2p::noise::{Keypair, NoiseConfig, X25519Spec};
|
||||
use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent};
|
||||
use libp2p::yamux::YamuxConfig;
|
||||
use std::fmt::Debug;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
use tokio::time;
|
||||
|
||||
/// An adaptor struct for libp2p that spawns futures into the current
|
||||
/// thread-local runtime.
|
||||
@ -25,49 +24,18 @@ impl Executor for GlobalSpawnTokioExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Actor<B: NetworkBehaviour> {
|
||||
pub swarm: Swarm<B>,
|
||||
pub addr: Multiaddr,
|
||||
pub peer_id: PeerId,
|
||||
}
|
||||
|
||||
pub async fn new_connected_swarm_pair<B, F>(behaviour_fn: F) -> (Actor<B>, Actor<B>)
|
||||
pub fn new_swarm<B, F>(behaviour_fn: F) -> Swarm<B>
|
||||
where
|
||||
B: NetworkBehaviour,
|
||||
F: Fn(PeerId, identity::Keypair) -> B + Clone,
|
||||
<<<B as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Clone,
|
||||
<B as NetworkBehaviour>::OutEvent: Debug{
|
||||
let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone());
|
||||
let mut alice = Actor {
|
||||
swarm,
|
||||
addr,
|
||||
peer_id,
|
||||
};
|
||||
|
||||
let (swarm, addr, peer_id) = new_swarm(behaviour_fn);
|
||||
let mut bob = Actor {
|
||||
swarm,
|
||||
addr,
|
||||
peer_id,
|
||||
};
|
||||
|
||||
connect(&mut alice.swarm, &mut bob.swarm).await;
|
||||
|
||||
(alice, bob)
|
||||
}
|
||||
|
||||
pub fn new_swarm<B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B>(
|
||||
behaviour_fn: F,
|
||||
) -> (Swarm<B>, Multiaddr, PeerId)
|
||||
where
|
||||
<B as NetworkBehaviour>::OutEvent: Debug,
|
||||
B: NetworkBehaviour,
|
||||
F: FnOnce(PeerId, identity::Keypair) -> B,
|
||||
{
|
||||
let id_keys = identity::Keypair::generate_ed25519();
|
||||
let peer_id = PeerId::from(id_keys.public());
|
||||
let identity = identity::Keypair::generate_ed25519();
|
||||
let peer_id = PeerId::from(identity.public());
|
||||
|
||||
let dh_keys = noise::Keypair::<X25519Spec>::new()
|
||||
.into_authentic(&id_keys)
|
||||
let dh_keys = Keypair::<X25519Spec>::new()
|
||||
.into_authentic(&identity)
|
||||
.expect("failed to create dh_keys");
|
||||
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
|
||||
|
||||
@ -75,88 +43,146 @@ where
|
||||
.upgrade(Version::V1)
|
||||
.authenticate(noise)
|
||||
.multiplex(SelectUpgrade::new(
|
||||
yamux::YamuxConfig::default(),
|
||||
YamuxConfig::default(),
|
||||
MplexConfig::new(),
|
||||
))
|
||||
.timeout(Duration::from_secs(5))
|
||||
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
|
||||
.boxed();
|
||||
|
||||
let mut swarm: Swarm<B> = SwarmBuilder::new(transport, behaviour_fn(peer_id, id_keys), peer_id)
|
||||
SwarmBuilder::new(transport, behaviour_fn(peer_id, identity), peer_id)
|
||||
.executor(Box::new(GlobalSpawnTokioExecutor))
|
||||
.build();
|
||||
.build()
|
||||
}
|
||||
|
||||
fn get_rand_memory_address() -> Multiaddr {
|
||||
let address_port = rand::random::<u64>();
|
||||
let addr = format!("/memory/{}", address_port)
|
||||
.parse::<Multiaddr>()
|
||||
.unwrap();
|
||||
|
||||
Swarm::listen_on(&mut swarm, addr.clone()).unwrap();
|
||||
|
||||
(swarm, addr, peer_id)
|
||||
addr
|
||||
}
|
||||
|
||||
pub async fn await_events_or_timeout<A, B>(
|
||||
alice_event: impl Future<Output = A>,
|
||||
bob_event: impl Future<Output = B>,
|
||||
) -> (A, B) {
|
||||
time::timeout(
|
||||
Duration::from_secs(10),
|
||||
future::join(alice_event, bob_event),
|
||||
pub async fn await_events_or_timeout<A, B, E1, E2>(
|
||||
swarm_1: &mut (impl Stream<Item = SwarmEvent<A, E1>> + FusedStream + Unpin),
|
||||
swarm_2: &mut (impl Stream<Item = SwarmEvent<B, E2>> + FusedStream + Unpin),
|
||||
) -> (SwarmEvent<A, E1>, SwarmEvent<B, E2>)
|
||||
where
|
||||
SwarmEvent<A, E1>: Debug,
|
||||
SwarmEvent<B, E2>: Debug,
|
||||
{
|
||||
tokio::time::timeout(
|
||||
Duration::from_secs(30),
|
||||
future::join(
|
||||
swarm_1
|
||||
.inspect(|event| tracing::debug!("Swarm1 emitted {:?}", event))
|
||||
.select_next_some(),
|
||||
swarm_2
|
||||
.inspect(|event| tracing::debug!("Swarm2 emitted {:?}", event))
|
||||
.select_next_some(),
|
||||
),
|
||||
)
|
||||
.await
|
||||
.expect("network behaviours to emit an event within 10 seconds")
|
||||
}
|
||||
|
||||
/// Connects two swarms with each other.
|
||||
///
|
||||
/// This assumes the transport that is in use can be used by Bob to connect to
|
||||
/// the listen address that is emitted by Alice. In other words, they have to be
|
||||
/// on the same network. The memory transport used by the above `new_swarm`
|
||||
/// function fulfills this.
|
||||
///
|
||||
/// We also assume that the swarms don't emit any behaviour events during the
|
||||
/// connection phase. Any event emitted is considered a bug from this functions
|
||||
/// PoV because they would be lost.
|
||||
pub async fn connect<BA, BB>(alice: &mut Swarm<BA>, bob: &mut Swarm<BB>)
|
||||
/// An extension trait for [`Swarm`] that makes it easier to set up a network of
|
||||
/// [`Swarm`]s for tests.
|
||||
#[async_trait]
|
||||
pub trait SwarmExt {
|
||||
/// Establishes a connection to the given [`Swarm`], polling both of them
|
||||
/// until the connection is established.
|
||||
async fn block_on_connection<T>(&mut self, other: &mut Swarm<T>)
|
||||
where
|
||||
T: NetworkBehaviour,
|
||||
<T as NetworkBehaviour>::OutEvent: Debug;
|
||||
|
||||
/// Listens on a random memory address, polling the [`Swarm`] until the
|
||||
/// transport is ready to accept connections.
|
||||
async fn listen_on_random_memory_address(&mut self) -> Multiaddr;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<B> SwarmExt for Swarm<B>
|
||||
where
|
||||
BA: NetworkBehaviour,
|
||||
BB: NetworkBehaviour,
|
||||
<BA as NetworkBehaviour>::OutEvent: Debug,
|
||||
<BB as NetworkBehaviour>::OutEvent: Debug,
|
||||
B: NetworkBehaviour,
|
||||
<B as NetworkBehaviour>::OutEvent: Debug,
|
||||
{
|
||||
let mut alice_connected = false;
|
||||
let mut bob_connected = false;
|
||||
async fn block_on_connection<T>(&mut self, other: &mut Swarm<T>)
|
||||
where
|
||||
T: NetworkBehaviour,
|
||||
<T as NetworkBehaviour>::OutEvent: Debug,
|
||||
{
|
||||
let addr_to_dial = other.external_addresses().next().unwrap().addr.clone();
|
||||
|
||||
while !alice_connected && !bob_connected {
|
||||
let (alice_event, bob_event) = future::join(alice.next_event(), bob.next_event()).await;
|
||||
self.dial_addr(addr_to_dial.clone()).unwrap();
|
||||
|
||||
match alice_event {
|
||||
let mut dialer_done = false;
|
||||
let mut listener_done = false;
|
||||
|
||||
loop {
|
||||
let dialer_event_fut = self.select_next_some();
|
||||
|
||||
tokio::select! {
|
||||
dialer_event = dialer_event_fut => {
|
||||
match dialer_event {
|
||||
SwarmEvent::ConnectionEstablished { .. } => {
|
||||
alice_connected = true;
|
||||
dialer_done = true;
|
||||
}
|
||||
SwarmEvent::NewListenAddr(addr) => {
|
||||
bob.dial_addr(addr).unwrap();
|
||||
SwarmEvent::UnknownPeerUnreachableAddr { address, error } if address == addr_to_dial => {
|
||||
panic!("Failed to dial address {}: {}", addr_to_dial, error)
|
||||
}
|
||||
SwarmEvent::Behaviour(event) => {
|
||||
panic!(
|
||||
"alice unexpectedly emitted a behaviour event during connection: {:?}",
|
||||
event
|
||||
other => {
|
||||
tracing::debug!("Ignoring {:?}", other);
|
||||
}
|
||||
}
|
||||
},
|
||||
listener_event = other.select_next_some() => {
|
||||
match listener_event {
|
||||
SwarmEvent::ConnectionEstablished { .. } => {
|
||||
listener_done = true;
|
||||
}
|
||||
SwarmEvent::IncomingConnectionError { error, .. } => {
|
||||
panic!("Failure in incoming connection {}", error);
|
||||
}
|
||||
other => {
|
||||
tracing::debug!("Ignoring {:?}", other);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if dialer_done && listener_done {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn listen_on_random_memory_address(&mut self) -> Multiaddr {
|
||||
let multiaddr = get_rand_memory_address();
|
||||
|
||||
self.listen_on(multiaddr.clone()).unwrap();
|
||||
|
||||
// block until we are actually listening
|
||||
loop {
|
||||
match self.select_next_some().await {
|
||||
SwarmEvent::NewListenAddr(addr) if addr == multiaddr => {
|
||||
break;
|
||||
}
|
||||
other => {
|
||||
tracing::debug!(
|
||||
"Ignoring {:?} while waiting for listening to succeed",
|
||||
other
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
match bob_event {
|
||||
SwarmEvent::ConnectionEstablished { .. } => {
|
||||
bob_connected = true;
|
||||
}
|
||||
SwarmEvent::Behaviour(event) => {
|
||||
panic!(
|
||||
"bob unexpectedly emitted a behaviour event during connection: {:?}",
|
||||
event
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Memory addresses are externally reachable because they all share the same
|
||||
// memory-space.
|
||||
self.add_external_address(multiaddr.clone(), AddressScore::Infinite);
|
||||
|
||||
multiaddr
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user