From 79f3ac99cf03206cca1d005fe8e1c9771d9cd6b8 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Fri, 6 Nov 2015 22:34:32 +0100 Subject: [PATCH] Add peerrequest messages, clean up logging --- .../java/io/bitsquare/storage/Storage.java | 6 +- gui/src/main/resources/logback.xml | 10 +- .../java/io/bitsquare/p2p/P2PService.java | 28 +- .../io/bitsquare/p2p/network/Connection.java | 19 +- .../p2p/network/ConnectionListener.java | 1 - .../p2p/network/MessageListener.java | 1 - .../java/io/bitsquare/p2p/network/Server.java | 14 +- .../bitsquare/p2p/network/ServerListener.java | 5 - .../bitsquare/p2p/network/SetupListener.java | 2 - .../p2p/network/messages/SelfTestMessage.java | 15 - .../AuthenticationHandshake.java | 174 +++---- .../AuthenticationListener.java | 2 +- .../bitsquare/p2p/{peer => peers}/Peer.java | 2 +- .../p2p/{peer => peers}/PeerGroup.java | 491 +++++++++--------- .../p2p/{peer => peers}/PeerListener.java | 2 +- .../messages/auth}/AuthenticationMessage.java | 2 +- .../messages/auth/AuthenticationRequest.java} | 6 +- .../auth/AuthenticationResponse.java} | 6 +- .../messages/auth/GetPeersAuthRequest.java} | 6 +- .../messages/auth/GetPeersAuthResponse.java} | 6 +- .../messages/maintenance/GetPeersRequest.java | 27 + .../maintenance/GetPeersResponse.java | 27 + .../maintenance}/MaintenanceMessage.java | 2 +- .../messages/maintenance}/PingMessage.java | 2 +- .../messages/maintenance}/PongMessage.java | 2 +- .../java/io/bitsquare/p2p/seed/SeedNode.java | 6 +- .../ProtectedExpirableDataStorage.java | 15 +- ...llDataMessage.java => GetDataRequest.java} | 4 +- ...lDataMessage.java => GetDataResponse.java} | 8 +- network/src/main/resources/logback.xml | 3 + .../java/io/bitsquare/p2p/P2PServiceTest.java | 2 +- .../p2p/network/LocalhostNetworkNodeTest.java | 6 +- .../bitsquare/p2p/routing/PeerGroupTest.java | 8 +- .../p2p/storage/ProtectedDataStorageTest.java | 2 +- seednode/src/main/resources/logback.xml | 9 +- 35 files changed, 489 insertions(+), 432 deletions(-) delete mode 100644 network/src/main/java/io/bitsquare/p2p/network/ServerListener.java delete mode 100644 network/src/main/java/io/bitsquare/p2p/network/messages/SelfTestMessage.java rename network/src/main/java/io/bitsquare/p2p/{peer => peers}/AuthenticationHandshake.java (52%) rename network/src/main/java/io/bitsquare/p2p/{peer => peers}/AuthenticationListener.java (92%) rename network/src/main/java/io/bitsquare/p2p/{peer => peers}/Peer.java (97%) rename network/src/main/java/io/bitsquare/p2p/{peer => peers}/PeerGroup.java (70%) rename network/src/main/java/io/bitsquare/p2p/{peer => peers}/PeerListener.java (90%) rename network/src/main/java/io/bitsquare/p2p/{peer/messages => peers/messages/auth}/AuthenticationMessage.java (67%) rename network/src/main/java/io/bitsquare/p2p/{peer/messages/RequestAuthenticationMessage.java => peers/messages/auth/AuthenticationRequest.java} (74%) rename network/src/main/java/io/bitsquare/p2p/{peer/messages/ChallengeMessage.java => peers/messages/auth/AuthenticationResponse.java} (77%) rename network/src/main/java/io/bitsquare/p2p/{peer/messages/GetPeersMessage.java => peers/messages/auth/GetPeersAuthRequest.java} (77%) rename network/src/main/java/io/bitsquare/p2p/{peer/messages/PeersMessage.java => peers/messages/auth/GetPeersAuthResponse.java} (73%) create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/GetPeersRequest.java create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/GetPeersResponse.java rename network/src/main/java/io/bitsquare/p2p/{peer/messages => peers/messages/maintenance}/MaintenanceMessage.java (63%) rename network/src/main/java/io/bitsquare/p2p/{peer/messages => peers/messages/maintenance}/PingMessage.java (90%) rename network/src/main/java/io/bitsquare/p2p/{peer/messages => peers/messages/maintenance}/PongMessage.java (90%) rename network/src/main/java/io/bitsquare/p2p/storage/messages/{GetAllDataMessage.java => GetDataRequest.java} (76%) rename network/src/main/java/io/bitsquare/p2p/storage/messages/{AllDataMessage.java => GetDataResponse.java} (79%) diff --git a/common/src/main/java/io/bitsquare/storage/Storage.java b/common/src/main/java/io/bitsquare/storage/Storage.java index 11a7a52c2f..044149f0a5 100644 --- a/common/src/main/java/io/bitsquare/storage/Storage.java +++ b/common/src/main/java/io/bitsquare/storage/Storage.java @@ -95,7 +95,7 @@ public class Storage { // Save delayed and on a background thread public void queueUpForSave() { - log.debug("save " + fileName); + log.trace("save " + fileName); checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write."); fileManager.saveLater(serializable); @@ -118,12 +118,12 @@ public class Storage { long now = System.currentTimeMillis(); try { T persistedObject = fileManager.read(storageFile); - log.info("Read {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now); + log.trace("Read {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now); // If we did not get any exception we can be sure the data are consistent so we make a backup now = System.currentTimeMillis(); fileManager.backupFile(fileName); - log.info("Backup {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now); + log.trace("Backup {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now); return persistedObject; } catch (ClassCastException | IOException e) { diff --git a/gui/src/main/resources/logback.xml b/gui/src/main/resources/logback.xml index f7253aa592..f876642588 100644 --- a/gui/src/main/resources/logback.xml +++ b/gui/src/main/resources/logback.xml @@ -6,14 +6,18 @@ - + - + - diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index f22e6a43b0..c6ce8483ad 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -15,9 +15,9 @@ import io.bitsquare.crypto.EncryptionService; import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.messaging.*; import io.bitsquare.p2p.network.*; -import io.bitsquare.p2p.peer.Peer; -import io.bitsquare.p2p.peer.PeerGroup; -import io.bitsquare.p2p.peer.PeerListener; +import io.bitsquare.p2p.peers.Peer; +import io.bitsquare.p2p.peers.PeerGroup; +import io.bitsquare.p2p.peers.PeerListener; import io.bitsquare.p2p.seed.SeedNodesRepository; import io.bitsquare.p2p.storage.HashMapChangedListener; import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage; @@ -25,8 +25,8 @@ import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload; import io.bitsquare.p2p.storage.data.ExpirablePayload; import io.bitsquare.p2p.storage.data.ProtectedData; import io.bitsquare.p2p.storage.data.ProtectedMailboxData; -import io.bitsquare.p2p.storage.messages.AllDataMessage; -import io.bitsquare.p2p.storage.messages.GetAllDataMessage; +import io.bitsquare.p2p.storage.messages.GetDataRequest; +import io.bitsquare.p2p.storage.messages.GetDataResponse; import javafx.beans.property.BooleanProperty; import javafx.beans.property.SimpleBooleanProperty; import org.fxmisc.easybind.EasyBind; @@ -153,17 +153,13 @@ public class P2PService implements SetupListener { }); networkNode.addMessageListener((message, connection) -> { - if (message instanceof GetAllDataMessage) { + if (message instanceof GetDataRequest) { log.trace("Received GetDataSetMessage: " + message); - networkNode.sendMessage(connection, new AllDataMessage(getDataSet())); - } else if (message instanceof AllDataMessage) { - AllDataMessage allDataMessage = (AllDataMessage) message; - HashSet set = allDataMessage.set; + networkNode.sendMessage(connection, new GetDataResponse(getDataSet())); + } else if (message instanceof GetDataResponse) { + GetDataResponse getDataResponse = (GetDataResponse) message; + HashSet set = getDataResponse.set; if (!set.isEmpty()) { - StringBuilder sb = new StringBuilder("Received DataSetMessage:\n\n"); - set.stream().forEach(e -> sb.append(e.toString() + "\n")); - sb.append("\n"); - log.trace(sb.toString()); // we keep that connection open as the bootstrapping peer will use that for the authentication // as we are not authenticated yet the data adding will not be broadcasted set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); @@ -272,7 +268,7 @@ public class P2PService implements SetupListener { Address candidate = remainingSeedNodeAddresses.remove(0); log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate); - SettableFuture future = networkNode.sendMessage(candidate, new GetAllDataMessage()); + SettableFuture future = networkNode.sendMessage(candidate, new GetDataRequest()); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { @@ -315,7 +311,7 @@ public class P2PService implements SetupListener { private void sendGetAllDataMessageAfterAuthentication(final Peer peer) { log.trace("sendGetDataSetMessageAfterAuthentication"); // After authentication we request again data as we might have missed pushed data in the meantime - SettableFuture future = networkNode.sendMessage(peer.connection, new GetAllDataMessage()); + SettableFuture future = networkNode.sendMessage(peer.connection, new GetDataRequest()); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 3837178c49..ec199f5be8 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -188,15 +188,18 @@ public class Connection { private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) { if (!stopped) { - log.info("\n\nShutDown connection:" + StringBuilder result = new StringBuilder("\n\n############################################################\n" + + "ShutDown connection:" + "\npeerAddress=" + peerAddress - + "\nobjectId=" + getObjectId() - + "\nuid=" + getUid() - + "\nisAuthenticated=" + isAuthenticated() - + "\nsocket.getPort()=" + sharedSpace.getSocket().getPort() - + "\n\n"); - log.debug("ShutDown " + this.getObjectId()); - log.debug("ShutDown connection requested. Connection=" + this.toString()); + + "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort() + + "/" + sharedSpace.getSocket().getPort() + + "\nobjectId=" + getObjectId() + " / uid=" + getUid() + + "\nisAuthenticated=" + isAuthenticated()); + result.append("\n############################################################\n"); + log.info(result.toString()); + + log.trace("ShutDown " + this.getObjectId()); + log.trace("ShutDown connection requested. Connection=" + this.toString()); stopped = true; sharedSpace.stop(); diff --git a/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java b/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java index 9a89e6bca3..aed7d2fd9f 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java +++ b/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java @@ -4,7 +4,6 @@ package io.bitsquare.p2p.network; import io.bitsquare.p2p.Address; public interface ConnectionListener { - enum Reason { SOCKET_CLOSED, RESET, diff --git a/network/src/main/java/io/bitsquare/p2p/network/MessageListener.java b/network/src/main/java/io/bitsquare/p2p/network/MessageListener.java index cb06759d49..29eb63eb32 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/MessageListener.java +++ b/network/src/main/java/io/bitsquare/p2p/network/MessageListener.java @@ -3,6 +3,5 @@ package io.bitsquare.p2p.network; import io.bitsquare.p2p.Message; public interface MessageListener { - void onMessage(Message message, Connection connection); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Server.java b/network/src/main/java/io/bitsquare/p2p/network/Server.java index 9f1e76eb5a..2db6dbce98 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Server.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Server.java @@ -38,13 +38,15 @@ public class Server implements Runnable { if (!stopped) { log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort()); Connection connection = new Connection(socket, messageListener, connectionListener); - log.info("\n\nServer created new inbound connection:" - + "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort() - + "\nsocket.getPort()=" + socket.getPort() - + "\nconnection.uid=" + connection.getUid() - + "\n\n"); - log.info("Server created new socket with port " + socket.getPort()); + StringBuilder result = new StringBuilder("\n\n############################################################\n" + + "Server created new inbound connection:" + + "\nlocalPort/port=" + serverSocket.getLocalPort() + + "/" + socket.getPort() + + "\nconnection.uid=" + connection.getUid()); + result.append("\n############################################################\n"); + log.info(result.toString()); + if (!stopped) connections.add(connection); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/ServerListener.java b/network/src/main/java/io/bitsquare/p2p/network/ServerListener.java deleted file mode 100644 index 92e35c701b..0000000000 --- a/network/src/main/java/io/bitsquare/p2p/network/ServerListener.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.bitsquare.p2p.network; - -public interface ServerListener { - void onSocketHandler(Connection connection); -} diff --git a/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java b/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java index 371b91bc41..d46b4a6c6b 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java +++ b/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java @@ -2,11 +2,9 @@ package io.bitsquare.p2p.network; public interface SetupListener { - void onTorNodeReady(); void onHiddenServicePublished(); void onSetupFailed(Throwable throwable); - } diff --git a/network/src/main/java/io/bitsquare/p2p/network/messages/SelfTestMessage.java b/network/src/main/java/io/bitsquare/p2p/network/messages/SelfTestMessage.java deleted file mode 100644 index 792af1f3c6..0000000000 --- a/network/src/main/java/io/bitsquare/p2p/network/messages/SelfTestMessage.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.bitsquare.p2p.network.messages; - -import io.bitsquare.app.Version; -import io.bitsquare.p2p.Message; - -public final class SelfTestMessage implements Message { - // That object is sent over the wire, so we need to take care of version compatibility. - private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - - public final long nonce; - - public SelfTestMessage(long nonce) { - this.nonce = nonce; - } -} diff --git a/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java similarity index 52% rename from network/src/main/java/io/bitsquare/p2p/peer/AuthenticationHandshake.java rename to network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java index af12b24614..92fc8b89ca 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationHandshake.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peer; +package io.bitsquare.p2p.peers; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; @@ -9,10 +9,7 @@ import io.bitsquare.common.util.Tuple2; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.peer.messages.ChallengeMessage; -import io.bitsquare.p2p.peer.messages.GetPeersMessage; -import io.bitsquare.p2p.peer.messages.PeersMessage; -import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage; +import io.bitsquare.p2p.peers.messages.auth.*; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -54,11 +51,11 @@ public class AuthenticationHandshake { // Requesting peer resultFuture = SettableFuture.create(); startAuthTs = System.currentTimeMillis(); - SettableFuture future = networkNode.sendMessage(peerAddress, new RequestAuthenticationMessage(myAddress, getAndSetNonce())); + SettableFuture future = networkNode.sendMessage(peerAddress, new AuthenticationRequest(myAddress, getAndSetNonce())); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { - log.info("send RequestAuthenticationMessage to " + peerAddress + " succeeded."); + log.trace("send RequestAuthenticationMessage to " + peerAddress + " succeeded."); } @Override @@ -73,18 +70,15 @@ public class AuthenticationHandshake { } public SettableFuture requestAuthentication(Set
remainingAddresses, Address peerAddress) { - log.info("requestAuthentication " + this); - log.info("remainingAddresses " + remainingAddresses); - log.info("peerAddress " + peerAddress); // Requesting peer resultFuture = SettableFuture.create(); startAuthTs = System.currentTimeMillis(); remainingAddresses.remove(peerAddress); - SettableFuture future = networkNode.sendMessage(peerAddress, new RequestAuthenticationMessage(myAddress, getAndSetNonce())); + SettableFuture future = networkNode.sendMessage(peerAddress, new AuthenticationRequest(myAddress, getAndSetNonce())); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { - log.info("send RequestAuthenticationMessage to " + peerAddress + " succeeded."); + log.trace("send RequestAuthenticationMessage to " + peerAddress + " succeeded."); } @Override @@ -101,23 +95,25 @@ public class AuthenticationHandshake { } - public SettableFuture processAuthenticationRequest(RequestAuthenticationMessage requestAuthenticationMessage, Connection connection) { + public SettableFuture processAuthenticationRequest(AuthenticationRequest authenticationRequest, Connection connection) { // Responding peer resultFuture = SettableFuture.create(); startAuthTs = System.currentTimeMillis(); - Address peerAddress = requestAuthenticationMessage.address; + Address peerAddress = authenticationRequest.address; log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + myAddress); + log.info("We shut down inbound connection from peer {} to establish a new " + + "connection with his reported address.", peerAddress); connection.shutDown(() -> UserThread.runAfter(() -> { // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress); - SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(myAddress, requestAuthenticationMessage.nonce, getAndSetNonce())); + SettableFuture future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce())); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.debug("onSuccess sending ChallengeMessage"); + log.trace("onSuccess sending ChallengeMessage"); } @Override @@ -135,88 +131,90 @@ public class AuthenticationHandshake { private void setupMessageListener() { networkNode.addMessageListener((message, connection) -> { - if (message instanceof ChallengeMessage) { - // Requesting peer - ChallengeMessage challengeMessage = (ChallengeMessage) message; - Address peerAddress = challengeMessage.address; - log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress); - log.trace("challengeMessage" + challengeMessage); - // HashMap tempNonceMap = new HashMap<>(nonceMap); - boolean verified = nonce != 0 && nonce == challengeMessage.requesterNonce; - if (verified) { - connection.setPeerAddress(peerAddress); - SettableFuture future = networkNode.sendMessage(peerAddress, - new GetPeersMessage(myAddress, challengeMessage.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses()))); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("GetPeersMessage sent successfully from " + myAddress + " to " + peerAddress); - } + if (message instanceof AuthenticationMessage) { + if (message instanceof AuthenticationResponse) { + // Requesting peer + AuthenticationResponse authenticationResponse = (AuthenticationResponse) message; + Address peerAddress = authenticationResponse.address; + log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress); + log.trace("challengeMessage" + authenticationResponse); + // HashMap tempNonceMap = new HashMap<>(nonceMap); + boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce; + if (verified) { + connection.setPeerAddress(peerAddress); + SettableFuture future = networkNode.sendMessage(peerAddress, + new GetPeersAuthRequest(myAddress, authenticationResponse.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses()))); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("GetPeersMessage sent successfully from " + myAddress + " to " + peerAddress); + } - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("GetPeersMessage sending failed " + throwable.getMessage()); - UserThread.execute(() -> resultFuture.setException(throwable)); - } - }); - } else { - log.warn("verify nonce failed. challengeMessage=" + challengeMessage + " / nonce=" + nonce); - UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. challengeMessage=" + challengeMessage + " / nonceMap=" + nonce))); - } - } else if (message instanceof GetPeersMessage) { - // Responding peer - GetPeersMessage getPeersMessage = (GetPeersMessage) message; - Address peerAddress = getPeersMessage.address; - log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress); - boolean verified = nonce != 0 && nonce == getPeersMessage.challengerNonce; - if (verified) { - // we add the reported peers to our own set - HashSet
peerAddresses = ((GetPeersMessage) message).peerAddresses; + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("GetPeersMessage sending failed " + throwable.getMessage()); + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }); + } else { + log.warn("verify nonce failed. challengeMessage=" + authenticationResponse + " / nonce=" + nonce); + UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce))); + } + } else if (message instanceof GetPeersAuthRequest) { + // Responding peer + GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message; + Address peerAddress = getPeersAuthRequest.address; + log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress); + boolean verified = nonce != 0 && nonce == getPeersAuthRequest.challengerNonce; + if (verified) { + // we add the reported peers to our own set + HashSet
peerAddresses = getPeersAuthRequest.peerAddresses; + log.trace("Received peers: " + peerAddresses); + peerGroup.addToReportedPeers(peerAddresses, connection); + + SettableFuture future = networkNode.sendMessage(peerAddress, + new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses()))); + log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress + + " with allPeers=" + peerGroup.getAllPeerAddresses()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("PeersMessage sent successfully from " + myAddress + " to " + peerAddress); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("PeersMessage sending failed " + throwable.getMessage()); + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }); + + log.info("\n\nAuthenticationComplete: Peer with address " + peerAddress + + " authenticated (" + connection.getObjectId() + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); + + UserThread.execute(() -> resultFuture.set(connection)); + } else { + log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonceMap=" + nonce); + UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonceMap=" + nonce))); + } + } else if (message instanceof GetPeersAuthResponse) { + // Requesting peer + GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message; + Address peerAddress = getPeersAuthResponse.address; + log.trace("PeersMessage from " + peerAddress + " at " + myAddress); + HashSet
peerAddresses = getPeersAuthResponse.peerAddresses; log.trace("Received peers: " + peerAddresses); peerGroup.addToReportedPeers(peerAddresses, connection); - SettableFuture future = networkNode.sendMessage(peerAddress, - new PeersMessage(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses()))); - log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress - + " with allPeers=" + peerGroup.getAllPeerAddresses()); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("PeersMessage sent successfully from " + myAddress + " to " + peerAddress); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("PeersMessage sending failed " + throwable.getMessage()); - UserThread.execute(() -> resultFuture.setException(throwable)); - } - }); - + // we wait until the handshake is completed before setting the authenticate flag + // authentication at both sides of the connection log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress + " authenticated (" + connection.getObjectId() + "). Took " + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); UserThread.execute(() -> resultFuture.set(connection)); - } else { - log.warn("verify nonce failed. getPeersMessage=" + getPeersMessage + " / nonceMap=" + nonce); - UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. getPeersMessage=" + getPeersMessage + " / nonceMap=" + nonce))); } - } else if (message instanceof PeersMessage) { - // Requesting peer - PeersMessage peersMessage = (PeersMessage) message; - Address peerAddress = peersMessage.address; - log.trace("PeersMessage from " + peerAddress + " at " + myAddress); - HashSet
peerAddresses = peersMessage.peerAddresses; - log.trace("Received peers: " + peerAddresses); - peerGroup.addToReportedPeers(peerAddresses, connection); - - // we wait until the handshake is completed before setting the authenticate flag - // authentication at both sides of the connection - log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress - + " authenticated (" + connection.getObjectId() + "). Took " - + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); - - UserThread.execute(() -> resultFuture.set(connection)); } }); } diff --git a/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java similarity index 92% rename from network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java rename to network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java index 18b68f98cb..6894208476 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peer; +package io.bitsquare.p2p.peers; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/Peer.java b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java similarity index 97% rename from network/src/main/java/io/bitsquare/p2p/peer/Peer.java rename to network/src/main/java/io/bitsquare/p2p/peers/Peer.java index 8e63bda9b1..e2655ecf64 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/Peer.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peer; +package io.bitsquare.p2p.peers; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java similarity index 70% rename from network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java rename to network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java index cd53a29b79..972f9498a3 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java @@ -1,18 +1,17 @@ -package io.bitsquare.p2p.peer; +package io.bitsquare.p2p.peers; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Tuple2; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.ConnectionListener; import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.peer.messages.MaintenanceMessage; -import io.bitsquare.p2p.peer.messages.PingMessage; -import io.bitsquare.p2p.peer.messages.PongMessage; -import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage; +import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest; +import io.bitsquare.p2p.peers.messages.maintenance.*; import io.bitsquare.p2p.storage.messages.BroadcastMessage; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -39,6 +38,7 @@ public class PeerGroup { private static int MAX_CONNECTIONS = 8; private static int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min. + private static int GET_PEERS_INTERVAL = 30000;//new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min. private static int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000; public static void setMaxConnections(int maxConnections) { @@ -46,16 +46,16 @@ public class PeerGroup { } private final NetworkNode networkNode; - - private final Set
seedNodeAddresses; + private final CopyOnWriteArraySet peerListeners = new CopyOnWriteArraySet<>(); private final ConcurrentHashMap authenticatedPeers = new ConcurrentHashMap<>(); private final CopyOnWriteArraySet
reportedPeerAddresses = new CopyOnWriteArraySet<>(); - ; - private final Timer maintenanceTimer = new Timer(); + private final Timer getPeersTimer = new Timer(); + private volatile boolean shutDownInProgress; + private boolean firstPeerAdded = false; /////////////////////////////////////////////////////////////////////////////////////////// @@ -73,31 +73,11 @@ public class PeerGroup { networkNode.addMessageListener((message, connection) -> { if (message instanceof MaintenanceMessage) processMaintenanceMessage((MaintenanceMessage) message, connection); - else if (message instanceof RequestAuthenticationMessage) { - AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress()); - SettableFuture future = authenticationHandshake.processAuthenticationRequest((RequestAuthenticationMessage) message, connection); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - if (connection != null) { - UserThread.execute(() -> { - setAuthenticated(connection, connection.getPeerAddress()); - purgeReportedPeers(); - }); - } - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - throwable.printStackTrace(); - log.error("AuthenticationHandshake failed. " + throwable.getMessage()); - UserThread.execute(() -> removePeer(connection.getPeerAddress())); - } - }); + else if (message instanceof AuthenticationRequest) { + processAuthenticationRequest(networkNode, (AuthenticationRequest) message, connection); } }); - networkNode.addConnectionListener(new ConnectionListener() { @Override public void onConnection(Connection connection) { @@ -121,56 +101,7 @@ public class PeerGroup { } }); - maintenanceTimer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000)); - try { - UserThread.execute(() -> { - disconnectOldConnections(); - pingPeers(); - }); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } - } - }, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL); - } - - private void disconnectOldConnections() { - List authenticatedConnections = networkNode.getAllConnections().stream() - .filter(e -> e.isAuthenticated()) - .collect(Collectors.toList()); - if (authenticatedConnections.size() > MAX_CONNECTIONS) { - authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate())); - log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size()); - Connection connection = authenticatedConnections.remove(0); - log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); - connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> disconnectOldConnections(), 100, 500, TimeUnit.MILLISECONDS)); - } - } - - private void pingPeers() { - log.trace("pingPeers"); - Set connectedPeersList = new HashSet<>(authenticatedPeers.values()); - connectedPeersList.stream() - .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) - .forEach(e -> UserThread.runAfterRandomDelay(() -> { - SettableFuture future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce())); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("PingMessage sent successfully"); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("PingMessage sending failed " + throwable.getMessage()); - removePeer(e.address); - } - }); - }, 5, 10)); + setupMaintenanceTimer(); } @@ -193,7 +124,7 @@ public class PeerGroup { public void broadcast(BroadcastMessage message, @Nullable Address sender) { log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers."); log.trace("message = " + message); - printConnectedPeersMap(); + printAuthenticatedPeers(); // TODO add randomized timing? authenticatedPeers.values().stream() @@ -221,10 +152,36 @@ public class PeerGroup { // Authentication to seed node /////////////////////////////////////////////////////////////////////////////////////////// + private void processAuthenticationRequest(NetworkNode networkNode, AuthenticationRequest message, final Connection connection) { + AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress()); + SettableFuture future = authenticationHandshake.processAuthenticationRequest(message, connection); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + if (connection != null) { + UserThread.execute(() -> { + setAuthenticated(connection, connection.getPeerAddress()); + purgeReportedPeers(); + }); + } + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + throwable.printStackTrace(); + log.error("AuthenticationHandshake failed. " + throwable.getMessage()); + UserThread.execute(() -> removePeer(connection.getPeerAddress())); + } + }); + } + public void authenticateSeedNode(Address peerAddress) { authenticateToSeedNode(new HashSet<>(seedNodeAddresses), peerAddress, true); } + // First we try to connect to 1 seed node. If we fail we try to connect to any reported peer. + // After connection is authenticated, we try to connect to any reported peer as long we have not + // reached our max connection size. public void authenticateToSeedNode(Set
remainingAddresses, Address peerAddress, boolean continueOnSuccess) { checkArgument(!authenticatedPeers.containsKey(peerAddress), "We have that peer already authenticated. That must never happen."); @@ -239,14 +196,14 @@ public class PeerGroup { if (continueOnSuccess) { if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { log.info("We still don't have enough connections. Lets try the reported peers."); - authenticateToAnyReportedPeer(); + authenticateToRemainingReportedPeers(); } else { log.info("We have already enough connections."); } } else { log.info("We have already tried all reported peers and seed nodes. " + "We stop bootstrapping now, but will repeat after an while."); - UserThread.runAfter(() -> authenticateToAnyReportedPeer(), 60); + UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), 60); } } } @@ -259,36 +216,33 @@ public class PeerGroup { // If we fail we try again with the remaining set remainingAddresses.remove(peerAddress); - List
list = new ArrayList<>(remainingAddresses); - removeAuthenticatedPeersFromList(list); - if (!list.isEmpty()) { - Address item = getAndRemoveRandomItem(list); - log.info("We try to build an authenticated connection to a seed node. " + item); - authenticateToSeedNode(remainingAddresses, item, true); + + Optional>> tupleOptional = getRandomItemAndRemainingSet(remainingAddresses); + if (tupleOptional.isPresent()) { + log.info("We try to authenticate to a seed node. " + tupleOptional.get().first); + authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true); } else { log.info("We don't have any more seed nodes for connecting. Lets try the reported peers."); - authenticateToAnyReportedPeer(); + authenticateToRemainingReportedPeers(); } } }); } - - private void authenticateToAnyReportedPeer() { - // after we have at least one seed node we try to get reported peers connected - List
list = new ArrayList<>(reportedPeerAddresses); - removeAuthenticatedPeersFromList(list); - if (!list.isEmpty()) { - Address item = getAndRemoveRandomItem(list); - log.info("We try to build an authenticated connection to a random peer. " + item + " / list=" + list); - authenticateToReportedPeer(new HashSet<>(list), item); + private void authenticateToRemainingReportedPeers() { + Optional>> tupleOptional = getRandomItemAndRemainingSet(reportedPeerAddresses); + if (tupleOptional.isPresent()) { + log.info("We try to authenticate to a random peer. " + tupleOptional.get().first); + authenticateToReportedPeer(tupleOptional.get().second, tupleOptional.get().first); } else { log.info("We don't have any reported peers for connecting. Lets try the remaining seed nodes."); authenticateToRemainingSeedNodes(); } } - public void authenticateToReportedPeer(Set
remainingAddresses, Address peerAddress) { + // We try to connect to a reported peer. If we fail we repeat after the failed peer has been removed. + // If we succeed we repeat until we are ut of addresses. + private void authenticateToReportedPeer(Set
remainingAddresses, Address peerAddress) { checkArgument(!authenticatedPeers.containsKey(peerAddress), "We have that peer already authenticated. That must never happen."); @@ -300,8 +254,14 @@ public class PeerGroup { if (connection != null) { setAuthenticated(connection, peerAddress); if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { - log.info("We still don't have enough connections. Lets try the remaining seed nodes."); - authenticateToRemainingSeedNodes(); + if (reportedPeerAddresses.size() > 0) { + log.info("We still don't have enough connections. " + + "Lets try the remaining reported peer addresses."); + authenticateToRemainingReportedPeers(); + } else { + log.info("We still don't have enough connections. Lets try the remaining seed nodes."); + authenticateToRemainingSeedNodes(); + } } else { log.info("We have already enough connections."); } @@ -313,101 +273,22 @@ public class PeerGroup { throwable.printStackTrace(); log.error("AuthenticationHandshake failed. " + throwable.getMessage()); removePeer(peerAddress); + + log.info("Authentication failed. Lets try again with the remaining reported peer addresses."); + authenticateToRemainingReportedPeers(); } }); } private void authenticateToRemainingSeedNodes() { - // after we have at least one seed node we try to get reported peers connected - List
list = new ArrayList<>(seedNodeAddresses); - removeAuthenticatedPeersFromList(list); - if (!list.isEmpty()) { - Address item = getAndRemoveRandomItem(list); - log.info("We try to build an authenticated connection to a random seed node. " + item + " / list=" + list); - authenticateToSeedNode(new HashSet<>(list), item, false); + Optional>> tupleOptional = getRandomItemAndRemainingSet(seedNodeAddresses); + if (tupleOptional.isPresent()) { + log.info("We try to authenticate to a random seed node. " + tupleOptional.get().first); + authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, false); } else { log.info("We don't have any more seed nodes for connecting. " + "We stop bootstrapping now, but will repeat after an while."); - UserThread.runAfter(() -> authenticateToAnyReportedPeer(), 60); - } - } - - - /*private void authenticateToAnyNode1(Set
addresses, Address peerAddress, boolean prioritizeSeedNodes) { - checkArgument(!authenticatedPeers.containsKey(peerAddress), - "We have that peer already authenticated. That must never happen."); - - AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress()); - SettableFuture future = authenticationHandshake.requestAuthentication(addresses, peerAddress); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - setAuthenticated(connection, peerAddress); - authenticateToNextRandomPeer(); - } - - @Override - public void onFailure(Throwable throwable) { - throwable.printStackTrace(); - log.error("AuthenticationHandshake failed. " + throwable.getMessage()); - removePeer(peerAddress); - authenticateToNextRandomPeer(); - } - }); - } - - private void authenticateToNextRandomPeer() { - UserThread.runAfterRandomDelay(() -> { - log.info("authenticateToNextRandomPeer"); - if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { - Optional
candidate = getRandomReportedPeerAddress(); - if (candidate.isPresent()) { - log.info("We try to build an authenticated connection to a random peer. " + candidate.get()); - authenticateToReportedPeer(candidate.get(), ); - } else { - log.info("No more reportedPeerAddresses available for connecting. We try the remaining seed nodes"); - candidate = getRandomSeedNodeAddress(); - if (candidate.isPresent()) { - log.info("We try to build an authenticated connection to a random seed node. " + candidate.get()); - authenticateToReportedPeer(candidate.get(), get); - } else { - log.info("No more seed nodes available for connecting."); - } - } - } else { - log.info("We have already enough connections."); - } - }, 200, 400, TimeUnit.MILLISECONDS); - }*/ - - private Optional
getRandomSeedNodeAddress() { - List
list = new ArrayList<>(seedNodeAddresses); - log.debug("### getRandomSeedNodeAddress list " + list); - removeAuthenticatedPeersFromList(list); - log.debug("### list post removeAuthenticatedPeersFromList " + list); - return getRandomEntry(list); - } - - private Optional
getRandomReportedPeerAddress() { - List
list = new ArrayList<>(reportedPeerAddresses); - log.debug("### list reportedPeerAddresses " + reportedPeerAddresses); - log.debug("### list authenticatedPeers " + authenticatedPeers); - log.debug("### list pre " + list); - removeAuthenticatedPeersFromList(list); - log.debug("### list post " + list); - return getRandomEntry(list); - } - - private void removeAuthenticatedPeersFromList(List
list) { - authenticatedPeers.values().stream().forEach(e -> list.remove(e.address)); - } - - private Optional
getRandomEntry(List
list) { - if (list.size() > 0) { - Collections.shuffle(list); - return Optional.of(list.get(0)); - } else { - return Optional.empty(); + UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), 60); } } @@ -453,16 +334,15 @@ public class PeerGroup { connection.setAuthenticated(peerAddress, connection); - Peer peer = new Peer(connection); - addAuthenticatedPeer(peerAddress, peer); + addAuthenticatedPeer(new Peer(connection)); peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection)); } - private void addAuthenticatedPeer(Address address, Peer peer) { - boolean firstPeerAdded; - authenticatedPeers.put(address, peer); - firstPeerAdded = authenticatedPeers.size() == 1; + private void addAuthenticatedPeer(Peer peer) { + authenticatedPeers.put(peer.address, peer); + reportedPeerAddresses.remove(peer.address); + firstPeerAdded = !firstPeerAdded && authenticatedPeers.size() == 1; UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer))); @@ -472,7 +352,154 @@ public class PeerGroup { if (authenticatedPeers.size() > MAX_CONNECTIONS) disconnectOldConnections(); - printConnectedPeersMap(); + printAuthenticatedPeers(); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Maintenance + /////////////////////////////////////////////////////////////////////////////////////////// + + private void setupMaintenanceTimer() { + maintenanceTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000)); + try { + UserThread.execute(() -> { + disconnectOldConnections(); + pingPeers(); + }); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + } + }, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL); + + getPeersTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + Thread.currentThread().setName("GetPeersTimer-" + new Random().nextInt(1000)); + try { + UserThread.execute(() -> sendAnnounceAndGetPeersMessage()); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + } + }, GET_PEERS_INTERVAL, GET_PEERS_INTERVAL); + } + + + private void disconnectOldConnections() { + List authenticatedConnections = networkNode.getAllConnections().stream() + .filter(e -> e.isAuthenticated()) + .collect(Collectors.toList()); + if (authenticatedConnections.size() > MAX_CONNECTIONS) { + authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate())); + log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size()); + Connection connection = authenticatedConnections.remove(0); + log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); + connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> disconnectOldConnections(), 100, 500, TimeUnit.MILLISECONDS)); + } + } + + private void pingPeers() { + log.trace("pingPeers"); + Set connectedPeersList = new HashSet<>(authenticatedPeers.values()); + connectedPeersList.stream() + .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) + .forEach(e -> UserThread.runAfterRandomDelay(() -> { + SettableFuture future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce())); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("PingMessage sent successfully"); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("PingMessage sending failed " + throwable.getMessage()); + removePeer(e.address); + } + }); + }, 5, 10)); + } + + private void sendAnnounceAndGetPeersMessage() { + log.trace("sendAnnounceAndGetPeersMessage"); + Set connectedPeersList = new HashSet<>(authenticatedPeers.values()); + connectedPeersList.stream() + .forEach(e -> UserThread.runAfterRandomDelay(() -> { + SettableFuture future = networkNode.sendMessage(e.connection, + new GetPeersRequest(getMyAddress(), new HashSet<>(getAllPeerAddresses()))); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("AnnounceAndGetPeersMessage sent successfully"); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("AnnounceAndGetPeersMessage sending failed " + throwable.getMessage()); + removePeer(e.address); + } + }); + }, 5, 10)); + } + + private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) { + log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress()); + if (message instanceof PingMessage) { + SettableFuture future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("PongMessage sent successfully"); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("PongMessage sending failed " + throwable.getMessage()); + removePeer(connection.getPeerAddress()); + } + }); + } else if (message instanceof PongMessage) { + if (connection.getPeerAddress() != null) { + Peer peer = authenticatedPeers.get(connection.getPeerAddress()); + if (peer != null) { + if (((PongMessage) message).nonce != peer.getPingNonce()) { + removePeer(peer.address); + log.warn("PongMessage invalid: self/peer " + getMyAddress() + "/" + connection.getPeerAddress()); + } + } + } + } else if (message instanceof GetPeersRequest) { + GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message; + HashSet
peerAddresses = getPeersRequestMessage.peerAddresses; + log.trace("Received peers: " + peerAddresses); + addToReportedPeers(peerAddresses, connection); + + SettableFuture future = networkNode.sendMessage(connection, + new GetPeersResponse(getMyAddress(), new HashSet<>(getAllPeerAddresses()))); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("GetPeersResponse sent successfully"); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("GetPeersResponse sending failed " + throwable.getMessage()); + removePeer(getPeersRequestMessage.address); + } + }); + } else if (message instanceof GetPeersResponse) { + GetPeersResponse getPeersResponse = (GetPeersResponse) message; + HashSet
peerAddresses = getPeersResponse.peerAddresses; + log.trace("Received peers: " + peerAddresses); + addToReportedPeers(peerAddresses, connection); + } } @@ -555,38 +582,6 @@ public class PeerGroup { } - /////////////////////////////////////////////////////////////////////////////////////////// - // Maintenance - /////////////////////////////////////////////////////////////////////////////////////////// - - private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) { - log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress()); - if (message instanceof PingMessage) { - SettableFuture future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("PongMessage sent successfully"); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("PongMessage sending failed " + throwable.getMessage()); - removePeer(connection.getPeerAddress()); - } - }); - } else if (message instanceof PongMessage) { - Peer peer = authenticatedPeers.get(connection.getPeerAddress()); - if (peer != null) { - if (((PongMessage) message).nonce != peer.getPingNonce()) { - removePeer(peer.address); - log.warn("PongMessage invalid: self/peer " + getMyAddress() + "/" + connection.getPeerAddress()); - } - } - } - } - - /////////////////////////////////////////////////////////////////////////////////////////// // Peers /////////////////////////////////////////////////////////////////////////////////////////// @@ -599,8 +594,8 @@ public class PeerGroup { if (disconnectedPeer != null) UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress))); - printConnectedPeersMap(); - printReportedPeersMap(); + printAuthenticatedPeers(); + printReportedPeers(); } private Address getMyAddress() { @@ -616,21 +611,39 @@ public class PeerGroup { return list.remove(new Random().nextInt(list.size())); } - public void printConnectedPeersMap() { - StringBuilder result = new StringBuilder("\nConnected peers for node " + getMyAddress() + ":"); + private Optional>> getRandomItemAndRemainingSet(Set
remainingAddresses) { + List
list = new ArrayList<>(remainingAddresses); + authenticatedPeers.values().stream().forEach(e -> list.remove(e.address)); + if (!list.isEmpty()) { + Address item = getAndRemoveRandomItem(list); + return Optional.of(new Tuple2<>(item, new HashSet<>(list))); + } else { + return Optional.empty(); + } + } + + public void printAllPeers() { + printAuthenticatedPeers(); + printReportedPeers(); + } + + public void printAuthenticatedPeers() { + StringBuilder result = new StringBuilder("\n\n############################################################\n" + + "Authenticated peers for node " + getMyAddress() + ":"); authenticatedPeers.values().stream().forEach(e -> { - result.append("\n\t" + e.address); + result.append("\n" + e.address); }); - result.append("\n"); + result.append("\n############################################################\n"); log.info(result.toString()); } - public void printReportedPeersMap() { - StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getMyAddress() + ":"); + public void printReportedPeers() { + StringBuilder result = new StringBuilder("\n\n############################################################\n" + + "Reported peers for node " + getMyAddress() + ":"); reportedPeerAddresses.stream().forEach(e -> { - result.append("\n\t" + e); + result.append("\n" + e); }); - result.append("\n"); + result.append("\n############################################################\n"); log.info(result.toString()); } } diff --git a/network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerListener.java similarity index 90% rename from network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java rename to network/src/main/java/io/bitsquare/p2p/peers/PeerListener.java index b09280412e..31366712f8 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerListener.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peer; +package io.bitsquare.p2p.peers; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/AuthenticationMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java similarity index 67% rename from network/src/main/java/io/bitsquare/p2p/peer/messages/AuthenticationMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java index 141a3ff722..8d230a4d2b 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/AuthenticationMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peer.messages; +package io.bitsquare.p2p.peers.messages.auth; import io.bitsquare.p2p.Message; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/RequestAuthenticationMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java similarity index 74% rename from network/src/main/java/io/bitsquare/p2p/peer/messages/RequestAuthenticationMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java index ca16aff52a..26e8e419c3 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/RequestAuthenticationMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java @@ -1,16 +1,16 @@ -package io.bitsquare.p2p.peer.messages; +package io.bitsquare.p2p.peers.messages.auth; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; -public final class RequestAuthenticationMessage implements AuthenticationMessage { +public final class AuthenticationRequest implements AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final Address address; public final long nonce; - public RequestAuthenticationMessage(Address address, long nonce) { + public AuthenticationRequest(Address address, long nonce) { this.address = address; this.nonce = nonce; } diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/ChallengeMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java similarity index 77% rename from network/src/main/java/io/bitsquare/p2p/peer/messages/ChallengeMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java index 1cfacfe15a..5e22c5bcf1 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/ChallengeMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java @@ -1,9 +1,9 @@ -package io.bitsquare.p2p.peer.messages; +package io.bitsquare.p2p.peers.messages.auth; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; -public final class ChallengeMessage implements AuthenticationMessage { +public final class AuthenticationResponse implements AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; @@ -11,7 +11,7 @@ public final class ChallengeMessage implements AuthenticationMessage { public final long requesterNonce; public final long challengerNonce; - public ChallengeMessage(Address address, long requesterNonce, long challengerNonce) { + public AuthenticationResponse(Address address, long requesterNonce, long challengerNonce) { this.address = address; this.requesterNonce = requesterNonce; this.challengerNonce = challengerNonce; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java similarity index 77% rename from network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java index 12ec467b81..cfbeb6dc67 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java @@ -1,11 +1,11 @@ -package io.bitsquare.p2p.peer.messages; +package io.bitsquare.p2p.peers.messages.auth; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; import java.util.HashSet; -public final class GetPeersMessage implements AuthenticationMessage { +public final class GetPeersAuthRequest implements AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; @@ -13,7 +13,7 @@ public final class GetPeersMessage implements AuthenticationMessage { public final long challengerNonce; public final HashSet
peerAddresses; - public GetPeersMessage(Address address, long challengerNonce, HashSet
peerAddresses) { + public GetPeersAuthRequest(Address address, long challengerNonce, HashSet
peerAddresses) { this.address = address; this.challengerNonce = challengerNonce; this.peerAddresses = peerAddresses; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java similarity index 73% rename from network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java index 00fdfb7344..6334e054a3 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java @@ -1,18 +1,18 @@ -package io.bitsquare.p2p.peer.messages; +package io.bitsquare.p2p.peers.messages.auth; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; import java.util.HashSet; -public final class PeersMessage implements AuthenticationMessage { +public final class GetPeersAuthResponse implements AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final Address address; public final HashSet
peerAddresses; - public PeersMessage(Address address, HashSet
peerAddresses) { + public GetPeersAuthResponse(Address address, HashSet
peerAddresses) { this.address = address; this.peerAddresses = peerAddresses; } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/GetPeersRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/GetPeersRequest.java new file mode 100644 index 0000000000..87d115af1f --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/GetPeersRequest.java @@ -0,0 +1,27 @@ +package io.bitsquare.p2p.peers.messages.maintenance; + +import io.bitsquare.app.Version; +import io.bitsquare.p2p.Address; + +import java.util.HashSet; + +public final class GetPeersRequest implements MaintenanceMessage { + // That object is sent over the wire, so we need to take care of version compatibility. + private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; + + public final Address address; + public final HashSet
peerAddresses; + + public GetPeersRequest(Address address, HashSet
peerAddresses) { + this.address = address; + this.peerAddresses = peerAddresses; + } + + @Override + public String toString() { + return "GetPeersMessage{" + + "address=" + address + + ", peerAddresses=" + peerAddresses + + '}'; + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/GetPeersResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/GetPeersResponse.java new file mode 100644 index 0000000000..d45314df77 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/GetPeersResponse.java @@ -0,0 +1,27 @@ +package io.bitsquare.p2p.peers.messages.maintenance; + +import io.bitsquare.app.Version; +import io.bitsquare.p2p.Address; + +import java.util.HashSet; + +public final class GetPeersResponse implements MaintenanceMessage { + // That object is sent over the wire, so we need to take care of version compatibility. + private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; + + public final Address address; + public final HashSet
peerAddresses; + + public GetPeersResponse(Address address, HashSet
peerAddresses) { + this.address = address; + this.peerAddresses = peerAddresses; + } + + @Override + public String toString() { + return "GetPeersMessage{" + + "address=" + address + + ", peerAddresses=" + peerAddresses + + '}'; + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/MaintenanceMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/MaintenanceMessage.java similarity index 63% rename from network/src/main/java/io/bitsquare/p2p/peer/messages/MaintenanceMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/MaintenanceMessage.java index e5455d4bba..328a367a5e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/MaintenanceMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/MaintenanceMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peer.messages; +package io.bitsquare.p2p.peers.messages.maintenance; import io.bitsquare.p2p.Message; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/PingMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PingMessage.java similarity index 90% rename from network/src/main/java/io/bitsquare/p2p/peer/messages/PingMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PingMessage.java index 8da67acc79..628504f98a 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/PingMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PingMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peer.messages; +package io.bitsquare.p2p.peers.messages.maintenance; import io.bitsquare.app.Version; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/PongMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PongMessage.java similarity index 90% rename from network/src/main/java/io/bitsquare/p2p/peer/messages/PongMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PongMessage.java index a5ded72c8e..fdd34587ab 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/PongMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PongMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peer.messages; +package io.bitsquare.p2p.peers.messages.maintenance; import io.bitsquare.app.Version; diff --git a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java index 48e7175035..f3f2620c3a 100644 --- a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java +++ b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java @@ -51,7 +51,7 @@ public class SeedNode { checkArgument(arg1.equals("true") || arg1.equals("false")); useLocalhost = ("true").equals(arg1); - if (args.length == 3) { + if (args.length > 2) { String arg2 = args[2]; checkArgument(arg2.contains(":") && arg2.split(":").length > 1 && arg2.split(":")[1].length() > 3, "Wrong program argument"); List list = Arrays.asList(arg2.split("|")); @@ -61,8 +61,8 @@ public class SeedNode { seedNodes.add(new Address(e)); }); seedNodes.remove(mySeedNodeAddress); - } else { - log.error("Wrong number of program arguments." + + } else if (args.length > 3) { + log.error("Too many program arguments." + "\nProgram arguments: myAddress useLocalhost seedNodes"); } } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java index 11b23a9c3e..05b606db91 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java @@ -8,7 +8,7 @@ import io.bitsquare.common.crypto.Sig; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.IllegalRequest; import io.bitsquare.p2p.network.MessageListener; -import io.bitsquare.p2p.peer.PeerGroup; +import io.bitsquare.p2p.peers.PeerGroup; import io.bitsquare.p2p.storage.data.*; import io.bitsquare.p2p.storage.messages.*; import io.bitsquare.storage.Storage; @@ -130,10 +130,10 @@ public class ProtectedExpirableDataStorage { log.trace("Data added to our map and it will be broadcasted to our peers."); UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData))); - StringBuilder sb = new StringBuilder("\n\n----------------------------------------------------\n" + + StringBuilder sb = new StringBuilder("\n\n############################################################\n" + "Data set after addProtectedExpirableData:"); - map.values().stream().forEach(e -> sb.append("\n\n").append(e.toString())); - sb.append("\n----------------------------------------------------\n\n"); + map.values().stream().forEach(e -> sb.append("\n").append(e.toString())); + sb.append("\n############################################################\n"); log.trace(sb.toString()); if (!containsKey) @@ -247,9 +247,10 @@ public class ProtectedExpirableDataStorage { log.trace("Data removed from our map. We broadcast the message to our peers."); UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData))); - StringBuilder sb = new StringBuilder("\n\nSet after removeProtectedExpirableData:\n"); - map.values().stream().forEach(e -> sb.append(e.toString() + "\n\n")); - sb.append("\n\n"); + StringBuilder sb = new StringBuilder("\n\n############################################################\n" + + "Data set after removeProtectedExpirableData:"); + map.values().stream().forEach(e -> sb.append("\n").append(e.toString())); + sb.append("\n############################################################\n"); log.trace(sb.toString()); } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetAllDataMessage.java b/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java similarity index 76% rename from network/src/main/java/io/bitsquare/p2p/storage/messages/GetAllDataMessage.java rename to network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java index 1371f36bb8..7b623f288b 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetAllDataMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java @@ -3,10 +3,10 @@ package io.bitsquare.p2p.storage.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Message; -public final class GetAllDataMessage implements Message { +public final class GetDataRequest implements Message { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - public GetAllDataMessage() { + public GetDataRequest() { } } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/AllDataMessage.java b/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java similarity index 79% rename from network/src/main/java/io/bitsquare/p2p/storage/messages/AllDataMessage.java rename to network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java index 27f3c93d14..1601cc53a1 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/messages/AllDataMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java @@ -6,22 +6,22 @@ import io.bitsquare.p2p.storage.data.ProtectedData; import java.util.HashSet; -public final class AllDataMessage implements Message { +public final class GetDataResponse implements Message { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final HashSet set; - public AllDataMessage(HashSet set) { + public GetDataResponse(HashSet set) { this.set = set; } @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof AllDataMessage)) return false; + if (!(o instanceof GetDataResponse)) return false; - AllDataMessage that = (AllDataMessage) o; + GetDataResponse that = (GetDataResponse) o; return !(set != null ? !set.equals(that.set) : that.set != null); diff --git a/network/src/main/resources/logback.xml b/network/src/main/resources/logback.xml index 1454cf5289..cfc2861bb2 100644 --- a/network/src/main/resources/logback.xml +++ b/network/src/main/resources/logback.xml @@ -10,4 +10,7 @@ + + + diff --git a/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java b/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java index 4db6dbafbf..fe29e5b2aa 100644 --- a/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java +++ b/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java @@ -8,7 +8,7 @@ import io.bitsquare.p2p.messaging.MailboxMessage; import io.bitsquare.p2p.messaging.SendMailboxMessageListener; import io.bitsquare.p2p.mocks.MockMailboxMessage; import io.bitsquare.p2p.network.LocalhostNetworkNode; -import io.bitsquare.p2p.peer.PeerGroup; +import io.bitsquare.p2p.peers.PeerGroup; import io.bitsquare.p2p.seed.SeedNode; import io.bitsquare.p2p.storage.data.DataAndSeqNr; import io.bitsquare.p2p.storage.data.ProtectedData; diff --git a/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java b/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java index 9c4053e614..d892391081 100644 --- a/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java +++ b/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java @@ -1,7 +1,7 @@ package io.bitsquare.p2p.network; import io.bitsquare.p2p.Address; -import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage; +import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.junit.Before; import org.junit.Ignore; @@ -77,8 +77,8 @@ public class LocalhostNetworkNodeTest { }); startupLatch.await(); - node2.sendMessage(new Address("localhost", 9001), new RequestAuthenticationMessage(new Address("localhost", 9002), 1)); - node1.sendMessage(new Address("localhost", 9002), new RequestAuthenticationMessage(new Address("localhost", 9001), 1)); + node2.sendMessage(new Address("localhost", 9001), new AuthenticationRequest(new Address("localhost", 9002), 1)); + node1.sendMessage(new Address("localhost", 9002), new AuthenticationRequest(new Address("localhost", 9001), 1)); msgLatch.await(); CountDownLatch shutDownLatch = new CountDownLatch(2); diff --git a/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java b/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java index 97957d0024..34e015536a 100644 --- a/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java +++ b/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java @@ -6,8 +6,8 @@ import io.bitsquare.p2p.P2PService; import io.bitsquare.p2p.P2PServiceListener; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.LocalhostNetworkNode; -import io.bitsquare.p2p.peer.AuthenticationListener; -import io.bitsquare.p2p.peer.PeerGroup; +import io.bitsquare.p2p.peers.AuthenticationListener; +import io.bitsquare.p2p.peers.PeerGroup; import io.bitsquare.p2p.seed.SeedNode; import org.junit.*; import org.slf4j.Logger; @@ -370,8 +370,8 @@ public class PeerGroupTest { // total authentications at com nodes = 90, System load (nr. threads/used memory (MB)): 170/20 // total authentications at 20 nodes = 380, System load (nr. threads/used memory (MB)): 525/46 for (int i = 0; i < length; i++) { - nodes[i].getP2PService().getPeerGroup().printConnectedPeersMap(); - nodes[i].getP2PService().getPeerGroup().printReportedPeersMap(); + nodes[i].getP2PService().getPeerGroup().printAuthenticatedPeers(); + nodes[i].getP2PService().getPeerGroup().printReportedPeers(); } CountDownLatch shutDownLatch = new CountDownLatch(length); diff --git a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java index debf295ab8..9b2220607a 100644 --- a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java +++ b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java @@ -9,7 +9,7 @@ import io.bitsquare.p2p.Address; import io.bitsquare.p2p.TestUtils; import io.bitsquare.p2p.mocks.MockMessage; import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.peer.PeerGroup; +import io.bitsquare.p2p.peers.PeerGroup; import io.bitsquare.p2p.storage.data.DataAndSeqNr; import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload; import io.bitsquare.p2p.storage.data.ProtectedData; diff --git a/seednode/src/main/resources/logback.xml b/seednode/src/main/resources/logback.xml index 152498c74f..94d5277b8e 100644 --- a/seednode/src/main/resources/logback.xml +++ b/seednode/src/main/resources/logback.xml @@ -23,9 +23,16 @@ - + + +