From d6f97c65c474b1d9c8ce0310a8b657d5c5537c05 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Fri, 14 Nov 2014 00:49:23 +0100 Subject: [PATCH] Add new test (testParallelStartupWithPutGet) --- .../io/bitsquare/app/cli/BootstrapNode.java | 8 +- .../msg/tomp2p/BootstrappedPeerFactory.java | 11 +- .../java/io/bitsquare/msg/TomP2PTests.java | 171 ++++++++++++++++-- 3 files changed, 159 insertions(+), 31 deletions(-) diff --git a/src/main/java/io/bitsquare/app/cli/BootstrapNode.java b/src/main/java/io/bitsquare/app/cli/BootstrapNode.java index b9ba80a7c8..9b70afa559 100644 --- a/src/main/java/io/bitsquare/app/cli/BootstrapNode.java +++ b/src/main/java/io/bitsquare/app/cli/BootstrapNode.java @@ -26,8 +26,6 @@ import net.tomp2p.p2p.Peer; import net.tomp2p.p2p.PeerBuilder; import net.tomp2p.peers.Number160; import net.tomp2p.peers.PeerAddress; -import net.tomp2p.peers.PeerMap; -import net.tomp2p.peers.PeerMapConfiguration; import net.tomp2p.replication.IndirectReplication; import net.tomp2p.rpc.ObjectDataReply; @@ -54,9 +52,7 @@ public class BootstrapNode { try { Number160 peerId = Number160.createHash(name); - PeerMapConfiguration pmc = new PeerMapConfiguration(peerId).peerNoVerification(); - PeerMap pm = new PeerMap(pmc); - peer = new PeerBuilder(peerId).ports(port).peerMap(pm).start(); + peer = new PeerBuilder(peerId).ports(port).start(); peer.objectDataReply(new ObjectDataReply() { @Override public Object reply(PeerAddress sender, Object request) throws Exception { @@ -69,7 +65,7 @@ public class BootstrapNode { new PeerBuilderNAT(peer).start(); new IndirectReplication(peerDHT).start(); - log.debug("started"); + log.debug("Bootstrap node started with name " + name + " and port " + port); new Thread(new Runnable() { @Override public void run() { diff --git a/src/main/java/io/bitsquare/msg/tomp2p/BootstrappedPeerFactory.java b/src/main/java/io/bitsquare/msg/tomp2p/BootstrappedPeerFactory.java index 6469235728..4c46a8f217 100644 --- a/src/main/java/io/bitsquare/msg/tomp2p/BootstrappedPeerFactory.java +++ b/src/main/java/io/bitsquare/msg/tomp2p/BootstrappedPeerFactory.java @@ -39,7 +39,6 @@ import javafx.beans.property.ObjectProperty; import javafx.beans.property.SimpleObjectProperty; import net.tomp2p.connection.Bindings; -import net.tomp2p.connection.ChannelClientConfiguration; import net.tomp2p.dht.PeerBuilderDHT; import net.tomp2p.dht.PeerDHT; import net.tomp2p.futures.BaseFuture; @@ -54,9 +53,7 @@ import net.tomp2p.p2p.Peer; import net.tomp2p.p2p.PeerBuilder; import net.tomp2p.peers.Number160; import net.tomp2p.peers.PeerAddress; -import net.tomp2p.peers.PeerMap; import net.tomp2p.peers.PeerMapChangeListener; -import net.tomp2p.peers.PeerMapConfiguration; import net.tomp2p.peers.PeerStatistic; import net.tomp2p.replication.IndirectReplication; import net.tomp2p.utils.Utils; @@ -124,17 +121,11 @@ class BootstrappedPeerFactory { setState(BootstrapState.PEER_CREATION, "We create a P2P node."); Number160 peerId = Utils.makeSHAHash(keyPair.getPublic().getEncoded()); - PeerMapConfiguration pmc = new PeerMapConfiguration(peerId).peerNoVerification(); - PeerMap pm = new PeerMap(pmc); - ChannelClientConfiguration cc = PeerBuilder.createDefaultChannelClientConfiguration(); - cc.maxPermitsTCP(100); - cc.maxPermitsUDP(100); Bindings bindings = new Bindings(); if (!NETWORK_INTERFACE_UNSPECIFIED.equals(networkInterface)) bindings.addInterface(networkInterface); - peer = new PeerBuilder(keyPair).ports(port).peerMap(pm).bindings(bindings) - .channelClientConfiguration(cc).start(); + peer = new PeerBuilder(keyPair).ports(port).bindings(bindings).start(); peerDHT = new PeerBuilderDHT(peer).start(); new IndirectReplication(peerDHT).start(); diff --git a/src/test/java/io/bitsquare/msg/TomP2PTests.java b/src/test/java/io/bitsquare/msg/TomP2PTests.java index 65dd01c123..64e0954345 100644 --- a/src/test/java/io/bitsquare/msg/TomP2PTests.java +++ b/src/test/java/io/bitsquare/msg/TomP2PTests.java @@ -31,6 +31,9 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javafx.beans.property.BooleanProperty; +import javafx.beans.property.SimpleBooleanProperty; + import net.tomp2p.connection.Bindings; import net.tomp2p.connection.ChannelClientConfiguration; import net.tomp2p.connection.StandardProtocolFamily; @@ -40,6 +43,7 @@ import net.tomp2p.dht.FutureRemove; import net.tomp2p.dht.PeerBuilderDHT; import net.tomp2p.dht.PeerDHT; import net.tomp2p.futures.BaseFuture; +import net.tomp2p.futures.BaseFutureListener; import net.tomp2p.futures.FutureBootstrap; import net.tomp2p.futures.FutureDirect; import net.tomp2p.futures.FutureDiscover; @@ -120,21 +124,14 @@ public class TomP2PTests { @After public void tearDown() { - if (peer1DHT != null) { - BaseFuture future = peer1DHT.shutdown(); - future.awaitUninterruptibly(); - future.awaitListenersUninterruptibly(); - } - if (peer2DHT != null) { - BaseFuture future = peer2DHT.shutdown(); - future.awaitUninterruptibly(); - future.awaitListenersUninterruptibly(); - } - if (peer != null) { - BaseFuture future = peer.shutdown(); - future.awaitUninterruptibly(); - future.awaitListenersUninterruptibly(); - } + if (peer1DHT != null) + peer1DHT.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly(); + + if (peer2DHT != null) + peer2DHT.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly(); + + if (peer != null) + peer.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly(); } @Test @@ -296,6 +293,150 @@ public class TomP2PTests { assertTrue(futureGet.dataMap().values().size() == 2); } + @Test + @Repeat(STRESS_TEST_COUNT) + public void testParallelStartupWithPutGet() throws IOException, ClassNotFoundException, InterruptedException { + PeerDHT peer1 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash("peer1")).ports(3006).start()).start(); + PeerDHT peer2 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash("peer2")).ports(3007).start()).start(); + + PeerAddress masterPeerAddress = new PeerAddress(Number160.createHash(BootstrapNodes.LOCALHOST.getName()), + BootstrapNodes.LOCALHOST.getIp(), BootstrapNodes.LOCALHOST.getPort(), + BootstrapNodes.LOCALHOST.getPort()); + + // start both at the same time + BaseFuture fb1 = peer1.peer().bootstrap().peerAddress(masterPeerAddress).start(); + BaseFuture fb2 = peer2.peer().bootstrap().peerAddress(masterPeerAddress).start(); + + final BooleanProperty peer1Done = new SimpleBooleanProperty(); + final BooleanProperty peer2Done = new SimpleBooleanProperty(); + + fb1.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + peer1Done.set(true); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + } + }); + + fb2.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + peer2Done.set(true); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + } + }); + + while (!peer1Done.get() && !peer2Done.get()) + Thread.sleep(100); + + // both are started up + Assert.assertTrue(fb1.isSuccess()); + Assert.assertTrue(fb2.isSuccess()); + + // peer1 put data + FuturePut fp = peer1.put(Number160.ONE).object("test").start().awaitUninterruptibly(); + Assert.assertTrue(fp.isSuccess()); + + // both get data + FutureGet fg1 = peer1.get(Number160.ONE).start().awaitUninterruptibly(); + Assert.assertTrue(fg1.isSuccess()); + Assert.assertEquals("test", fg1.data().object()); + FutureGet fg2 = peer2.get(Number160.ONE).start().awaitUninterruptibly(); + Assert.assertTrue(fg2.isSuccess()); + Assert.assertEquals("test", fg2.data().object()); + + // shutdown both + peer1.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly(); + peer2.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly(); + + // start both again at the same time + peer1 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash("peer1")).ports(3005).start()).start(); + peer2 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash("peer2")).ports(3006).start()).start(); + + fb1 = peer1.peer().bootstrap().peerAddress(masterPeerAddress).start(); + fb2 = peer2.peer().bootstrap().peerAddress(masterPeerAddress).start(); + + peer1Done.set(false); + peer2Done.set(false); + + final PeerDHT _peer1 = peer1; + fb1.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + peer1Done.set(true); + + // when peer1 is ready it gets the data + FutureGet fg = _peer1.get(Number160.ONE).start(); + fg.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + Assert.assertTrue(fg.isSuccess()); + Assert.assertEquals("test", fg.data().object()); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + } + }); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + } + }); + + final PeerDHT _peer2 = peer2; + fb2.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + peer2Done.set(true); + + // when peer2 is ready it gets the data + FutureGet fg = _peer2.get(Number160.ONE).start(); + fg.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + Assert.assertTrue(fg.isSuccess()); + Assert.assertEquals("test", fg.data().object()); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + } + }); + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + } + }); + + while (!peer1Done.get() && !peer2Done.get()) + Thread.sleep(100); + + // both are started up + Assert.assertTrue(fb1.isSuccess()); + Assert.assertTrue(fb2.isSuccess()); + + // get data again for both + fg1 = peer1.get(Number160.ONE).start().awaitUninterruptibly(); + Assert.assertTrue(fg1.isSuccess()); + Assert.assertEquals("test", fg1.data().object()); + + fg2 = peer2.get(Number160.ONE).start().awaitUninterruptibly(); + Assert.assertTrue(fg2.isSuccess()); + Assert.assertEquals("test", fg2.data().object()); + + peer1.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly(); + peer2.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly(); + } + @Test @Repeat(STRESS_TEST_COUNT) public void testAddRemove() throws Exception {