From 2ec1fe1c59924b45cc31213cd0708bc09985d35f Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 10 Nov 2014 01:35:08 +0100 Subject: [PATCH] Sync with Thomas changes, add bootstrap call --- .../java/io/bitsquare/app/cli/SeedNode.java | 3 +- .../io/bitsquare/network/BootstrapNodes.java | 1 + .../java/io/bitsquare/msg/TomP2PTests.java | 106 ++++++++++++------ 3 files changed, 72 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/bitsquare/app/cli/SeedNode.java b/src/main/java/io/bitsquare/app/cli/SeedNode.java index 38a5d4ac09..441fe8d921 100644 --- a/src/main/java/io/bitsquare/app/cli/SeedNode.java +++ b/src/main/java/io/bitsquare/app/cli/SeedNode.java @@ -77,7 +77,6 @@ public class SeedNode { log.debug("SeedNode started."); new Thread(new Runnable() { - @Override public void run() { while (running) { @@ -87,7 +86,7 @@ public class SeedNode { try { Thread.sleep(2000); } catch (InterruptedException e) { - e.printStackTrace(); + return; } } } diff --git a/src/main/java/io/bitsquare/network/BootstrapNodes.java b/src/main/java/io/bitsquare/network/BootstrapNodes.java index 85f103136b..cc12bf90d9 100644 --- a/src/main/java/io/bitsquare/network/BootstrapNodes.java +++ b/src/main/java/io/bitsquare/network/BootstrapNodes.java @@ -27,6 +27,7 @@ import java.util.List; public interface BootstrapNodes { Node LOCALHOST = Node.at("localhost", "127.0.0.1"); Node DIGITAL_OCEAN_1 = Node.at("digitalocean1.bitsquare.io", "188.226.179.109"); + Node DIGITAL_OCEAN_1_DEV = Node.at("digitalocean1.bitsquare.io", "188.226.179.109", 7367); Node DEFAULT_BOOTSTRAP_NODE = DIGITAL_OCEAN_1; diff --git a/src/test/java/io/bitsquare/msg/TomP2PTests.java b/src/test/java/io/bitsquare/msg/TomP2PTests.java index aaaf23a3f5..f7a70a6298 100644 --- a/src/test/java/io/bitsquare/msg/TomP2PTests.java +++ b/src/test/java/io/bitsquare/msg/TomP2PTests.java @@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import net.tomp2p.connection.Bindings; +import net.tomp2p.connection.ChannelClientConfiguration; import net.tomp2p.connection.Ports; import net.tomp2p.connection.StandardProtocolFamily; import net.tomp2p.dht.FutureGet; @@ -49,6 +50,8 @@ import net.tomp2p.p2p.Peer; import net.tomp2p.p2p.PeerBuilder; import net.tomp2p.peers.Number160; import net.tomp2p.peers.PeerAddress; +import net.tomp2p.peers.PeerMap; +import net.tomp2p.peers.PeerMapConfiguration; import net.tomp2p.storage.Data; import org.junit.After; @@ -87,7 +90,7 @@ public class TomP2PTests { // If you have a setup where you are not behind a router you can also use a WAN side seed node. private static final Node BOOTSTRAP_NODE = (FORCED_CONNECTION_TYPE == ConnectionType.DIRECT) ? BootstrapNodes.LOCALHOST : BootstrapNodes - .DIGITAL_OCEAN_1; + .DIGITAL_OCEAN_1_DEV; private static final PeerAddress BOOTSTRAP_NODE_ADDRESS; @@ -119,10 +122,12 @@ public class TomP2PTests { @After public void tearDown() { - if (peer1DHT != null) - peer1DHT.shutdown().awaitUninterruptibly(); - if (peer2DHT != null) - peer2DHT.shutdown().awaitUninterruptibly(); + if (peer1DHT != null) { + peer1DHT.shutdown().awaitUninterruptibly(); + } + if (peer2DHT != null) { + peer2DHT.shutdown().awaitUninterruptibly(); + } } @Test @@ -215,21 +220,20 @@ public class TomP2PTests { @Repeat(STRESS_TEST_COUNT) public void testAddRemove() throws Exception { peer1DHT = getDHTPeer("node_1", client1Port); - FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")) - .start(); + FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); futurePut1.awaitUninterruptibly(); + futurePut1.awaitListenersUninterruptibly(); assertTrue(futurePut1.isSuccess()); - FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")) - .start(); + FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); futurePut2.awaitUninterruptibly(); + futurePut2.awaitListenersUninterruptibly(); assertTrue(futurePut2.isSuccess()); peer2DHT = getDHTPeer("node_2", client2Port); Number160 contentKey = new Data("hallo1").hash(); - FutureRemove futureRemove = peer2DHT.remove(Number160.createHash("locationKey")).contentKey(contentKey) - .start(); + FutureRemove futureRemove = peer2DHT.remove(Number160.createHash("locationKey")).contentKey(contentKey).start(); futureRemove.awaitUninterruptibly(); futureRemove.awaitListenersUninterruptibly(); @@ -237,9 +241,14 @@ public class TomP2PTests { // it might change in future to something like foundAndRemoved and notFound // See discussion at: https://github.com/tomp2p/TomP2P/issues/57#issuecomment-62069840 + assertTrue(futureRemove.isSuccess()); + FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start(); futureGet.awaitUninterruptibly(); assertTrue(futureGet.isSuccess()); + if (!futureGet.dataMap().values().contains(new Data("hallo2"))) { + log.error("raw data has the value, the evaluated not!"); + } assertTrue(futureGet.dataMap().values().contains(new Data("hallo2"))); assertTrue(futureGet.dataMap().values().size() == 1); @@ -252,31 +261,33 @@ public class TomP2PTests { @Test @Repeat(STRESS_TEST_COUNT) public void testSendDirectBetweenLocalPeers() throws Exception { - peer1DHT = getDHTPeer("node_1", client1Port); - peer2DHT = getDHTPeer("node_2", client2Port); + if (FORCED_CONNECTION_TYPE != ConnectionType.NAT && resolvedConnectionType != ConnectionType.RELAY) { + peer1DHT = getDHTPeer("node_1", client1Port); + peer2DHT = getDHTPeer("node_2", client2Port); - final CountDownLatch countDownLatch = new CountDownLatch(1); + final CountDownLatch countDownLatch = new CountDownLatch(1); - final StringBuilder result = new StringBuilder(); - peer2DHT.peer().objectDataReply((sender, request) -> { - countDownLatch.countDown(); - result.append(String.valueOf(request)); - return "pong"; - }); - FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection(peer2DHT.peer() - .peerAddress(), 500); - FutureDirect futureDirect = peer1DHT.peer().sendDirect(futurePeerConnection).object("hallo").start(); - futureDirect.awaitUninterruptibly(); + final StringBuilder result = new StringBuilder(); + peer2DHT.peer().objectDataReply((sender, request) -> { + countDownLatch.countDown(); + result.append(String.valueOf(request)); + return "pong"; + }); + FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection(peer2DHT.peer() + .peerAddress(), 500); + FutureDirect futureDirect = peer1DHT.peer().sendDirect(futurePeerConnection).object("hallo").start(); + futureDirect.awaitUninterruptibly(); - countDownLatch.await(3, TimeUnit.SECONDS); - if (countDownLatch.getCount() > 0) - Assert.fail("The test method did not complete successfully!"); + countDownLatch.await(3, TimeUnit.SECONDS); + if (countDownLatch.getCount() > 0) + Assert.fail("The test method did not complete successfully!"); - assertEquals("hallo", result.toString()); - assertTrue(futureDirect.isSuccess()); - log.debug(futureDirect.object().toString()); - assertEquals("pong", futureDirect.object()); + assertEquals("hallo", result.toString()); + assertTrue(futureDirect.isSuccess()); + log.debug(futureDirect.object().toString()); + assertEquals("pong", futureDirect.object()); + } } // That test should always succeed as we use the server seed node as receiver. @@ -297,8 +308,13 @@ public class TomP2PTests { final String id = clientId + clientPort; Peer peer = null; try { - peer = new PeerBuilder(Number160.createHash(clientId)).bindings(getBindings()) - .ports(clientPort).start(); + Number160 peerId = Number160.createHash(clientId); + PeerMapConfiguration pmc = new PeerMapConfiguration(peerId).peerNoVerification(); + PeerMap pm = new PeerMap(pmc); + ChannelClientConfiguration cc = PeerBuilder.createDefaultChannelClientConfiguration(); + cc.maxPermitsTCP(100); + cc.maxPermitsUDP(100); + peer = new PeerBuilder(peerId).bindings(getBindings()).peerMap(pm).ports(clientPort).start(); FutureDiscover futureDiscover = peer.discover().peerAddress(BOOTSTRAP_NODE_ADDRESS).start(); futureDiscover.awaitUninterruptibly(); if (futureDiscover.isSuccess()) { @@ -341,7 +357,6 @@ public class TomP2PTests { FutureDiscover futureDiscover = peer.discover().peerAddress(BOOTSTRAP_NODE_ADDRESS).start(); FutureNAT futureNAT = peerNAT.startSetupPortforwarding(futureDiscover); futureNAT.awaitUninterruptibly(); - futureNAT.awaitListenersUninterruptibly(); if (futureNAT.isSuccess()) { log.info("Automatic port forwarding is setup. Now we do a futureDiscover again. Address = " + futureNAT.peerAddress()); @@ -350,7 +365,17 @@ public class TomP2PTests { if (futureDiscover.isSuccess()) { log.info("Discover with automatic port forwarding was successful. Address = " + futureDiscover .peerAddress()); - return peer; + + FutureBootstrap futureBootstrap = peer.bootstrap().peerAddress(BOOTSTRAP_NODE_ADDRESS).start(); + futureBootstrap.awaitUninterruptibly(); + if (futureBootstrap.isSuccess()) { + return peer; + } + else { + log.warn("Bootstrap failed. Reason = " + futureBootstrap.failedReason()); + peer.shutdown().awaitUninterruptibly(); + return null; + } } else { log.warn("Discover with automatic port forwarding failed. Reason = " + futureDiscover @@ -390,8 +415,17 @@ public class TomP2PTests { futureRelayNAT.awaitUninterruptibly(); if (futureRelayNAT.isSuccess()) { log.info("Bootstrap using relay was successful. Address = " + peer.peerAddress()); - return peer; + FutureBootstrap futureBootstrap = peer.bootstrap().peerAddress(BOOTSTRAP_NODE_ADDRESS).start(); + futureBootstrap.awaitUninterruptibly(); + if (futureBootstrap.isSuccess()) { + return peer; + } + else { + log.warn("Bootstrap failed. Reason = " + futureBootstrap.failedReason()); + peer.shutdown().awaitUninterruptibly(); + return null; + } } else { log.error("Bootstrap using relay failed " + futureRelayNAT.failedReason());