diff --git a/network/src/test/java/io/bitsquare/p2p/network/NetworkStressTest.java b/network/src/test/java/io/bitsquare/p2p/network/NetworkStressTest.java index bd42ef3556..e73a9a7b37 100644 --- a/network/src/test/java/io/bitsquare/p2p/network/NetworkStressTest.java +++ b/network/src/test/java/io/bitsquare/p2p/network/NetworkStressTest.java @@ -7,13 +7,13 @@ import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.crypto.KeyStorage; import io.bitsquare.common.crypto.PubKeyRing; import io.bitsquare.common.util.Tuple3; +import io.bitsquare.crypto.DecryptedMsgWithPubKey; import io.bitsquare.crypto.EncryptionService; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.P2PService; import io.bitsquare.p2p.P2PServiceListener; import io.bitsquare.p2p.Utils; -import io.bitsquare.p2p.messaging.DirectMessage; -import io.bitsquare.p2p.messaging.SendDirectMessageListener; +import io.bitsquare.p2p.messaging.*; import io.bitsquare.p2p.seed.SeedNode; import io.bitsquare.p2p.seed.SeedNodesRepository; import javafx.beans.property.BooleanProperty; @@ -374,10 +374,18 @@ public class NetworkStressTest { // and the new first online node sends messages. // This is repeated until all nodes have been online and offline. - // Put the second half of peers offline. final int nPeers = peerNodes.size(); - final CountDownLatch halfShutDown = new CountDownLatch(nPeers / 2); + // No sent latch here since the order of events is different + // depending on whether the message goes direct or via mailbox. + final CountDownLatch receivedMailboxLatch = new CountDownLatch(mailboxCount * nPeers); + + // Configure the first half of peers to receive messages... int firstPeerDown = (int)Math.ceil(nPeers / 2.0); + for (P2PService peer : peerNodes.subList(0, firstPeerDown)) { + addMailboxListeners(peer, receivedMailboxLatch); + } + // ...and put the second half offline. + final CountDownLatch halfShutDown = new CountDownLatch(nPeers / 2); for (P2PService peer : peerNodes.subList(firstPeerDown, nPeers)) { peer.shutDown(halfShutDown::countDown); } @@ -387,26 +395,53 @@ public class NetworkStressTest { // Cycle through peers sending to others, stopping the peer // and starting one of the stopped peers. - // - // No sent latch here since the order of events is different - // depending on whether the message goes direct or via mailbox. - final CountDownLatch receivedMailboxLatch = new CountDownLatch(mailboxCount * nPeers); + BooleanProperty sentMailboxFailed = new SimpleBooleanProperty(false); for (int firstOnline = 0, firstOffline = firstPeerDown; firstOnline < nPeers; firstOnline++, firstOffline = ++firstOffline % nPeers) { - // TODO: Make first online node send messages to other nodes. + // The first online peer sends messages to random other peers. + final P2PService onlinePeer = peerNodes.get(firstOnline); + final NodeAddress onlinePeerAddress = onlinePeer.getAddress(); + for (int i = 0; i < mailboxCount; i++) { + // Select a random peer (different than source one)... + int peerIdx; + NodeAddress peerAddr; + do { + peerIdx = (int) (Math.random() * nPeers); + peerAddr = peerNodes.get(peerIdx).getAddress(); + } while (onlinePeerAddress.equals(peerAddr)); + final int dstPeerIdx = peerIdx; + final NodeAddress dstPeerAddress = peerAddr; + // ...and send a message to it. + onlinePeer.sendEncryptedMailboxMessage(dstPeerAddress, peerPKRings.get(dstPeerIdx), + new StressTestMailboxMessage(onlinePeerAddress, "test/" + dstPeerAddress), + new SendMailboxMessageListener() { // checked in receiver + @Override + public void onArrived() { + } + + @Override + public void onStoredInMailbox() { + } + + @Override + public void onFault(String errorMessage) { + sentMailboxFailed.set(true); + } + }); + } // When done, put first online peer offline. final CountDownLatch stopLatch = new CountDownLatch(1); - peerNodes.get(firstOnline).shutDown(stopLatch::countDown); + onlinePeer.shutDown(stopLatch::countDown); assertLatch("timed out while stopping peer " + firstOnline, stopLatch, 10, TimeUnit.SECONDS); print("put peer %d offline", firstOnline); - // When done, put first offline peer online. + // When done, put first offline peer online and setup message listeners. final CountDownLatch startLatch = new CountDownLatch(1); final P2PService startedPeer = createPeerNode(firstOffline, peerPorts.get(firstOffline)); - // TODO: Setup message listeners. + addMailboxListeners(startedPeer, receivedMailboxLatch); peerNodes.set(firstOffline, startedPeer); startedPeer.start(new MailboxStartListener(startLatch)); assertLatch("timed out while starting peer " + firstOffline, @@ -418,6 +453,36 @@ public class NetworkStressTest { receivedMailboxLatch, 120, TimeUnit.SECONDS); } + /** Configure the peer to decrease the latch on receipt of mailbox message (direct or via mailbox). */ + private void addMailboxListeners(P2PService peer, CountDownLatch receivedMailboxLatch) { + class MailboxMessageListener implements DecryptedDirectMessageListener, DecryptedMailboxListener { + private void handle(DecryptedMsgWithPubKey decryptedMsgWithPubKey) { + if (!(decryptedMsgWithPubKey.message instanceof StressTestMailboxMessage)) + return; + StressTestMailboxMessage msg = (StressTestMailboxMessage) (decryptedMsgWithPubKey.message); + if ((msg.getData().equals("test/" + peer.getAddress()))) + receivedMailboxLatch.countDown(); + } + + @Override + public void onDirectMessage( + DecryptedMsgWithPubKey decryptedMsgWithPubKey, NodeAddress srcNodeAddress) { + handle(decryptedMsgWithPubKey); + } + + @Override + public void onMailboxMessageAdded( + DecryptedMsgWithPubKey decryptedMsgWithPubKey, NodeAddress srcNodeAddress) { + handle(decryptedMsgWithPubKey); + } + } + + final MailboxMessageListener listener = new MailboxMessageListener(); + peer.addDecryptedDirectMessageListener(listener); + peer.addDecryptedMailboxListener(listener); + } + + private void print(String message, Object... args) { System.out.println(this.getClass().getSimpleName() + ": " + String.format(message, args)); @@ -639,3 +704,36 @@ final class StressTestDirectMessage implements DirectMessage { return data; } } + +final class StressTestMailboxMessage implements MailboxMessage { + private static final long serialVersionUID = Version.P2P_NETWORK_VERSION; + private final int messageVersion = Version.getP2PMessageVersion(); + + private final String uid = UUID.randomUUID().toString(); + private NodeAddress senderNodeAddress; + private String data; + + StressTestMailboxMessage(NodeAddress sender, String data) { + this.senderNodeAddress = sender; + this.data = data; + } + + @Override + public int getMessageVersion() { + return messageVersion; + } + + @Override + public NodeAddress getSenderNodeAddress() { + return senderNodeAddress; + } + + @Override + public String getUID() { + return uid; + } + + String getData() { + return data; + } +}