diff --git a/src/main/java/io/bitsquare/SeedNode.java b/src/main/java/io/bitsquare/SeedNode.java index 42dfc37680..b2972abc98 100644 --- a/src/main/java/io/bitsquare/SeedNode.java +++ b/src/main/java/io/bitsquare/SeedNode.java @@ -22,6 +22,7 @@ import io.bitsquare.msg.SeedNodeAddress; import java.io.IOException; import net.tomp2p.dht.PeerBuilderDHT; +import net.tomp2p.dht.PeerDHT; import net.tomp2p.nat.PeerBuilderNAT; import net.tomp2p.p2p.Peer; import net.tomp2p.p2p.PeerBuilder; @@ -43,8 +44,14 @@ public class SeedNode extends Thread { try { peer = new PeerBuilder(Number160.createHash(seedNodeAddress.getId())).ports(seedNodeAddress.getPort()) .start(); - new PeerBuilderDHT(peer).start(); + PeerDHT peerDHT = new PeerBuilderDHT(peer).start(); new PeerBuilderNAT(peer).start(); + + /* peerDHT.peer().objectDataReply((sender, request) -> { + log.trace("received request: ", request.toString()); + return "pong"; + });*/ + log.debug("peer listening at port: {}", seedNodeAddress.getPort()); peer.peerBean().peerMap().addPeerMapChangeListener(new PeerMapChangeListener() { diff --git a/src/test/java/io/bitsquare/msg/BasicUsecasesInWANTest.java b/src/test/java/io/bitsquare/msg/BasicUsecasesInWANTest.java index 4cb44775f4..ca54d106d9 100644 --- a/src/test/java/io/bitsquare/msg/BasicUsecasesInWANTest.java +++ b/src/test/java/io/bitsquare/msg/BasicUsecasesInWANTest.java @@ -19,6 +19,8 @@ package io.bitsquare.msg; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -53,15 +55,12 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.*; /** - * Tests bootstrapping, put/get and sendDirect in WAN environment (auto port forwarding, Relay) + * Test bootstrapping, put/get/add/remove and sendDirect in WAN environment (auto port forwarding, Relay) * startBootstrappingSeedNode is used as the server side code */ public class BasicUsecasesInWANTest { private static final Logger log = LoggerFactory.getLogger(BasicUsecasesInWANTest.class); - // update to external ip (whats my ip) - private final static String CLIENT_IP = "83.36.8.117"; - private final static String SERVER_ID_1 = "digitalocean1.bitsquare.io"; // Manfreds server private final static String SERVER_IP_1 = "188.226.179.109"; // Manfreds server private final static int SERVER_PORT_1 = 5000; @@ -79,113 +78,187 @@ public class BasicUsecasesInWANTest { private final static String CLIENT_1_ID = "alice"; private final static String CLIENT_2_ID = "bob"; + private final static int CLIENT_1_PORT = new Ports().tcpPort(); + private final static int CLIENT_2_PORT = new Ports().tcpPort(); + + private String overrideBootStrapMode = "default"; // nat, relay + + // In port forwarding mode the isSuccess returns false, but the DHT operations succeeded. + // Needs investigation why. + private boolean ignoreSuccessTests = true; + + // Don't create and bootstrap the nodes at every test but reuse already created ones. + private boolean cacheClients = true; + + private final static Map clients = new HashMap<>(); - // with port forwarding the success calls are failing and sendDirect is not working + /////////////////////////////////////////////////////////////////////////////////////////// + // Seed node + /////////////////////////////////////////////////////////////////////////////////////////// + public static void main(String[] args) throws Exception { + new BasicUsecasesInWANTest().startBootstrappingSeedNode(); + } + + public void startBootstrappingSeedNode() { + Peer peer = null; + try { + peer = new PeerBuilder(Number160.createHash(SERVER_ID_1)).ports(SERVER_PORT_1).start(); + PeerDHT peerDHT = new PeerBuilderDHT(peer).start(); + peerDHT.peer().objectDataReply((sender, request) -> { + log.trace("received request: ", request.toString()); + return "pong"; + }); + + new PeerBuilderNAT(peer).start(); + + log.debug("peer started."); + for (; ; ) { + for (PeerAddress pa : peer.peerBean().peerMap().all()) { + log.debug("peer online (TCP):" + pa); + } + Thread.sleep(2000); + } + } catch (Exception e) { + if (peer != null) + peer.shutdown().awaitUninterruptibly(); + e.printStackTrace(); + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Tests + /////////////////////////////////////////////////////////////////////////////////////////// @Test @Ignore public void testPutGet() throws Exception { - PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort()); - PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort()); + PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT); + PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT); FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start(); futurePut.awaitUninterruptibly(); - //assertTrue(futurePut.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futurePut.isSuccess()); FutureGet futureGet = peer2DHT.get(Number160.createHash("key")).start(); futureGet.awaitUninterruptibly(); - //assertTrue(futureGet.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futureGet.isSuccess()); assertEquals("hallo", futureGet.data().object()); - //assertTrue(futurePut.isSuccess()); - peer1DHT.shutdown().awaitUninterruptibly(); - peer2DHT.shutdown().awaitUninterruptibly(); + if (!ignoreSuccessTests) + assertTrue(futurePut.isSuccess()); + + if (!cacheClients) { + peer1DHT.shutdown().awaitUninterruptibly(); + peer2DHT.shutdown().awaitUninterruptibly(); + } } @Test @Ignore public void testAddGet() throws Exception { - PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort()); - PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort()); + PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT); + PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT); FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); futurePut1.awaitUninterruptibly(); - //assertTrue(futurePut1.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futurePut1.isSuccess()); FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); futurePut2.awaitUninterruptibly(); - //assertTrue(futurePut2.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futurePut2.isSuccess()); FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start(); futureGet.awaitUninterruptibly(); - //assertTrue(futureGet.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futureGet.isSuccess()); assertTrue(futureGet.dataMap().values().contains(new Data("hallo1"))); assertTrue(futureGet.dataMap().values().contains(new Data("hallo2"))); assertTrue(futureGet.dataMap().values().size() == 2); - peer1DHT.shutdown().awaitUninterruptibly(); - peer2DHT.shutdown().awaitUninterruptibly(); + if (!cacheClients) { + peer1DHT.shutdown().awaitUninterruptibly(); + peer2DHT.shutdown().awaitUninterruptibly(); + } } @Test @Ignore - public void testRemove() throws Exception { - PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort()); - PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort()); + public void testAddRemove() throws Exception { + PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT); + PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT); FuturePut futurePut1 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo1")).start(); futurePut1.awaitUninterruptibly(); - //assertTrue(futurePut1.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futurePut1.isSuccess()); FuturePut futurePut2 = peer1DHT.add(Number160.createHash("locationKey")).data(new Data("hallo2")).start(); futurePut2.awaitUninterruptibly(); - // assertTrue(futurePut2.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futurePut2.isSuccess()); Number160 contentKey = new Data("hallo1").hash(); FutureRemove futureRemove = peer2DHT.remove(Number160.createHash("locationKey")).contentKey(contentKey).start(); futureRemove.awaitUninterruptibly(); - // why is futureRemove.isSuccess() = false ? - log.debug(futureRemove.failedReason());// Future (compl/canc):true/false, OK, Minimun number of results reached - // assertTrue(futureRemove.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futureRemove.isSuccess()); FutureGet futureGet = peer2DHT.get(Number160.createHash("locationKey")).all().start(); futureGet.awaitUninterruptibly(); - //assertTrue(futureGet.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futureGet.isSuccess()); assertTrue(futureGet.dataMap().values().contains(new Data("hallo2"))); assertTrue(futureGet.dataMap().values().size() == 1); - peer1DHT.shutdown().awaitUninterruptibly(); - peer2DHT.shutdown().awaitUninterruptibly(); + if (!cacheClients) { + peer1DHT.shutdown().awaitUninterruptibly(); + peer2DHT.shutdown().awaitUninterruptibly(); + } } @Test @Ignore public void testDHT2Servers() throws Exception { - PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort(), SERVER_ID_1, SERVER_IP_1, SERVER_PORT_1); - PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort(), SERVER_ID_2, SERVER_IP_2, SERVER_PORT_2); + PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT, SERVER_ID_1, SERVER_IP_1, SERVER_PORT_1); + PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT, SERVER_ID_2, SERVER_IP_2, SERVER_PORT_2); FuturePut futurePut = peer1DHT.put(Number160.createHash("key")).data(new Data("hallo")).start(); futurePut.awaitUninterruptibly(); - //assertTrue(futurePut.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futurePut.isSuccess()); FutureGet futureGet = peer2DHT.get(Number160.createHash("key")).start(); futureGet.awaitUninterruptibly(); - //assertTrue(futureGet.isSuccess()); + if (!ignoreSuccessTests) + assertTrue(futureGet.isSuccess()); assertEquals("hallo", futureGet.data().object()); - peer1DHT.shutdown().awaitUninterruptibly(); - peer2DHT.shutdown().awaitUninterruptibly(); + if (!cacheClients) { + peer1DHT.shutdown().awaitUninterruptibly(); + peer2DHT.shutdown().awaitUninterruptibly(); + } } + // That test fails in port forwarding mode because most routers does not support NAT reflections. + // So if both clients are behind NAT they cannot send direct message to the other. + // That will probably be fixed in a future version of TomP2P + // In relay mode the test should succeed @Test @Ignore - public void testSendDirect() throws Exception { - PeerDHT peer1DHT = startClient(CLIENT_1_ID, new Ports().tcpPort()); - PeerDHT peer2DHT = startClient(CLIENT_2_ID, new Ports().tcpPort()); + public void testSendDirectRelay() throws Exception { + overrideBootStrapMode = "relay"; + PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT); + overrideBootStrapMode = "nat"; + PeerDHT peer2DHT = startClient(CLIENT_2_ID, CLIENT_2_PORT); final CountDownLatch countDownLatch = new CountDownLatch(1); @@ -205,106 +278,121 @@ public class BasicUsecasesInWANTest { if (countDownLatch.getCount() > 0) Assert.fail("The test method did not complete successfully!"); - // assertTrue(futureDirect.isSuccess()); - // assertEquals("world", futureDirect.object()); assertEquals("hallo", result.toString()); + assertTrue(futureDirect.isSuccess()); + //assertEquals("pong", futureDirect.object()); - peer1DHT.shutdown().awaitUninterruptibly(); - peer2DHT.shutdown().awaitUninterruptibly(); + if (!cacheClients) { + peer1DHT.shutdown().awaitUninterruptibly(); + peer2DHT.shutdown().awaitUninterruptibly(); + } } + // That test should succeed in port forwarding as we use the server seed node as receiver + @Test + @Ignore + public void testSendDirectPortForwarding() throws Exception { + PeerDHT peer1DHT = startClient(CLIENT_1_ID, CLIENT_1_PORT); + PeerAddress reachablePeerAddress = new PeerAddress(Number160.createHash(SERVER_ID), SERVER_IP, SERVER_PORT, + SERVER_PORT); + + FuturePeerConnection futurePeerConnection = peer1DHT.peer().createPeerConnection(reachablePeerAddress, 500); + FutureDirect futureDirect = peer1DHT.peer().sendDirect(futurePeerConnection).object("hallo").start(); + futureDirect.awaitUninterruptibly(); + assertTrue(futureDirect.isSuccess()); + //assertEquals("pong", futureDirect.object()); + + if (!cacheClients) { + peer1DHT.shutdown().awaitUninterruptibly(); + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Utils + /////////////////////////////////////////////////////////////////////////////////////////// + + private PeerDHT startClient(String clientId, int clientPort) throws Exception { return startClient(clientId, clientPort, SERVER_ID, SERVER_IP, SERVER_PORT); } private PeerDHT startClient(String clientId, int clientPort, String serverId, String serverIP, int serverPort) throws Exception { - Peer peer = null; - try { - peer = new PeerBuilder(Number160.createHash(clientId)).ports(clientPort).behindFirewall().start(); - PeerDHT peerDHT = new PeerBuilderDHT(peer).storageLayer(new StorageLayer(new StorageMemory())).start(); + final String id = clientId + clientPort; + log.debug("id = " + id + "/" + clients.containsKey(id)); + if (cacheClients && clients.containsKey(id)) { + return clients.get(id); + } + else { + Peer peer = null; + try { + peer = new PeerBuilder(Number160.createHash(clientId)).ports(clientPort).behindFirewall().start(); + PeerDHT peerDHT = new PeerBuilderDHT(peer).storageLayer(new StorageLayer(new StorageMemory())).start(); - PeerAddress masterNodeAddress = new PeerAddress(Number160.createHash(serverId), serverIP, serverPort, - serverPort); - FutureDiscover futureDiscover = peer.discover().peerAddress(masterNodeAddress).start(); - futureDiscover.awaitUninterruptibly(); - if (futureDiscover.isSuccess()) { - log.info("Discover with direct connection successful. Address = " + futureDiscover.peerAddress()); - return peerDHT; - } - else { - PeerNAT peerNAT = new PeerBuilderNAT(peer).start(); - FutureNAT futureNAT = peerNAT.startSetupPortforwarding(futureDiscover); - futureNAT.awaitUninterruptibly(); - if (futureNAT.isSuccess()) { - log.info("Automatic port forwarding is setup. Address = " + - futureNAT.peerAddress()); - FutureDiscover futureDiscover2 = peer.discover().peerAddress(masterNodeAddress).start(); - futureDiscover2.awaitUninterruptibly(); - if (futureDiscover2.isSuccess()) { - log.info("Discover with automatic port forwarding successful. Address = " + futureDiscover2 - .peerAddress()); - - log.info("Automatic port forwarding is setup. Address = " + - futureNAT.peerAddress()); - return peerDHT; - } - else { - log.error("Bootstrap with NAT after futureDiscover2 failed " + futureDiscover2.failedReason()); - peer.shutdown().awaitUninterruptibly(); - return null; - } + PeerAddress masterNodeAddress = new PeerAddress(Number160.createHash(serverId), serverIP, serverPort, + serverPort); + FutureDiscover futureDiscover = peer.discover().peerAddress(masterNodeAddress).start(); + futureDiscover.awaitUninterruptibly(); + if (futureDiscover.isSuccess() && overrideBootStrapMode.equals("default")) { + log.info("Discover with direct connection successful. Address = " + futureDiscover.peerAddress()); + clients.put(id, peerDHT); + return peerDHT; } else { - FutureRelayNAT futureRelayNAT = peerNAT.startRelay(futureDiscover, futureNAT); - futureRelayNAT.awaitUninterruptibly(); - if (futureRelayNAT.isSuccess()) { - log.info("Bootstrap using relay successful. Address = " + peer.peerAddress()); - return peerDHT; + PeerNAT peerNAT = new PeerBuilderNAT(peer).start(); + FutureDiscover futureDiscover2 = peer.discover().peerAddress(masterNodeAddress).start(); + FutureNAT futureNAT = peerNAT.startSetupPortforwarding(futureDiscover2); + futureNAT.awaitUninterruptibly(); + if (futureNAT.isSuccess() && !overrideBootStrapMode.equals("relay")) { + log.info("Automatic port forwarding is setup. Address = " + + futureNAT.peerAddress()); + FutureDiscover futureDiscover3 = peer.discover().peerAddress(masterNodeAddress).start(); + futureDiscover3.awaitUninterruptibly(); + if (futureDiscover3.isSuccess()) { + log.info("Discover with automatic port forwarding successful. Address = " + futureDiscover3 + .peerAddress()); + clients.put(id, peerDHT); + return peerDHT; + } + else { + log.error("Bootstrap with NAT after futureDiscover2 failed " + futureDiscover3 + .failedReason()); + peer.shutdown().awaitUninterruptibly(); + return null; + } } else { - log.error("Bootstrap using relay failed " + futureRelayNAT.failedReason()); - Assert.fail("Bootstrap using relay failed " + futureRelayNAT.failedReason()); - futureRelayNAT.shutdown(); - peer.shutdown().awaitUninterruptibly(); - return null; + log.debug("futureNAT.failedReason() = " + futureNAT.failedReason()); + FutureDiscover futureDiscover4 = peer.discover().peerAddress(masterNodeAddress).start(); + FutureRelayNAT futureRelayNAT = peerNAT.startRelay(futureDiscover4, futureNAT); + futureRelayNAT.awaitUninterruptibly(); + if (futureRelayNAT.isSuccess()) { + log.info("Bootstrap using relay successful. Address = " + peer.peerAddress()); + clients.put(id, peerDHT); + return peerDHT; + + } + else { + log.error("Bootstrap using relay failed " + futureRelayNAT.failedReason()); + Assert.fail("Bootstrap using relay failed " + futureRelayNAT.failedReason()); + futureRelayNAT.shutdown(); + peer.shutdown().awaitUninterruptibly(); + return null; + } } } + } catch (IOException e) { + log.error("Bootstrap in relay mode failed " + e.getMessage()); + e.printStackTrace(); + Assert.fail("Bootstrap in relay mode failed " + e.getMessage()); + if (peer != null) + peer.shutdown().awaitUninterruptibly(); + return null; } - } catch (IOException e) { - log.error("Bootstrap in relay mode failed " + e.getMessage()); - e.printStackTrace(); - Assert.fail("Bootstrap in relay mode failed " + e.getMessage()); - if (peer != null) - peer.shutdown().awaitUninterruptibly(); - return null; } } - public static void main(String[] args) throws Exception { - new BasicUsecasesInWANTest().startBootstrappingSeedNode(); - } - - public void startBootstrappingSeedNode() { - Peer peer = null; - try { - peer = new PeerBuilder(Number160.createHash("digitalocean1.bitsquare.io")).ports(5000).start(); - new PeerBuilderDHT(peer).start(); - new PeerBuilderNAT(peer).start(); - - System.out.println("peer started."); - for (; ; ) { - for (PeerAddress pa : peer.peerBean().peerMap().all()) { - System.out.println("peer online (TCP):" + pa); - } - Thread.sleep(2000); - } - } catch (Exception e) { - if (peer != null) - peer.shutdown().awaitUninterruptibly(); - e.printStackTrace(); - } - } }