Implement sending and receiving mailbox messages in network stress test

Not working thoug, lots of "Message not broadcasted because we have stopped the handler already.".
This commit is contained in:
Ivan Vilata-i-Balaguer 2016-05-17 14:06:07 +02:00
parent 14374fed6d
commit 976d9cb333

View File

@ -7,13 +7,13 @@ import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.crypto.KeyStorage;
import io.bitsquare.common.crypto.PubKeyRing;
import io.bitsquare.common.util.Tuple3;
import io.bitsquare.crypto.DecryptedMsgWithPubKey;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.P2PServiceListener;
import io.bitsquare.p2p.Utils;
import io.bitsquare.p2p.messaging.DirectMessage;
import io.bitsquare.p2p.messaging.SendDirectMessageListener;
import io.bitsquare.p2p.messaging.*;
import io.bitsquare.p2p.seed.SeedNode;
import io.bitsquare.p2p.seed.SeedNodesRepository;
import javafx.beans.property.BooleanProperty;
@ -374,10 +374,18 @@ public class NetworkStressTest {
// and the new first online node sends messages.
// This is repeated until all nodes have been online and offline.
// Put the second half of peers offline.
final int nPeers = peerNodes.size();
final CountDownLatch halfShutDown = new CountDownLatch(nPeers / 2);
// No sent latch here since the order of events is different
// depending on whether the message goes direct or via mailbox.
final CountDownLatch receivedMailboxLatch = new CountDownLatch(mailboxCount * nPeers);
// Configure the first half of peers to receive messages...
int firstPeerDown = (int)Math.ceil(nPeers / 2.0);
for (P2PService peer : peerNodes.subList(0, firstPeerDown)) {
addMailboxListeners(peer, receivedMailboxLatch);
}
// ...and put the second half offline.
final CountDownLatch halfShutDown = new CountDownLatch(nPeers / 2);
for (P2PService peer : peerNodes.subList(firstPeerDown, nPeers)) {
peer.shutDown(halfShutDown::countDown);
}
@ -387,26 +395,53 @@ public class NetworkStressTest {
// Cycle through peers sending to others, stopping the peer
// and starting one of the stopped peers.
//
// No sent latch here since the order of events is different
// depending on whether the message goes direct or via mailbox.
final CountDownLatch receivedMailboxLatch = new CountDownLatch(mailboxCount * nPeers);
BooleanProperty sentMailboxFailed = new SimpleBooleanProperty(false);
for (int firstOnline = 0, firstOffline = firstPeerDown;
firstOnline < nPeers;
firstOnline++, firstOffline = ++firstOffline % nPeers) {
// TODO: Make first online node send messages to other nodes.
// The first online peer sends messages to random other peers.
final P2PService onlinePeer = peerNodes.get(firstOnline);
final NodeAddress onlinePeerAddress = onlinePeer.getAddress();
for (int i = 0; i < mailboxCount; i++) {
// Select a random peer (different than source one)...
int peerIdx;
NodeAddress peerAddr;
do {
peerIdx = (int) (Math.random() * nPeers);
peerAddr = peerNodes.get(peerIdx).getAddress();
} while (onlinePeerAddress.equals(peerAddr));
final int dstPeerIdx = peerIdx;
final NodeAddress dstPeerAddress = peerAddr;
// ...and send a message to it.
onlinePeer.sendEncryptedMailboxMessage(dstPeerAddress, peerPKRings.get(dstPeerIdx),
new StressTestMailboxMessage(onlinePeerAddress, "test/" + dstPeerAddress),
new SendMailboxMessageListener() { // checked in receiver
@Override
public void onArrived() {
}
@Override
public void onStoredInMailbox() {
}
@Override
public void onFault(String errorMessage) {
sentMailboxFailed.set(true);
}
});
}
// When done, put first online peer offline.
final CountDownLatch stopLatch = new CountDownLatch(1);
peerNodes.get(firstOnline).shutDown(stopLatch::countDown);
onlinePeer.shutDown(stopLatch::countDown);
assertLatch("timed out while stopping peer " + firstOnline,
stopLatch, 10, TimeUnit.SECONDS);
print("put peer %d offline", firstOnline);
// When done, put first offline peer online.
// When done, put first offline peer online and setup message listeners.
final CountDownLatch startLatch = new CountDownLatch(1);
final P2PService startedPeer = createPeerNode(firstOffline, peerPorts.get(firstOffline));
// TODO: Setup message listeners.
addMailboxListeners(startedPeer, receivedMailboxLatch);
peerNodes.set(firstOffline, startedPeer);
startedPeer.start(new MailboxStartListener(startLatch));
assertLatch("timed out while starting peer " + firstOffline,
@ -418,6 +453,36 @@ public class NetworkStressTest {
receivedMailboxLatch, 120, TimeUnit.SECONDS);
}
/** Configure the peer to decrease the latch on receipt of mailbox message (direct or via mailbox). */
private void addMailboxListeners(P2PService peer, CountDownLatch receivedMailboxLatch) {
class MailboxMessageListener implements DecryptedDirectMessageListener, DecryptedMailboxListener {
private void handle(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
if (!(decryptedMsgWithPubKey.message instanceof StressTestMailboxMessage))
return;
StressTestMailboxMessage msg = (StressTestMailboxMessage) (decryptedMsgWithPubKey.message);
if ((msg.getData().equals("test/" + peer.getAddress())))
receivedMailboxLatch.countDown();
}
@Override
public void onDirectMessage(
DecryptedMsgWithPubKey decryptedMsgWithPubKey, NodeAddress srcNodeAddress) {
handle(decryptedMsgWithPubKey);
}
@Override
public void onMailboxMessageAdded(
DecryptedMsgWithPubKey decryptedMsgWithPubKey, NodeAddress srcNodeAddress) {
handle(decryptedMsgWithPubKey);
}
}
final MailboxMessageListener listener = new MailboxMessageListener();
peer.addDecryptedDirectMessageListener(listener);
peer.addDecryptedMailboxListener(listener);
}
private void print(String message, Object... args) {
System.out.println(this.getClass().getSimpleName() + ": "
+ String.format(message, args));
@ -639,3 +704,36 @@ final class StressTestDirectMessage implements DirectMessage {
return data;
}
}
final class StressTestMailboxMessage implements MailboxMessage {
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private final int messageVersion = Version.getP2PMessageVersion();
private final String uid = UUID.randomUUID().toString();
private NodeAddress senderNodeAddress;
private String data;
StressTestMailboxMessage(NodeAddress sender, String data) {
this.senderNodeAddress = sender;
this.data = data;
}
@Override
public int getMessageVersion() {
return messageVersion;
}
@Override
public NodeAddress getSenderNodeAddress() {
return senderNodeAddress;
}
@Override
public String getUID() {
return uid;
}
String getData() {
return data;
}
}