Add new test (testParallelStartupWithPutGet)

This commit is contained in:
Manfred Karrer 2014-11-14 00:49:23 +01:00
parent aedc58600a
commit d6f97c65c4
3 changed files with 159 additions and 31 deletions

View File

@ -26,8 +26,6 @@ import net.tomp2p.p2p.Peer;
import net.tomp2p.p2p.PeerBuilder;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.peers.PeerMap;
import net.tomp2p.peers.PeerMapConfiguration;
import net.tomp2p.replication.IndirectReplication;
import net.tomp2p.rpc.ObjectDataReply;
@ -54,9 +52,7 @@ public class BootstrapNode {
try {
Number160 peerId = Number160.createHash(name);
PeerMapConfiguration pmc = new PeerMapConfiguration(peerId).peerNoVerification();
PeerMap pm = new PeerMap(pmc);
peer = new PeerBuilder(peerId).ports(port).peerMap(pm).start();
peer = new PeerBuilder(peerId).ports(port).start();
peer.objectDataReply(new ObjectDataReply() {
@Override
public Object reply(PeerAddress sender, Object request) throws Exception {
@ -69,7 +65,7 @@ public class BootstrapNode {
new PeerBuilderNAT(peer).start();
new IndirectReplication(peerDHT).start();
log.debug("started");
log.debug("Bootstrap node started with name " + name + " and port " + port);
new Thread(new Runnable() {
@Override
public void run() {

View File

@ -39,7 +39,6 @@ import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import net.tomp2p.connection.Bindings;
import net.tomp2p.connection.ChannelClientConfiguration;
import net.tomp2p.dht.PeerBuilderDHT;
import net.tomp2p.dht.PeerDHT;
import net.tomp2p.futures.BaseFuture;
@ -54,9 +53,7 @@ import net.tomp2p.p2p.Peer;
import net.tomp2p.p2p.PeerBuilder;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.PeerAddress;
import net.tomp2p.peers.PeerMap;
import net.tomp2p.peers.PeerMapChangeListener;
import net.tomp2p.peers.PeerMapConfiguration;
import net.tomp2p.peers.PeerStatistic;
import net.tomp2p.replication.IndirectReplication;
import net.tomp2p.utils.Utils;
@ -124,17 +121,11 @@ class BootstrappedPeerFactory {
setState(BootstrapState.PEER_CREATION, "We create a P2P node.");
Number160 peerId = Utils.makeSHAHash(keyPair.getPublic().getEncoded());
PeerMapConfiguration pmc = new PeerMapConfiguration(peerId).peerNoVerification();
PeerMap pm = new PeerMap(pmc);
ChannelClientConfiguration cc = PeerBuilder.createDefaultChannelClientConfiguration();
cc.maxPermitsTCP(100);
cc.maxPermitsUDP(100);
Bindings bindings = new Bindings();
if (!NETWORK_INTERFACE_UNSPECIFIED.equals(networkInterface))
bindings.addInterface(networkInterface);
peer = new PeerBuilder(keyPair).ports(port).peerMap(pm).bindings(bindings)
.channelClientConfiguration(cc).start();
peer = new PeerBuilder(keyPair).ports(port).bindings(bindings).start();
peerDHT = new PeerBuilderDHT(peer).start();
new IndirectReplication(peerDHT).start();

View File

@ -31,6 +31,9 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import net.tomp2p.connection.Bindings;
import net.tomp2p.connection.ChannelClientConfiguration;
import net.tomp2p.connection.StandardProtocolFamily;
@ -40,6 +43,7 @@ import net.tomp2p.dht.FutureRemove;
import net.tomp2p.dht.PeerBuilderDHT;
import net.tomp2p.dht.PeerDHT;
import net.tomp2p.futures.BaseFuture;
import net.tomp2p.futures.BaseFutureListener;
import net.tomp2p.futures.FutureBootstrap;
import net.tomp2p.futures.FutureDirect;
import net.tomp2p.futures.FutureDiscover;
@ -120,21 +124,14 @@ public class TomP2PTests {
@After
public void tearDown() {
if (peer1DHT != null) {
BaseFuture future = peer1DHT.shutdown();
future.awaitUninterruptibly();
future.awaitListenersUninterruptibly();
}
if (peer2DHT != null) {
BaseFuture future = peer2DHT.shutdown();
future.awaitUninterruptibly();
future.awaitListenersUninterruptibly();
}
if (peer != null) {
BaseFuture future = peer.shutdown();
future.awaitUninterruptibly();
future.awaitListenersUninterruptibly();
}
if (peer1DHT != null)
peer1DHT.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly();
if (peer2DHT != null)
peer2DHT.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly();
if (peer != null)
peer.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly();
}
@Test
@ -296,6 +293,150 @@ public class TomP2PTests {
assertTrue(futureGet.dataMap().values().size() == 2);
}
@Test
@Repeat(STRESS_TEST_COUNT)
public void testParallelStartupWithPutGet() throws IOException, ClassNotFoundException, InterruptedException {
PeerDHT peer1 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash("peer1")).ports(3006).start()).start();
PeerDHT peer2 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash("peer2")).ports(3007).start()).start();
PeerAddress masterPeerAddress = new PeerAddress(Number160.createHash(BootstrapNodes.LOCALHOST.getName()),
BootstrapNodes.LOCALHOST.getIp(), BootstrapNodes.LOCALHOST.getPort(),
BootstrapNodes.LOCALHOST.getPort());
// start both at the same time
BaseFuture fb1 = peer1.peer().bootstrap().peerAddress(masterPeerAddress).start();
BaseFuture fb2 = peer2.peer().bootstrap().peerAddress(masterPeerAddress).start();
final BooleanProperty peer1Done = new SimpleBooleanProperty();
final BooleanProperty peer2Done = new SimpleBooleanProperty();
fb1.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
peer1Done.set(true);
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
}
});
fb2.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
peer2Done.set(true);
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
}
});
while (!peer1Done.get() && !peer2Done.get())
Thread.sleep(100);
// both are started up
Assert.assertTrue(fb1.isSuccess());
Assert.assertTrue(fb2.isSuccess());
// peer1 put data
FuturePut fp = peer1.put(Number160.ONE).object("test").start().awaitUninterruptibly();
Assert.assertTrue(fp.isSuccess());
// both get data
FutureGet fg1 = peer1.get(Number160.ONE).start().awaitUninterruptibly();
Assert.assertTrue(fg1.isSuccess());
Assert.assertEquals("test", fg1.data().object());
FutureGet fg2 = peer2.get(Number160.ONE).start().awaitUninterruptibly();
Assert.assertTrue(fg2.isSuccess());
Assert.assertEquals("test", fg2.data().object());
// shutdown both
peer1.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly();
peer2.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly();
// start both again at the same time
peer1 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash("peer1")).ports(3005).start()).start();
peer2 = new PeerBuilderDHT(new PeerBuilder(Number160.createHash("peer2")).ports(3006).start()).start();
fb1 = peer1.peer().bootstrap().peerAddress(masterPeerAddress).start();
fb2 = peer2.peer().bootstrap().peerAddress(masterPeerAddress).start();
peer1Done.set(false);
peer2Done.set(false);
final PeerDHT _peer1 = peer1;
fb1.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
peer1Done.set(true);
// when peer1 is ready it gets the data
FutureGet fg = _peer1.get(Number160.ONE).start();
fg.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
Assert.assertTrue(fg.isSuccess());
Assert.assertEquals("test", fg.data().object());
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
}
});
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
}
});
final PeerDHT _peer2 = peer2;
fb2.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
peer2Done.set(true);
// when peer2 is ready it gets the data
FutureGet fg = _peer2.get(Number160.ONE).start();
fg.addListener(new BaseFutureListener<BaseFuture>() {
@Override
public void operationComplete(BaseFuture future) throws Exception {
Assert.assertTrue(fg.isSuccess());
Assert.assertEquals("test", fg.data().object());
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
}
});
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
}
});
while (!peer1Done.get() && !peer2Done.get())
Thread.sleep(100);
// both are started up
Assert.assertTrue(fb1.isSuccess());
Assert.assertTrue(fb2.isSuccess());
// get data again for both
fg1 = peer1.get(Number160.ONE).start().awaitUninterruptibly();
Assert.assertTrue(fg1.isSuccess());
Assert.assertEquals("test", fg1.data().object());
fg2 = peer2.get(Number160.ONE).start().awaitUninterruptibly();
Assert.assertTrue(fg2.isSuccess());
Assert.assertEquals("test", fg2.data().object());
peer1.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly();
peer2.shutdown().awaitUninterruptibly().awaitListenersUninterruptibly();
}
@Test
@Repeat(STRESS_TEST_COUNT)
public void testAddRemove() throws Exception {