From 1f3c2c14794f6ac265f51504632952ed945c4cac Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 11 Nov 2015 00:50:56 +0100 Subject: [PATCH] Use ByteArray for map key, fix auth loops --- .../java/io/bitsquare/common/ByteArray.java | 32 +++++ .../settings/network/NetworkSettingsView.fxml | 1 - .../settings/network/NetworkSettingsView.java | 2 + .../main/java/io/bitsquare/p2p/Address.java | 10 +- .../java/io/bitsquare/p2p/P2PService.java | 49 ++++---- .../io/bitsquare/p2p/network/NetworkNode.java | 12 +- .../p2p/peers/AuthenticationHandshake.java | 33 +++--- .../java/io/bitsquare/p2p/peers/Peer.java | 1 + .../io/bitsquare/p2p/peers/PeerGroup.java | 110 +++++++++--------- .../messages/auth/AuthenticationRequest.java | 8 +- .../messages/auth/AuthenticationResponse.java | 8 +- .../messages/auth/GetPeersAuthRequest.java | 8 +- .../ProtectedExpirableDataStorage.java | 36 +++--- .../storage/messages/DataExchangeMessage.java | 48 ++++++++ 14 files changed, 227 insertions(+), 131 deletions(-) create mode 100644 common/src/main/java/io/bitsquare/common/ByteArray.java create mode 100644 network/src/main/java/io/bitsquare/p2p/storage/messages/DataExchangeMessage.java diff --git a/common/src/main/java/io/bitsquare/common/ByteArray.java b/common/src/main/java/io/bitsquare/common/ByteArray.java new file mode 100644 index 0000000000..9152618fbe --- /dev/null +++ b/common/src/main/java/io/bitsquare/common/ByteArray.java @@ -0,0 +1,32 @@ +package io.bitsquare.common; + +import io.bitsquare.app.Version; + +import java.io.Serializable; +import java.util.Arrays; + +public class ByteArray implements Serializable { + // That object is sent over the wire, so we need to take care of version compatibility. + private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; + + public final byte[] bytes; + + public ByteArray(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ByteArray)) return false; + + ByteArray byteArray = (ByteArray) o; + + return Arrays.equals(bytes, byteArray.bytes); + } + + @Override + public int hashCode() { + return bytes != null ? Arrays.hashCode(bytes) : 0; + } +} diff --git a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml index a1efeaba10..0ab577a778 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml +++ b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml @@ -52,7 +52,6 @@ - diff --git a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java index 366152f10c..3a67f37930 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java @@ -36,6 +36,7 @@ import javafx.beans.value.ChangeListener; import javafx.collections.FXCollections; import javafx.fxml.FXML; import javafx.geometry.Insets; +import javafx.geometry.VPos; import javafx.scene.control.ComboBox; import javafx.scene.control.Label; import javafx.scene.control.TextArea; @@ -84,6 +85,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel String.valueOf(walletService.numPeersProperty().get()), walletService .numPeersProperty())); diff --git a/network/src/main/java/io/bitsquare/p2p/Address.java b/network/src/main/java/io/bitsquare/p2p/Address.java index 0a0027606d..47ebc636f3 100644 --- a/network/src/main/java/io/bitsquare/p2p/Address.java +++ b/network/src/main/java/io/bitsquare/p2p/Address.java @@ -1,11 +1,14 @@ package io.bitsquare.p2p; +import io.bitsquare.common.crypto.Hash; + import java.io.Serializable; import java.util.regex.Pattern; public class Address implements Serializable { public final String hostName; public final int port; + transient private byte[] blurredAddress; public Address(String hostName, int port) { this.hostName = hostName; @@ -22,8 +25,11 @@ public class Address implements Serializable { return hostName + ":" + port; } - public String getAddressMask() { - return getFullAddress().substring(0, 2); + // We use just a few chars form or address to blur the potential receiver for sent messages + public byte[] getBlurredAddress() { + if (blurredAddress == null) + blurredAddress = Hash.getHash(getFullAddress().substring(0, 2)); + return blurredAddress; } @Override diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index a1d7470795..f874576861 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -7,9 +7,9 @@ import com.google.inject.Inject; import com.google.inject.name.Named; import io.bitsquare.app.Log; import io.bitsquare.app.ProgramArguments; +import io.bitsquare.common.ByteArray; import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.CryptoException; -import io.bitsquare.common.crypto.Hash; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.crypto.PubKeyRing; import io.bitsquare.crypto.EncryptionService; @@ -26,6 +26,7 @@ import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload; import io.bitsquare.p2p.storage.data.ExpirablePayload; import io.bitsquare.p2p.storage.data.ProtectedData; import io.bitsquare.p2p.storage.data.ProtectedMailboxData; +import io.bitsquare.p2p.storage.messages.DataExchangeMessage; import io.bitsquare.p2p.storage.messages.GetDataRequest; import io.bitsquare.p2p.storage.messages.GetDataResponse; import javafx.beans.property.BooleanProperty; @@ -40,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.math.BigInteger; import java.security.PublicKey; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; @@ -83,8 +83,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final BooleanProperty authenticated = new SimpleBooleanProperty(); private MonadicBinding readyForAuthentication; public final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0); - @Nullable - private byte[] blurredAddressHash = null; /////////////////////////////////////////////////////////////////////////////////////////// @@ -167,20 +165,20 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis public void onMessage(Message message, Connection connection) { if (message instanceof GetDataRequest) { Log.traceCall(message.toString()); - log.info("Received GetDataSetMessage: " + message); networkNode.sendMessage(connection, new GetDataResponse(getDataSet())); } else if (message instanceof GetDataResponse) { Log.traceCall(message.toString()); GetDataResponse getDataResponse = (GetDataResponse) message; - log.info("Received GetDataResponse: " + message); HashSet set = getDataResponse.set; - if (!set.isEmpty()) { - // we keep that connection open as the bootstrapping peer will use that for the authentication - // as we are not authenticated yet the data adding will not be broadcasted - set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); - } else { - log.trace("Received DataSetMessage: Empty data set"); - } + // we keep that connection open as the bootstrapping peer will use that for the authentication + // as we are not authenticated yet the data adding will not be broadcasted + set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); + setRequestingDataCompleted(); + } else if (message instanceof DataExchangeMessage) { + Log.traceCall(message.toString()); + DataExchangeMessage dataExchangeMessage = (DataExchangeMessage) message; + HashSet set = dataExchangeMessage.set; + set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); setRequestingDataCompleted(); } else if (message instanceof SealedAndSignedMessage) { Log.traceCall(message.toString()); @@ -205,6 +203,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } private boolean verifyBlurredAddressHash(SealedAndSignedMessage sealedAndSignedMessage) { + byte[] blurredAddressHash = getAddress().getBlurredAddress(); return blurredAddressHash != null && Arrays.equals(blurredAddressHash, sealedAndSignedMessage.blurredAddressHash); } @@ -259,17 +258,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onPeerAdded(Peer peer) { - Log.traceCall(); } @Override public void onPeerRemoved(Address address) { - Log.traceCall(); } @Override public void onConnectionAuthenticated(Connection connection) { - Log.traceCall(); } @@ -292,8 +288,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis Log.traceCall(); checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready"); - blurredAddressHash = Hash.getHash(getAddress().getAddressMask()); - p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished()); // 3. (or 2.). Step: Hidden service is published @@ -369,19 +363,20 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // 5. Step: private void sendGetAllDataMessageAfterAuthentication(final Peer peer) { - Log.traceCall(); - log.trace("sendGetDataSetMessageAfterAuthentication"); - // After authentication we request again data as we might have missed pushed data in the meantime - SettableFuture future = networkNode.sendMessage(peer.connection, new GetDataRequest()); + Log.traceCall(peer.toString()); + // We have to exchange the data again as we might have missed pushed data in the meantime + // After authentication we send our data set to the other peer. + // As he will do the same we will get his actual data set. + SettableFuture future = networkNode.sendMessage(peer.connection, new DataExchangeMessage(getDataSet())); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { - log.info("onPeerAddressAuthenticated Send GetAllDataMessage to " + peer.address + " succeeded."); + log.info("sendGetAllDataMessageAfterAuthentication Send DataExchangeMessage to " + peer.address + " succeeded."); } @Override public void onFailure(@NotNull Throwable throwable) { - log.warn("onPeerAddressAuthenticated Send GetAllDataMessage to " + peer.address + " failed. " + + log.warn("sendGetAllDataMessageAfterAuthentication Send DataExchangeMessage to " + peer.address + " failed. " + "Exception:" + throwable.getMessage()); } }); @@ -463,7 +458,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis if (encryptionService != null) { try { SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( - encryptionService.encryptAndSign(pubKeyRing, message), Hash.getHash(peerAddress.getAddressMask())); + encryptionService.encryptAndSign(pubKeyRing, message), peerAddress.getBlurredAddress()); SettableFuture future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage); Futures.addCallback(future, new FutureCallback() { @Override @@ -509,7 +504,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis if (encryptionService != null) { try { SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( - encryptionService.encryptAndSign(peersPubKeyRing, message), Hash.getHash(peerAddress.getAddressMask())); + encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress.getBlurredAddress()); SettableFuture future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage); Futures.addCallback(future, new FutureCallback() { @Override @@ -615,7 +610,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - public Map getDataMap() { + public Map getDataMap() { Log.traceCall(); return dataStorage.getMap(); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 05142ad57f..c5d2878381 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -87,6 +87,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener if (connection != null) { return sendMessage(connection, message); } else { + log.debug("inBoundConnections " + inBoundConnections.toString()); + log.debug("outBoundConnections " + outBoundConnections.toString()); log.trace("We have not found any connection for that peerAddress. " + "We will create a new outbound connection."); @@ -220,7 +222,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener @Override public void onConnection(Connection connection) { - Log.traceCall(); + Log.traceCall("NetworkNode connection=" + connection); connectionListeners.stream().forEach(e -> e.onConnection(connection)); } @@ -298,7 +300,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener startServerConnectionListener = new ConnectionListener() { @Override public void onConnection(Connection connection) { - Log.traceCall(); + Log.traceCall("startServerConnectionListener connection=" + connection); // we still have not authenticated so put it to the temp list inBoundConnections.add(connection); NetworkNode.this.onConnection(connection); @@ -333,13 +335,15 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener } private Optional lookupOutboundConnection(Address peerAddress) { - Log.traceCall(); + Log.traceCall(peerAddress.toString()); + log.debug("outBoundConnections " + outBoundConnections); return outBoundConnections.stream() .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); } private Optional lookupInboundConnection(Address peerAddress) { - Log.traceCall(); + Log.traceCall(peerAddress.toString()); + log.debug("inBoundConnections " + inBoundConnections); return inBoundConnections.stream() .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java index b019aa91cd..9bd80cad81 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java @@ -67,17 +67,18 @@ public class AuthenticationHandshake implements MessageListener { if (message instanceof AuthenticationResponse) { // Requesting peer AuthenticationResponse authenticationResponse = (AuthenticationResponse) message; + connection.setPeerAddress(authenticationResponse.address); Address peerAddress = authenticationResponse.address; log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress); boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce; if (verified) { - connection.setPeerAddress(peerAddress); SettableFuture future = networkNode.sendMessage(peerAddress, - new GetPeersAuthRequest(myAddress, authenticationResponse.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses()))); + new GetPeersAuthRequest(myAddress, authenticationResponse.responderNonce, new HashSet<>(peerGroup.getAllPeerAddresses()))); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { log.trace("GetPeersAuthRequest sent successfully from " + myAddress + " to " + peerAddress); + connection.setPeerAddress(peerAddress); } @Override @@ -95,21 +96,29 @@ public class AuthenticationHandshake implements MessageListener { GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message; Address peerAddress = getPeersAuthRequest.address; log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress); - boolean verified = nonce != 0 && nonce == getPeersAuthRequest.challengerNonce; + boolean verified = nonce != 0 && nonce == getPeersAuthRequest.responderNonce; if (verified) { - // we add the reported peers to our own set - HashSet
peerAddresses = getPeersAuthRequest.peerAddresses; - log.trace("Received peers: " + peerAddresses); - peerGroup.addToReportedPeers(peerAddresses, connection); - + // we create the msg with our already collected peer addresses (before adding the new ones) SettableFuture future = networkNode.sendMessage(peerAddress, new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses()))); log.trace("sent GetPeersAuthResponse to " + peerAddress + " from " + myAddress + " with allPeers=" + peerGroup.getAllPeerAddresses()); + + // now we add the reported peers to our own set + HashSet
peerAddresses = getPeersAuthRequest.peerAddresses; + log.trace("Received peers: " + peerAddresses); + peerGroup.addToReportedPeers(peerAddresses, connection); + Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { log.trace("GetPeersAuthResponse sent successfully from " + myAddress + " to " + peerAddress); + connection.setPeerAddress(peerAddress); + log.info("AuthenticationComplete: Peer with address " + peerAddress + + " authenticated (" + connection.getUid() + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms."); + + AuthenticationHandshake.this.onSuccess(connection); } @Override @@ -118,12 +127,6 @@ public class AuthenticationHandshake implements MessageListener { onFault(throwable); } }); - - log.info("AuthenticationComplete: Peer with address " + peerAddress - + " authenticated (" + connection.getUid() + "). Took " - + (System.currentTimeMillis() - startAuthTs) + " ms."); - - onSuccess(connection); } else { log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce); onFault(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce)); @@ -219,7 +222,7 @@ public class AuthenticationHandshake implements MessageListener { // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress); - SettableFuture future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce())); + SettableFuture future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.requesterNonce, getAndSetNonce())); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/Peer.java b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java index e2655ecf64..f71fe045e8 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/Peer.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java @@ -55,6 +55,7 @@ public class Peer implements Serializable { return "Peer{" + "address=" + address + ", pingNonce=" + pingNonce + + ", connection=" + connection + '}'; } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java index 054e57690a..098afec44f 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java @@ -6,7 +6,6 @@ import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; import io.bitsquare.common.util.Tuple2; -import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.network.Connection; @@ -44,10 +43,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { MAX_CONNECTIONS = maxConnections; } - private static final int SEND_PING_INTERVAL = new Random().nextInt(5 * 60 * 1000) + 5 * 60 * 1000; private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000; - private static final int GET_PEERS_INTERVAL = new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min. - private static final int RETRY_FILL_AUTH_PEERS = GET_PEERS_INTERVAL + 5000; private static final int MAX_REPORTED_PEERS = 1000; private final NetworkNode networkNode; @@ -58,8 +54,8 @@ public class PeerGroup implements MessageListener, ConnectionListener { private final Set
reportedPeerAddresses = new HashSet<>(); private final Map authenticationHandshakes = new ConcurrentHashMap<>(); - private final Timer sendPingTimer = new Timer(); - private final Timer getPeersTimer = new Timer(); + private Timer sendPingTimer = new Timer(); + private Timer getPeersTimer = new Timer(); private boolean shutDownInProgress; private boolean firstPeerAdded = false; @@ -78,7 +74,8 @@ public class PeerGroup implements MessageListener, ConnectionListener { networkNode.addMessageListener(this); networkNode.addConnectionListener(this); - setupMaintenanceTimer(); + startMaintenanceTimer(); + startGetPeersTimer(); } @@ -215,6 +212,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { "We have that peer already authenticated. That must never happen."); if (!authenticationHandshakes.containsKey(peerAddress)) { AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress()); + authenticationHandshakes.put(peerAddress, authenticationHandshake); SettableFuture future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress); Futures.addCallback(future, new FutureCallback() { @Override @@ -224,15 +222,15 @@ public class PeerGroup implements MessageListener, ConnectionListener { if (continueOnSuccess) { if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { log.info("We still don't have enough connections. Lets try the reported peers."); - authenticateToRemainingReportedPeers(); + authenticateToRemainingReportedPeers(true); } else { log.info("We have already enough connections."); } } else { log.info("We have already tried all reported peers and seed nodes. " + "We stop bootstrapping now, but will repeat after an while."); - UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), - RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS); + UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true), + 1, 2, TimeUnit.MINUTES); } } } @@ -255,12 +253,12 @@ public class PeerGroup implements MessageListener, ConnectionListener { authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true); } else if (reportedPeerAddresses.size() > 0) { log.info("We don't have any more seed nodes for connecting. Lets try the reported peers."); - authenticateToRemainingReportedPeers(); + authenticateToRemainingReportedPeers(true); } else { log.info("We don't have any more seed nodes or reported nodes for connecting. " + "We stop bootstrapping now, but will repeat after an while."); - UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), - RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS); + UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true), + 1, 2, TimeUnit.MINUTES); } } }); @@ -269,12 +267,17 @@ public class PeerGroup implements MessageListener, ConnectionListener { } } - private void authenticateToRemainingReportedPeers() { + private void authenticateToRemainingReportedPeers(boolean calledFromSeedNodeMethod) { Log.traceCall(); Optional>> tupleOptional = getRandomItemAndRemainingSet(reportedPeerAddresses); if (tupleOptional.isPresent()) { log.info("We try to authenticate to a random peer. " + tupleOptional.get().first); authenticateToReportedPeer(tupleOptional.get().second, tupleOptional.get().first); + } else if (calledFromSeedNodeMethod) { + log.info("We don't have any reported peers for connecting. " + + "As we tried recently the seed nodes we will wait a bit before repeating."); + UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(), + 1, 2, TimeUnit.MINUTES); } else { log.info("We don't have any reported peers for connecting. Lets try the remaining seed nodes."); authenticateToRemainingSeedNodes(); @@ -289,6 +292,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { "We have that peer already authenticated. That must never happen."); if (!authenticationHandshakes.containsKey(peerAddress)) { AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress()); + authenticationHandshakes.put(peerAddress, authenticationHandshake); SettableFuture future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress); Futures.addCallback(future, new FutureCallback() { @Override @@ -299,10 +303,12 @@ public class PeerGroup implements MessageListener, ConnectionListener { if (reportedPeerAddresses.size() > 0) { log.info("We still don't have enough connections. " + "Lets try the remaining reported peer addresses."); - authenticateToRemainingReportedPeers(); + authenticateToRemainingReportedPeers(false); } else { - log.info("We still don't have enough connections. Lets try the remaining seed nodes."); - authenticateToRemainingSeedNodes(); + log.info("We still don't have enough connections. " + + "Lets wait a bit and then try the remaining seed nodes."); + UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(), + 1, 2, TimeUnit.MINUTES); } } else { log.info("We have already enough connections."); @@ -316,8 +322,17 @@ public class PeerGroup implements MessageListener, ConnectionListener { throwable.printStackTrace(); removePeer(peerAddress); - log.info("Authentication failed. Lets try again with the remaining reported peer addresses."); - authenticateToRemainingReportedPeers(); + authenticateToRemainingReportedPeers(false); + + if (reportedPeerAddresses.size() > 0) { + log.info("Authentication failed. Lets try again with the remaining reported peer addresses."); + authenticateToRemainingReportedPeers(false); + } else { + log.info("Authentication failed. " + + "Lets wait a bit and then try the remaining seed nodes."); + UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(), + 1, 2, TimeUnit.MINUTES); + } } }); } else { @@ -334,8 +349,8 @@ public class PeerGroup implements MessageListener, ConnectionListener { } else { log.info("We don't have any more seed nodes for connecting. " + "We stop bootstrapping now, but will repeat after an while."); - UserThread.runAfter(() -> authenticateToRemainingReportedPeers(), - RETRY_FILL_AUTH_PEERS, TimeUnit.MILLISECONDS); + UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true), + 1, 2, TimeUnit.MINUTES); } } @@ -350,6 +365,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { "We have that seed node already authenticated. That must never happen."); if (!authenticationHandshakes.containsKey(peerAddress)) { AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress()); + authenticationHandshakes.put(peerAddress, authenticationHandshake); SettableFuture future = authenticationHandshake.requestAuthenticationToPeer(peerAddress); Futures.addCallback(future, new FutureCallback() { @Override @@ -377,7 +393,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { private void setAuthenticated(Connection connection, Address peerAddress) { Log.traceCall(peerAddress.getFullAddress()); - if (!authenticationHandshakes.containsKey(peerAddress)) + if (authenticationHandshakes.containsKey(peerAddress)) authenticationHandshakes.remove(peerAddress); log.info("\n\n############################################################\n" + "We are authenticated to:" + @@ -412,38 +428,28 @@ public class PeerGroup implements MessageListener, ConnectionListener { // Maintenance /////////////////////////////////////////////////////////////////////////////////////////// - private void setupMaintenanceTimer() { + private void startMaintenanceTimer() { Log.traceCall(); - sendPingTimer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - Utilities.setThreadName("MaintenanceTimer"); - try { - UserThread.execute(() -> { - checkIfConnectedPeersExceeds(); - pingPeers(); - }); - } catch (Throwable t) { - log.error("Executing task failed. " + t.getMessage()); - t.printStackTrace(); - } - } - }, SEND_PING_INTERVAL, SEND_PING_INTERVAL); + if (sendPingTimer != null) + sendPingTimer.cancel(); - getPeersTimer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - Utilities.setThreadName("GetPeersTimer"); - try { - UserThread.execute(() -> trySendGetPeersRequest()); - } catch (Throwable t) { - log.error("Executing task failed. " + t.getMessage()); - t.printStackTrace(); - } - } - }, GET_PEERS_INTERVAL, GET_PEERS_INTERVAL); + sendPingTimer = UserThread.runAfterRandomDelay(() -> { + checkIfConnectedPeersExceeds(); + pingPeers(); + startMaintenanceTimer(); + }, 5, 10, TimeUnit.MINUTES); } + private void startGetPeersTimer() { + Log.traceCall(); + if (getPeersTimer != null) + getPeersTimer.cancel(); + + getPeersTimer = UserThread.runAfterRandomDelay(() -> { + trySendGetPeersRequest(); + startGetPeersTimer(); + }, 1, 2, TimeUnit.MINUTES); + } private boolean checkIfConnectedPeersExceeds() { Log.traceCall(); @@ -484,7 +490,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { removePeer(e.address); } }); - }, 5, 10)); + }, 1, 10)); } private void trySendGetPeersRequest() { @@ -659,7 +665,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { private void removePeer(@Nullable Address peerAddress) { Log.traceCall("peerAddress=" + peerAddress); if (peerAddress != null) { - if (!authenticationHandshakes.containsKey(peerAddress)) + if (authenticationHandshakes.containsKey(peerAddress)) authenticationHandshakes.remove(peerAddress); boolean contained = reportedPeerAddresses.remove(peerAddress); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java index 6547246cd9..7133d39eb5 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java @@ -8,18 +8,18 @@ public final class AuthenticationRequest extends AuthenticationMessage { private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final Address address; - public final long nonce; + public final long requesterNonce; - public AuthenticationRequest(Address address, long nonce) { + public AuthenticationRequest(Address address, long requesterNonce) { this.address = address; - this.nonce = nonce; + this.requesterNonce = requesterNonce; } @Override public String toString() { return "AuthenticationRequest{" + "address=" + address + - ", nonce=" + nonce + + ", nonce=" + requesterNonce + "} " + super.toString(); } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java index 969ff87206..49d1557153 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java @@ -9,12 +9,12 @@ public final class AuthenticationResponse extends AuthenticationMessage { public final Address address; public final long requesterNonce; - public final long challengerNonce; + public final long responderNonce; - public AuthenticationResponse(Address address, long requesterNonce, long challengerNonce) { + public AuthenticationResponse(Address address, long requesterNonce, long responderNonce) { this.address = address; this.requesterNonce = requesterNonce; - this.challengerNonce = challengerNonce; + this.responderNonce = responderNonce; } @Override @@ -22,7 +22,7 @@ public final class AuthenticationResponse extends AuthenticationMessage { return "AuthenticationResponse{" + "address=" + address + ", requesterNonce=" + requesterNonce + - ", challengerNonce=" + challengerNonce + + ", challengerNonce=" + responderNonce + "} " + super.toString(); } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java index ca294449f6..8f4ca8535e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java @@ -10,12 +10,12 @@ public final class GetPeersAuthRequest extends AuthenticationMessage { private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final Address address; - public final long challengerNonce; + public final long responderNonce; public final HashSet
peerAddresses; - public GetPeersAuthRequest(Address address, long challengerNonce, HashSet
peerAddresses) { + public GetPeersAuthRequest(Address address, long responderNonce, HashSet
peerAddresses) { this.address = address; - this.challengerNonce = challengerNonce; + this.responderNonce = responderNonce; this.peerAddresses = peerAddresses; } @@ -23,7 +23,7 @@ public final class GetPeersAuthRequest extends AuthenticationMessage { public String toString() { return "GetPeersAuthRequest{" + "address=" + address + - ", challengerNonce=" + challengerNonce + + ", challengerNonce=" + responderNonce + ", peerAddresses=" + peerAddresses + "} " + super.toString(); } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java index b731438ac8..a0fdcc413e 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java @@ -2,6 +2,7 @@ package io.bitsquare.p2p.storage; import com.google.common.annotations.VisibleForTesting; import io.bitsquare.app.Log; +import io.bitsquare.common.ByteArray; import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.CryptoException; import io.bitsquare.common.crypto.Hash; @@ -25,7 +26,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.File; -import java.math.BigInteger; import java.security.KeyPair; import java.security.PublicKey; import java.util.HashMap; @@ -42,9 +42,9 @@ public class ProtectedExpirableDataStorage implements MessageListener { public static int CHECK_TTL_INTERVAL = 10 * 60 * 1000; private final PeerGroup peerGroup; - private final Map map = new HashMap<>(); + private final Map map = new HashMap<>(); private final CopyOnWriteArraySet hashMapChangedListeners = new CopyOnWriteArraySet<>(); - private HashMap sequenceNumberMap = new HashMap<>(); + private HashMap sequenceNumberMap = new HashMap<>(); private final Storage storage; private final Timer timer = new Timer(); private volatile boolean shutDownInProgress; @@ -65,7 +65,7 @@ public class ProtectedExpirableDataStorage implements MessageListener { private void init() { Log.traceCall(); - HashMap persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap"); + HashMap persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap"); if (persisted != null) { sequenceNumberMap = persisted; } @@ -137,7 +137,7 @@ public class ProtectedExpirableDataStorage implements MessageListener { public boolean add(ProtectedData protectedData, @Nullable Address sender) { Log.traceCall(); - BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload); + ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirablePayload); boolean containsKey = map.containsKey(hashOfPayload); boolean result = checkPublicKeys(protectedData, true) && checkSignature(protectedData); @@ -171,7 +171,7 @@ public class ProtectedExpirableDataStorage implements MessageListener { public boolean remove(ProtectedData protectedData, @Nullable Address sender) { Log.traceCall(); - BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload); + ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirablePayload); boolean containsKey = map.containsKey(hashOfPayload); if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data."); boolean result = containsKey @@ -196,7 +196,7 @@ public class ProtectedExpirableDataStorage implements MessageListener { public boolean removeMailboxData(ProtectedMailboxData protectedMailboxData, @Nullable Address sender) { Log.traceCall(); - BigInteger hashOfData = getHashAsBigInteger(protectedMailboxData.expirablePayload); + ByteArray hashOfData = getHashAsByteArray(protectedMailboxData.expirablePayload); boolean containsKey = map.containsKey(hashOfData); if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data."); boolean result = containsKey @@ -219,14 +219,14 @@ public class ProtectedExpirableDataStorage implements MessageListener { return result; } - public Map getMap() { + public Map getMap() { return map; } public ProtectedData getDataWithSignedSeqNr(ExpirablePayload payload, KeyPair ownerStoragePubKey) throws CryptoException { Log.traceCall(); - BigInteger hashOfData = getHashAsBigInteger(payload); + ByteArray hashOfData = getHashAsByteArray(payload); int sequenceNumber; if (sequenceNumberMap.containsKey(hashOfData)) sequenceNumber = sequenceNumberMap.get(hashOfData) + 1; @@ -242,7 +242,7 @@ public class ProtectedExpirableDataStorage implements MessageListener { KeyPair storageSignaturePubKey, PublicKey receiversPublicKey) throws CryptoException { Log.traceCall(); - BigInteger hashOfData = getHashAsBigInteger(expirableMailboxPayload); + ByteArray hashOfData = getHashAsByteArray(expirableMailboxPayload); int sequenceNumber; if (sequenceNumberMap.containsKey(hashOfData)) sequenceNumber = sequenceNumberMap.get(hashOfData) + 1; @@ -265,7 +265,7 @@ public class ProtectedExpirableDataStorage implements MessageListener { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void doRemoveProtectedExpirableData(ProtectedData protectedData, BigInteger hashOfPayload) { + private void doRemoveProtectedExpirableData(ProtectedData protectedData, ByteArray hashOfPayload) { Log.traceCall(); map.remove(hashOfPayload); log.trace("Data removed from our map. We broadcast the message to our peers."); @@ -278,7 +278,7 @@ public class ProtectedExpirableDataStorage implements MessageListener { log.info(sb.toString()); } - private boolean isSequenceNrValid(ProtectedData data, BigInteger hashOfData) { + private boolean isSequenceNrValid(ProtectedData data, ByteArray hashOfData) { Log.traceCall(); int newSequenceNumber = data.sequenceNumber; Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData); @@ -325,10 +325,10 @@ public class ProtectedExpirableDataStorage implements MessageListener { return result; } - private boolean checkIfStoredDataMatchesNewData(ProtectedData data, BigInteger hashOfData) { + private boolean checkIfStoredDataMatchesNewData(ProtectedData data, ByteArray hashOfData) { Log.traceCall(); ProtectedData storedData = map.get(hashOfData); - boolean result = getHashAsBigInteger(storedData.expirablePayload).equals(hashOfData) + boolean result = getHashAsByteArray(storedData.expirablePayload).equals(hashOfData) && storedData.ownerStoragePubKey.equals(data.ownerStoragePubKey); if (!result) log.error("New data entry does not match our stored data. Consider it might be an attempt of fraud"); @@ -336,14 +336,14 @@ public class ProtectedExpirableDataStorage implements MessageListener { return result; } - private boolean checkIfStoredMailboxDataMatchesNewMailboxData(ProtectedMailboxData data, BigInteger hashOfData) { + private boolean checkIfStoredMailboxDataMatchesNewMailboxData(ProtectedMailboxData data, ByteArray hashOfData) { Log.traceCall(); ProtectedData storedData = map.get(hashOfData); if (storedData instanceof ProtectedMailboxData) { ProtectedMailboxData storedMailboxData = (ProtectedMailboxData) storedData; // publicKey is not the same (stored: sender, new: receiver) boolean result = storedMailboxData.receiversPubKey.equals(data.receiversPubKey) - && getHashAsBigInteger(storedMailboxData.expirablePayload).equals(hashOfData); + && getHashAsByteArray(storedMailboxData.expirablePayload).equals(hashOfData); if (!result) log.error("New data entry does not match our stored data. Consider it might be an attempt of fraud"); @@ -359,8 +359,8 @@ public class ProtectedExpirableDataStorage implements MessageListener { peerGroup.broadcast(message, sender); } - private BigInteger getHashAsBigInteger(ExpirablePayload payload) { - return new BigInteger(Hash.getHash(payload)); + private ByteArray getHashAsByteArray(ExpirablePayload payload) { + return new ByteArray(Hash.getHash(payload)); } } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/DataExchangeMessage.java b/network/src/main/java/io/bitsquare/p2p/storage/messages/DataExchangeMessage.java new file mode 100644 index 0000000000..9001558a98 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/storage/messages/DataExchangeMessage.java @@ -0,0 +1,48 @@ +package io.bitsquare.p2p.storage.messages; + +import io.bitsquare.app.Version; +import io.bitsquare.p2p.Message; +import io.bitsquare.p2p.storage.data.ProtectedData; + +import java.util.HashSet; + +public final class DataExchangeMessage implements Message { + // That object is sent over the wire, so we need to take care of version compatibility. + private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; + private final int networkId = Version.NETWORK_ID; + + public final HashSet set; + + public DataExchangeMessage(HashSet set) { + this.set = set; + } + + @Override + public int networkId() { + return networkId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof DataExchangeMessage)) return false; + + DataExchangeMessage that = (DataExchangeMessage) o; + + return !(set != null ? !set.equals(that.set) : that.set != null); + + } + + @Override + public int hashCode() { + return set != null ? set.hashCode() : 0; + } + + @Override + public String toString() { + return "GetDataResponse{" + + "networkId=" + networkId + + ", set=" + set + + '}'; + } +}