From cd631eb53b3ceaca246f3968aa2940ab99571301 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Thu, 24 Dec 2015 02:29:46 +0100 Subject: [PATCH] Refactor authentication --- .../java/io/bitsquare/p2p/P2PService.java | 21 +- .../io/bitsquare/p2p/network/Connection.java | 67 +++--- .../p2p/network/LocalhostNetworkNode.java | 4 +- .../io/bitsquare/p2p/network/NetworkNode.java | 40 ++-- .../p2p/peers/AuthenticationHandshake.java | 146 +++++------- .../p2p/peers/AuthenticationListener.java | 2 +- .../p2p/peers/MaintenanceManager.java | 15 +- .../java/io/bitsquare/p2p/peers/Peer.java | 4 +- .../p2p/peers/PeerExchangeManager.java | 13 +- .../io/bitsquare/p2p/peers/PeerGroup.java | 222 ++++++++++-------- .../p2p/peers/RequestDataManager.java | 89 ++++--- ...uest.java => AuthenticationChallenge.java} | 14 +- .../messages/auth/AuthenticationMessage.java | 8 +- .../auth/AuthenticationRejection.java | 10 +- .../messages/auth/AuthenticationRequest.java | 8 +- .../messages/auth/AuthenticationResponse.java | 17 +- .../messages/auth/GetPeersAuthResponse.java | 27 --- .../messages/data/DataRequest.java} | 8 +- .../messages/data/DataResponse.java} | 10 +- .../GetPeersRequest.java | 10 +- .../GetPeersResponse.java | 2 +- .../PeerExchangeMessage.java | 2 +- .../bitsquare/p2p/storage/P2PDataStorage.java | 19 +- 23 files changed, 379 insertions(+), 379 deletions(-) rename network/src/main/java/io/bitsquare/p2p/peers/messages/auth/{GetPeersAuthRequest.java => AuthenticationChallenge.java} (59%) delete mode 100644 network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java rename network/src/main/java/io/bitsquare/p2p/{storage/messages/GetDataRequest.java => peers/messages/data/DataRequest.java} (80%) rename network/src/main/java/io/bitsquare/p2p/{storage/messages/GetDataResponse.java => peers/messages/data/DataResponse.java} (78%) rename network/src/main/java/io/bitsquare/p2p/peers/messages/{peerexchange => peers}/GetPeersRequest.java (71%) rename network/src/main/java/io/bitsquare/p2p/peers/messages/{peerexchange => peers}/GetPeersResponse.java (92%) rename network/src/main/java/io/bitsquare/p2p/peers/messages/{peerexchange => peers}/PeerExchangeMessage.java (87%) diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 9313039ba1..2d697d5b52 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -134,14 +134,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis peerGroup = new PeerGroup(networkNode); peerGroup.addAuthenticationListener(this); if (useLocalhost) - PeerGroup.setSimulateAuthTorNode(200); + PeerGroup.setSimulateAuthTorNode(100); // P2P network data storage dataStorage = new P2PDataStorage(peerGroup, networkNode, storageDir); dataStorage.addHashMapChangedListener(this); // Request initial data manager - requestDataManager = new RequestDataManager(networkNode, dataStorage, new RequestDataManager.Listener() { + requestDataManager = new RequestDataManager(networkNode, dataStorage, peerGroup, new RequestDataManager.Listener() { @Override public void onNoSeedNodeAvailable() { p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable()); @@ -286,9 +286,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onDisconnect(Reason reason, Connection connection) { Log.traceCall(); - if (connection.isAuthenticated()) - authenticatedPeerAddresses.remove(connection.getPeerAddress()); - + connection.getPeerAddress().ifPresent(peerAddresses -> authenticatedPeerAddresses.remove(peerAddresses)); numAuthenticatedPeers.set(authenticatedPeerAddresses.size()); } @@ -302,10 +300,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + public void onPeerAuthenticated(Address peerAddress, Connection connection) { Log.traceCall(); - checkArgument(peerAddress.equals(connection.getPeerAddress()), - "peerAddress must match connection.getPeerAddress()"); authenticatedPeerAddresses.add(peerAddress); if (!firstPeerAuthenticated.get()) { @@ -315,8 +311,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis numAuthenticatedPeers.set(authenticatedPeerAddresses.size()); } - - + + /////////////////////////////////////////////////////////////////////////////////////////// // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -339,8 +335,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis connection.setConnectionPriority(ConnectionPriority.DIRECT_MSG); log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey); - decryptedMailListeners.stream().forEach( - e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress())); + connection.getPeerAddress().ifPresent(peerAddresses -> + decryptedMailListeners.stream().forEach( + e -> e.onMailMessage(decryptedMsgWithPubKey, peerAddresses))); } else { log.info("Wrong receiverAddressMaskHash. The message is not intended for us."); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 152d3c37eb..3499eb2584 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -19,9 +19,12 @@ import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.Date; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.*; +import static com.google.common.base.Preconditions.checkNotNull; + /** * Connection is created by the server thread or by sendMessage from NetworkNode. * All handlers are called on User thread. @@ -31,7 +34,7 @@ public class Connection implements MessageListener { private static final Logger log = LoggerFactory.getLogger(Connection.class); private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data //timeout on blocking Socket operations like ServerSocket.accept() or SocketInputStream.read() - private static final int SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 min. + private static final int SOCKET_TIMEOUT = 10 * 60 * 1000; // 10 min. private ConnectionPriority connectionPriority; public static int getMaxMsgSize() { @@ -41,6 +44,7 @@ public class Connection implements MessageListener { private final Socket socket; private final MessageListener messageListener; private final ConnectionListener connectionListener; + private final String portInfo; private final String uid = UUID.randomUUID().toString(); private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); @@ -52,8 +56,7 @@ public class Connection implements MessageListener { private ObjectOutputStream objectOutputStream; // mutable data, set from other threads but not changed internally. - @Nullable - private Address peerAddress; + private Optional
peerAddressOptional = Optional.empty(); private volatile boolean isAuthenticated; private volatile boolean stopped; @@ -116,9 +119,8 @@ public class Connection implements MessageListener { /////////////////////////////////////////////////////////////////////////////////////////// // Called form UserThread - public void setAuthenticated(Address peerAddress) { + public void setAuthenticated() { Log.traceCall(); - this.peerAddress = peerAddress; isAuthenticated = true; } @@ -131,9 +133,10 @@ public class Connection implements MessageListener { Log.traceCall(); if (!stopped) { try { + String peerAddress = peerAddressOptional.isPresent() ? peerAddressOptional.get().toString() : "null"; log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" + "Write object to outputStream to peer: {} (uid={})\nmessage={}" - + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", getPeerAddress(), uid, message); + + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", peerAddress, uid, message); Object objectToWrite; if (useCompression) { @@ -167,9 +170,10 @@ public class Connection implements MessageListener { sharedSpace.reportIllegalRequest(illegalRequest); } - public synchronized void setPeerAddress(@Nullable Address peerAddress) { + public synchronized void setPeerAddress(Address peerAddress) { Log.traceCall(); - this.peerAddress = peerAddress; + checkNotNull(peerAddress, "peerAddress must not be null"); + peerAddressOptional = Optional.of(peerAddress); } @@ -189,8 +193,12 @@ public class Connection implements MessageListener { /////////////////////////////////////////////////////////////////////////////////////////// @Nullable - public synchronized Address getPeerAddress() { - return peerAddress; + public synchronized Address getPeerAddress1() { + return peerAddressOptional.isPresent() ? peerAddressOptional.get() : null; + } + + public synchronized Optional
getPeerAddress() { + return peerAddressOptional; } public Date getLastActivityDate() { @@ -213,6 +221,7 @@ public class Connection implements MessageListener { return connectionPriority; } + /////////////////////////////////////////////////////////////////////////////////////////// // ShutDown /////////////////////////////////////////////////////////////////////////////////////////// @@ -232,6 +241,7 @@ public class Connection implements MessageListener { private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) { Log.traceCall(this.toString()); if (!stopped) { + String peerAddress = peerAddressOptional.isPresent() ? peerAddressOptional.get().toString() : "null"; log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "ShutDown connection:" + "\npeerAddress=" + peerAddress @@ -249,31 +259,32 @@ public class Connection implements MessageListener { Log.traceCall("sendCloseConnectionMessage"); try { sendMessage(new CloseConnectionMessage()); - stopped = true; - sharedSpace.stop(); - if (inputHandler != null) - inputHandler.stop(); + setStopFlags(); - // TODO increase delay - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } catch (Throwable t) { log.error(t.getMessage()); t.printStackTrace(); } finally { - UserThread.execute(() -> continueShutDown(shutDownCompleteHandler)); + UserThread.execute(() -> doShutDown(shutDownCompleteHandler)); } }).start(); } else { - stopped = true; - sharedSpace.stop(); - if (inputHandler != null) - inputHandler.stop(); - continueShutDown(shutDownCompleteHandler); + setStopFlags(); + doShutDown(shutDownCompleteHandler); } } } - private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) { + private void setStopFlags() { + stopped = true; + sharedSpace.stop(); + if (inputHandler != null) + inputHandler.stop(); + isAuthenticated = false; + } + + private void doShutDown(@Nullable Runnable shutDownCompleteHandler) { Log.traceCall(); ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason(); if (shutDownReason == null) @@ -309,7 +320,7 @@ public class Connection implements MessageListener { if (portInfo != null ? !portInfo.equals(that.portInfo) : that.portInfo != null) return false; if (uid != null ? !uid.equals(that.uid) : that.uid != null) return false; - return !(peerAddress != null ? !peerAddress.equals(that.peerAddress) : that.peerAddress != null); + return peerAddressOptional != null ? peerAddressOptional.equals(that.peerAddressOptional) : that.peerAddressOptional == null; } @@ -317,7 +328,7 @@ public class Connection implements MessageListener { public int hashCode() { int result = portInfo != null ? portInfo.hashCode() : 0; result = 31 * result + (uid != null ? uid.hashCode() : 0); - result = 31 * result + (peerAddress != null ? peerAddress.hashCode() : 0); + result = 31 * result + (peerAddressOptional != null ? peerAddressOptional.hashCode() : 0); return result; } @@ -327,7 +338,7 @@ public class Connection implements MessageListener { "portInfo=" + portInfo + ", uid='" + uid + '\'' + ", sharedSpace=" + sharedSpace.toString() + - ", peerAddress=" + peerAddress + + ", peerAddress=" + peerAddressOptional + ", isAuthenticated=" + isAuthenticated + ", stopped=" + stopped + ", stopped=" + stopped + @@ -398,6 +409,7 @@ public class Connection implements MessageListener { public void handleConnectionException(Exception e) { Log.traceCall(e.toString()); + log.debug("connection=" + this); if (e instanceof SocketException) { if (socket.isClosed()) shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED; @@ -405,11 +417,12 @@ public class Connection implements MessageListener { shutDownReason = ConnectionListener.Reason.RESET; } else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) { shutDownReason = ConnectionListener.Reason.TIMEOUT; + log.warn("TimeoutException at connection with port " + socket.getLocalPort()); } else if (e instanceof EOFException) { shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED; } else { shutDownReason = ConnectionListener.Reason.UNKNOWN; - log.info("Exception at connection with port " + socket.getLocalPort()); + log.warn("Exception at connection with port " + socket.getLocalPort()); e.printStackTrace(); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index 46576bda4c..d80d45b557 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -27,8 +27,8 @@ import java.util.function.Consumer; public class LocalhostNetworkNode extends NetworkNode { private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class); - private static volatile int simulateTorDelayTorNode = 600; - private static volatile int simulateTorDelayHiddenService = 3000; + private static volatile int simulateTorDelayTorNode = 100; + private static volatile int simulateTorDelayHiddenService = 500; private Address address; public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) { diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index f7daef37a5..d6b65f717d 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -15,7 +15,9 @@ import java.io.IOException; import java.net.ConnectException; import java.net.ServerSocket; import java.net.Socket; -import java.util.*; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeoutException; @@ -62,7 +64,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener abstract public void start(@Nullable SetupListener setupListener); public SettableFuture sendMessage(@NotNull Address peerAddress, Message message) { - Log.traceCall("message: " + message + " to peerAddress: " + peerAddress); + Log.traceCall("peerAddress: " + peerAddress + " / message: " + message); checkNotNull(peerAddress, "peerAddress must not be null"); Optional outboundConnectionOptional = lookupOutboundConnection(peerAddress); @@ -86,8 +88,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener if (connection != null) { return sendMessage(connection, message); } else { - log.trace("We have not found any connection for that peerAddress. " + - "We will create a new outbound connection."); + log.trace("We have not found any connection for peerAddress {}. " + + "We will create a new outbound connection.", peerAddress); final SettableFuture resultFuture = SettableFuture.create(); final boolean[] timeoutOccurred = new boolean[1]; @@ -96,7 +98,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress); try { // can take a while when using tor - Socket socket = createSocket(peerAddress); + Socket socket = createSocket(peerAddress); if (timeoutOccurred[0]) throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peerAddress); @@ -114,7 +116,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener // can take a while when using tor newConnection.sendMessage(message); - return newConnection; + return newConnection; } catch (Throwable throwable) { if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) { throwable.printStackTrace(); @@ -124,7 +126,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener } }); - Timer timer = new Timer(); + //TODO does not close the connection yet. not clear if socket timeout is enough. + /*Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { @@ -135,19 +138,19 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener log.info(message); UserThread.execute(() -> resultFuture.setException(new TimeoutException(message))); } - }, CREATE_SOCKET_TIMEOUT); + }, CREATE_SOCKET_TIMEOUT);*/ Futures.addCallback(future, new FutureCallback() { public void onSuccess(Connection connection) { UserThread.execute(() -> { - timer.cancel(); + //timer.cancel(); resultFuture.set(connection); }); } public void onFailure(@NotNull Throwable throwable) { UserThread.execute(() -> { - timer.cancel(); + //timer.cancel(); resultFuture.setException(throwable); }); } @@ -158,7 +161,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener } public SettableFuture sendMessage(Connection connection, Message message) { - Log.traceCall(); + Log.traceCall("message: " + message + " to connection: " + connection); // connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block ListenableFuture future = executorService.submit(() -> { Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid()); @@ -223,15 +226,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener @Override public void onConnection(Connection connection) { - Log.traceCall("NetworkNode connection=" + connection); + Log.traceCall("connection=" + connection); connectionListeners.stream().forEach(e -> e.onConnection(connection)); } @Override public void onDisconnect(Reason reason, Connection connection) { - Log.traceCall(); - Address peerAddress = connection.getPeerAddress(); - log.trace("onDisconnect connection " + connection + ", peerAddress= " + peerAddress); + Log.traceCall("connection = " + connection); outBoundConnections.remove(connection); inBoundConnections.remove(connection); connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection)); @@ -311,10 +312,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener @Override public void onDisconnect(Reason reason, Connection connection) { - Log.traceCall(); - Address peerAddress = connection.getPeerAddress(); - log.trace("onDisconnect at incoming connection to peerAddress (or connection) " - + ((peerAddress == null) ? connection : peerAddress)); + Log.traceCall("onDisconnect at incoming connection = " + connection); inBoundConnections.remove(connection); NetworkNode.this.onDisconnect(reason, connection); } @@ -334,13 +332,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener private Optional lookupOutboundConnection(Address peerAddress) { Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections); return outBoundConnections.stream() - .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); + .filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny(); } private Optional lookupInboundConnection(Address peerAddress) { Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections); return inBoundConnections.stream() - .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); + .filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny(); } abstract protected Socket createSocket(Address peerAddress) throws IOException; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java index ad5e45788c..616e239bc2 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java @@ -11,7 +11,10 @@ import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.ConnectionPriority; import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.peers.messages.auth.*; +import io.bitsquare.p2p.peers.messages.auth.AuthenticationChallenge; +import io.bitsquare.p2p.peers.messages.auth.AuthenticationMessage; +import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest; +import io.bitsquare.p2p.peers.messages.auth.AuthenticationResponse; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,14 +24,13 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.TimeUnit; -// authentication protocol: -// node2 -> node1 AuthenticationRequest -// node1: close connection -// node1 -> node2 AuthenticationResponse on new connection -// node2: authentication to node1 done if nonce ok -// node2 -> node1 GetPeersAuthRequest -// node1: authentication to node2 done if nonce ok -// node1 -> node2 GetPeersAuthResponse +// Authentication protocol: +// client: send AuthenticationRequest to seedNode +// seedNode: close connection +// seedNode: send AuthenticationChallenge to client on a new connection to test if address is correct +// client: authentication to seedNode done if nonce verification is ok +// client: AuthenticationResponse to seedNode +// seedNode: authentication to client done if nonce verification is ok public class AuthenticationHandshake implements MessageListener { private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class); @@ -76,28 +78,33 @@ public class AuthenticationHandshake implements MessageListener { if (message instanceof AuthenticationMessage) { // We are listening on all connections, so we need to filter out only our peer - if (((AuthenticationMessage) message).address.equals(peerAddress)) { + if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) { Log.traceCall(message.toString()); - if (message instanceof AuthenticationResponse) { + if (message instanceof AuthenticationChallenge) { // Requesting peer + AuthenticationChallenge authenticationChallenge = (AuthenticationChallenge) message; + // We need to set the address to the connection, otherwise we will not find the connection when sending + // the next message and we would create a new outbound connection instead using the inbound. + connection.setPeerAddress(authenticationChallenge.senderAddress); // We use the active connectionType if we started the authentication request to another peer - // That is used for protecting eclipse attacks connection.setConnectionPriority(ConnectionPriority.ACTIVE); - - AuthenticationResponse authenticationResponse = (AuthenticationResponse) message; - connection.setPeerAddress(peerAddress); log.trace("Received authenticationResponse from " + peerAddress); - boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce; + boolean verified = nonce != 0 && nonce == authenticationChallenge.requesterNonce; if (verified) { - GetPeersAuthRequest getPeersAuthRequest = new GetPeersAuthRequest(myAddress, - authenticationResponse.responderNonce, + AuthenticationResponse authenticationResponse = new AuthenticationResponse(myAddress, + authenticationChallenge.responderNonce, new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())); - SettableFuture future = networkNode.sendMessage(peerAddress, getPeersAuthRequest); - log.trace("Sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress); + SettableFuture future = networkNode.sendMessage(peerAddress, authenticationResponse); + log.trace("Sent GetPeersAuthRequest {} to {}", authenticationResponse, peerAddress); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("Successfully sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress); + log.trace("Successfully sent GetPeersAuthRequest to {}", peerAddress); + + log.info("AuthenticationComplete: Peer with address " + peerAddress + + " authenticated (" + connection.getUid() + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms."); + completed(connection); } @Override @@ -107,63 +114,27 @@ public class AuthenticationHandshake implements MessageListener { } }); - // We could set already the authenticated flag here already, but as we need the reported peers we need - // to wait for the GetPeersAuthResponse before we are completed. + // now we add the reported peers to our list + peerGroup.addToReportedPeers(authenticationChallenge.reportedPeers, connection); } else { - log.warn("verify nonce failed. AuthenticationResponse=" + authenticationResponse + " / nonce=" + nonce); - failed(new Exception("Verify nonce failed. AuthenticationResponse=" + authenticationResponse + " / nonceMap=" + nonce)); + log.warn("Verification of nonce failed. AuthenticationResponse=" + authenticationChallenge + " / nonce=" + nonce); + failed(new Exception("Verification of nonce failed. AuthenticationResponse=" + authenticationChallenge + " / nonceMap=" + nonce)); } - } else if (message instanceof GetPeersAuthRequest) { + } else if (message instanceof AuthenticationResponse) { // Responding peer - GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message; - log.trace("GetPeersAuthRequest from " + peerAddress + " at " + myAddress); - boolean verified = nonce != 0 && nonce == getPeersAuthRequest.responderNonce; + AuthenticationResponse authenticationResponse = (AuthenticationResponse) message; + log.trace("Received GetPeersAuthRequest from " + peerAddress + " at " + myAddress); + boolean verified = nonce != 0 && nonce == authenticationResponse.responderNonce; if (verified) { - // we create the msg with our already collected peer addresses (before adding the new ones) - GetPeersAuthResponse getPeersAuthResponse = new GetPeersAuthResponse(myAddress, - new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())); - SettableFuture future = networkNode.sendMessage(peerAddress, getPeersAuthResponse); - log.trace("Sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress); - - // now we add the reported peers to our own set - HashSet reportedPeers = getPeersAuthRequest.reportedPeers; - log.trace("Received reported peers: " + reportedPeers); - peerGroup.addToReportedPeers(reportedPeers, connection); - - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Successfully sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress); - log.info("AuthenticationComplete: Peer with address " + peerAddress - + " authenticated (" + connection.getUid() + "). Took " - + (System.currentTimeMillis() - startAuthTs) + " ms."); - - completed(connection); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("GetPeersAuthResponse sending failed " + throwable.getMessage()); - failed(throwable); - } - }); + peerGroup.addToReportedPeers(authenticationResponse.reportedPeers, connection); + log.info("AuthenticationComplete: Peer with address " + peerAddress + + " authenticated (" + connection.getUid() + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms."); + completed(connection); } else { - log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce); - failed(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce)); + log.warn("Verification of nonce failed. getPeersMessage=" + authenticationResponse + " / nonce=" + nonce); + failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationResponse + " / nonce=" + nonce)); } - } else if (message instanceof GetPeersAuthResponse) { - // Requesting peer - GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message; - log.trace("GetPeersAuthResponse from " + peerAddress + " at " + myAddress); - HashSet reportedPeers = getPeersAuthResponse.reportedPeers; - log.trace("Received reported peers: " + reportedPeers); - peerGroup.addToReportedPeers(reportedPeers, connection); - - log.info("AuthenticationComplete: Peer with address " + peerAddress - + " authenticated (" + connection.getUid() + "). Took " - + (System.currentTimeMillis() - startAuthTs) + " ms."); - - completed(connection); } } } @@ -190,7 +161,6 @@ public class AuthenticationHandshake implements MessageListener { public void onSuccess(Connection connection) { log.trace("send AuthenticationRequest to " + peerAddress + " succeeded."); - connection.setPeerAddress(peerAddress); // We protect that connection from getting closed by maintenance cleanup... connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST); } @@ -222,40 +192,42 @@ public class AuthenticationHandshake implements MessageListener { resultFutureOptional = Optional.of(SettableFuture.create()); - log.trace("AuthenticationRequest from " + peerAddress + " at " + myAddress); log.info("We shut down inbound connection from peer {} to establish a new " + - "connection with his reported address.", peerAddress); + "connection with his reported address to verify if his address is correct.", peerAddress); connection.shutDown(() -> { UserThread.runAfter(() -> { if (!stopped) { // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to - // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) - log.trace("processAuthenticationMessage: connection.shutDown complete. AuthenticationRequest from " + peerAddress + " at " + myAddress); + // inconsistent state + log.trace("respondToAuthenticationRequest: connection.shutDown complete. peerAddress=" + peerAddress + " / myAddress=" + myAddress); - AuthenticationResponse authenticationResponse = new AuthenticationResponse(myAddress, + // we send additionally the reported and authenticated peers to save one message in the protocol. + AuthenticationChallenge authenticationChallenge = new AuthenticationChallenge(myAddress, authenticationRequest.requesterNonce, - getAndSetNonce()); - SettableFuture future = networkNode.sendMessage(peerAddress, authenticationResponse); + getAndSetNonce(), + new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())); + SettableFuture future = networkNode.sendMessage(peerAddress, authenticationChallenge); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("onSuccess sending AuthenticationResponse"); + log.trace("AuthenticationResponse successfully sent"); - connection.setPeerAddress(peerAddress); - // We use passive connectionType for connections created from received authentication requests from other peers - // That is used for protecting eclipse attacks + // We use passive connectionType for connections created from received authentication + // requests from other peers connection.setConnectionPriority(ConnectionPriority.PASSIVE); } @Override public void onFailure(@NotNull Throwable throwable) { - log.warn("onFailure sending AuthenticationResponse."); + log.warn("onFailure sending AuthenticationResponse. " + throwable.getMessage()); failed(throwable); } }); + } else { + log.warn("AuthenticationHandshake already shut down before we could sent AuthenticationResponse. That might happen in rare cases."); } - }, 200, TimeUnit.MILLISECONDS); + }, 1000, TimeUnit.MILLISECONDS); // Don't set the delay too short as the CloseConnectionMessage might arrive too late at the peer }); return resultFutureOptional.get(); } @@ -284,7 +256,7 @@ public class AuthenticationHandshake implements MessageListener { } private void failed(@NotNull Throwable throwable) { - Log.traceCall(); + Log.traceCall(throwable.toString()); shutDown(); if (resultFutureOptional.isPresent()) resultFutureOptional.get().setException(throwable); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java index 7c65ff3ecb..1357e0761c 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java @@ -4,5 +4,5 @@ import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; public interface AuthenticationListener { - void onPeerAddressAuthenticated(Address peerAddress, Connection connection); + void onPeerAuthenticated(Address peerAddress, Connection connection); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java index f939db61dd..a41b1507e2 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java @@ -56,7 +56,6 @@ public class MaintenanceManager implements MessageListener { public void onMessage(Message message, Connection connection) { if (message instanceof MaintenanceMessage) { Log.traceCall(message.toString()); - log.debug("Received message " + message + " at " + peerGroup.getMyAddress() + " from " + connection.getPeerAddress()); if (message instanceof PingMessage) { SettableFuture future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce)); Futures.addCallback(future, new FutureCallback() { @@ -68,19 +67,19 @@ public class MaintenanceManager implements MessageListener { @Override public void onFailure(@NotNull Throwable throwable) { log.info("PongMessage sending failed " + throwable.getMessage()); - peerGroup.removePeer(connection.getPeerAddress()); + connection.getPeerAddress().ifPresent(peerAddress -> peerGroup.removePeer(peerAddress)); } }); } else if (message instanceof PongMessage) { - if (connection.getPeerAddress() != null) { - Peer peer = peerGroup.getAuthenticatedPeers().get(connection.getPeerAddress()); + connection.getPeerAddress().ifPresent(peerAddress -> { + Peer peer = peerGroup.getAuthenticatedPeers().get(peerAddress); if (peer != null) { if (((PongMessage) message).nonce != peer.getPingNonce()) { - log.warn("PongMessage invalid: self/peer " + peerGroup.getMyAddress() + "/" + connection.getPeerAddress()); + log.warn("PongMessage invalid: self/peer " + peerGroup.getMyAddress() + "/" + peerAddress); peerGroup.removePeer(peer.address); } } - } + }); } } } @@ -93,7 +92,7 @@ public class MaintenanceManager implements MessageListener { sendPingTimer = UserThread.runAfterRandomDelay(() -> { pingPeers(); startMaintenanceTimer(); - }, 5, 10, TimeUnit.MINUTES); + }, 5, 7, TimeUnit.MINUTES); } @@ -117,7 +116,7 @@ public class MaintenanceManager implements MessageListener { peerGroup.removePeer(e.address); } }); - }, 1, 10)); + }, 2, 4, TimeUnit.SECONDS)); } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/Peer.java b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java index 310eb37665..6c33b3bbf3 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/Peer.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java @@ -14,9 +14,9 @@ public class Peer { public final Address address; private final long pingNonce; - public Peer(Connection connection) { + public Peer(Connection connection, Address address) { this.connection = connection; - this.address = connection.getPeerAddress(); + this.address = address; pingNonce = new Random().nextLong(); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java index 3c80abc4b9..0163108f97 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java @@ -9,9 +9,9 @@ import io.bitsquare.p2p.Message; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.peers.messages.peerexchange.GetPeersRequest; -import io.bitsquare.p2p.peers.messages.peerexchange.GetPeersResponse; -import io.bitsquare.p2p.peers.messages.peerexchange.PeerExchangeMessage; +import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest; +import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse; +import io.bitsquare.p2p.peers.messages.peers.PeerExchangeMessage; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +51,6 @@ public class PeerExchangeManager implements MessageListener { public void onMessage(Message message, Connection connection) { if (message instanceof PeerExchangeMessage) { Log.traceCall(message.toString()); - log.debug("Received message " + message + " at " + peerGroup.getMyAddress() + " from " + connection.getPeerAddress()); if (message instanceof GetPeersRequest) { GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message; HashSet reportedPeers = getPeersRequestMessage.reportedPeers; @@ -68,7 +67,7 @@ public class PeerExchangeManager implements MessageListener { @Override public void onFailure(@NotNull Throwable throwable) { log.info("GetPeersResponse sending failed " + throwable.getMessage()); - peerGroup.removePeer(getPeersRequestMessage.address); + peerGroup.removePeer(getPeersRequestMessage.senderAddress); } }); @@ -90,7 +89,7 @@ public class PeerExchangeManager implements MessageListener { getPeersTimer = UserThread.runAfterRandomDelay(() -> { trySendGetPeersRequest(); startGetPeersTimer(); - }, 1, 2, TimeUnit.MINUTES); + }, 2, 4, TimeUnit.MINUTES); } private void trySendGetPeersRequest() { @@ -113,7 +112,7 @@ public class PeerExchangeManager implements MessageListener { peerGroup.removePeer(e.address); } }); - }, 5, 10)); + }, 3, 5, TimeUnit.SECONDS)); } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java index b3a700544d..cda413b8d2 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java @@ -47,7 +47,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { setMaxConnectionsLowPriority(8); } - static final int INACTIVITY_PERIOD_BEFORE_PING = 30 * 1000; + static final int INACTIVITY_PERIOD_BEFORE_PING = 5 * 60 * 1000; private static final int MAX_REPORTED_PEERS = 1000; private final NetworkNode networkNode; @@ -95,7 +95,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { @Override public void onDisconnect(Reason reason, Connection connection) { log.debug("onDisconnect connection=" + connection + " / reason=" + reason); - removePeer(connection.getPeerAddress()); + connection.getPeerAddress().ifPresent(peerAddress -> removePeer(peerAddress)); } @Override @@ -127,23 +127,30 @@ public class PeerGroup implements MessageListener, ConnectionListener { authenticatedPeers.values().stream() .filter(e -> !e.address.equals(sender)) .forEach(peer -> UserThread.runAfterRandomDelay(() -> { - final Address address = peer.address; - log.trace("Broadcast message from " + getMyAddress() + " to " + address + "."); - SettableFuture future = networkNode.sendMessage(address, message); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded."); - } + // as we use a delay we need to check again if our peer is still in the authenticated list + if (authenticatedPeers.containsValue(peer)) { + final Address address = peer.address; + log.trace("Broadcast message from " + getMyAddress() + " to " + address + "."); + SettableFuture future = networkNode.sendMessage(address, message); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded."); + } - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Broadcast failed. " + throwable.getMessage()); - UserThread.execute(() -> removePeer(address)); - } - }); + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Broadcast failed. " + throwable.getMessage()); + UserThread.execute(() -> removePeer(address)); + } + }); + } else { + log.debug("Peer is not in our authenticated list anymore. " + + "That can happen as we use a delay in the loop for the broadcast. " + + "Peer.address={}", peer.address); + } }, - 10, 200, TimeUnit.MILLISECONDS)); + 10, 100, TimeUnit.MILLISECONDS)); } else { log.info("Message not broadcasted because we have no authenticated peers yet. " + "message = {}", message); @@ -167,7 +174,11 @@ public class PeerGroup implements MessageListener, ConnectionListener { private void processAuthenticationRequest(AuthenticationRequest message, final Connection connection) { Log.traceCall(message.toString()); - Address peerAddress = message.address; + Address peerAddress = message.senderAddress; + + // We set the address to the connection, otherwise we will not find the connection when sending + // a reject message and we would create a new outbound connection instead using the inbound. + connection.setPeerAddress(message.senderAddress); if (!authenticatedPeers.containsKey(peerAddress)) { AuthenticationHandshake authenticationHandshake; @@ -175,7 +186,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { log.info("We got an incoming AuthenticationRequest for the peerAddress ({})", peerAddress); // We protect that connection from getting closed by maintenance cleanup... connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST); - authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), message.address); + authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), peerAddress); authenticationHandshakes.put(peerAddress, authenticationHandshake); doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake); } else { @@ -183,13 +194,13 @@ public class PeerGroup implements MessageListener, ConnectionListener { "an authentication handshake for that peerAddress ({})", peerAddress); log.debug("We avoid such race conditions by rejecting the request if the hashCode of our address ({}) is " + "smaller then the hashCode of the peers address ({}).", getMyAddress().hashCode(), - message.address.hashCode()); + message.senderAddress.hashCode()); authenticationHandshake = authenticationHandshakes.get(peerAddress); - if (getMyAddress().hashCode() < message.address.hashCode()) { + if (getMyAddress().hashCode() < peerAddress.hashCode()) { log.info("We reject the authentication request and keep our own request alive."); - rejectAuthenticationRequest(message, peerAddress); + rejectAuthenticationRequest(peerAddress); } else { log.info("We accept the authentication request but cancel our own request."); cancelOwnAuthenticationRequest(peerAddress, authenticationHandshake); @@ -201,35 +212,34 @@ public class PeerGroup implements MessageListener, ConnectionListener { log.warn("We got an incoming AuthenticationRequest but we are already authenticated to that peer " + "with peerAddress {}.\n" + "That might happen in some race conditions. We reject the request.", peerAddress); - rejectAuthenticationRequest(message, peerAddress); + rejectAuthenticationRequest(peerAddress); } } private void processAuthenticationRejection(AuthenticationRejection message) { Log.traceCall(message.toString()); - Address peerAddress = message.address; + Address peerAddress = message.senderAddress; cancelOwnAuthenticationRequest(peerAddress, authenticationHandshakes.get(peerAddress)); } - private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection, final Address peerAddress, AuthenticationHandshake authenticationHandshake) { + private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection, + Address peerAddress, AuthenticationHandshake authenticationHandshake) { Log.traceCall(message.toString()); SettableFuture future = authenticationHandshake.respondToAuthenticationRequest(message, connection); Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - checkArgument(peerAddress.equals(connection.getPeerAddress()), "peerAddress does not match connection.getPeerAddress()"); - log.info("We got the peer who did an authentication request authenticated."); - addAuthenticatedPeer(connection, peerAddress); - } + @Override + public void onSuccess(Connection connection) { + log.info("We got the peer ({}) who requested authentication authenticated.", peerAddress); + addAuthenticatedPeer(connection, peerAddress); + } - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Authentication with peer who requested authentication failed.\n" + - "That can happen if the peer went offline. " + throwable.getMessage()); - removePeer(peerAddress); - } + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Authentication with peer who requested authentication failed.\n" + + "That can happen if the peer went offline. " + throwable.getMessage()); + removePeer(peerAddress); + } } - ); } @@ -239,9 +249,9 @@ public class PeerGroup implements MessageListener, ConnectionListener { authenticationHandshakes.remove(peerAddress); } - private void rejectAuthenticationRequest(AuthenticationRequest message, Address peerAddress) { + private void rejectAuthenticationRequest(Address peerAddress) { Log.traceCall(); - networkNode.sendMessage(peerAddress, new AuthenticationRejection(getMyAddress(), message.requesterNonce)); + networkNode.sendMessage(peerAddress, new AuthenticationRejection(getMyAddress())); } @@ -254,42 +264,42 @@ public class PeerGroup implements MessageListener, ConnectionListener { seedNodeAddressesOptional = Optional.of(seedNodeAddresses); remainingSeedNodes.addAll(seedNodeAddresses); remainingSeedNodes.remove(peerAddress); - authenticateToFirstSeedNode(peerAddress); } private void authenticateToFirstSeedNode(Address peerAddress) { Log.traceCall(); if (!maxConnectionsForAuthReached()) { - if (remainingSeedNodesAvailable()) { - log.info("We try to authenticate to seed node {}.", peerAddress); - authenticate(peerAddress, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.info("We got our first seed node authenticated. " + - "We try if there are reported peers available to authenticate."); - addAuthenticatedPeer(connection, peerAddress); - authenticateToRemainingReportedPeer(); - } + log.info("We try to authenticate to seed node {}.", peerAddress); + authenticate(peerAddress, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.info("We got our first seed node authenticated. " + + "We try if there are reported peers available to authenticate."); - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Authentication to " + peerAddress + " failed." + - "\nThat is expected if seed nodes are offline." + - "\nException:" + throwable.getMessage()); + addAuthenticatedPeer(connection, peerAddress); + authenticateToRemainingReportedPeer(); + } - removePeer(peerAddress); + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Authentication to " + peerAddress + " failed." + + "\nThat is expected if seed nodes are offline." + + "\nException:" + throwable.getMessage()); + removePeer(peerAddress); + + if (remainingSeedNodesAvailable()) { log.info("We try another random seed node for first authentication attempt."); authenticateToFirstSeedNode(getAndRemoveRandomAddress(remainingSeedNodes)); + } else { + log.info("There are no seed nodes available for authentication. " + + "We try if there are reported peers available to authenticate."); + authenticateToRemainingReportedPeer(); } - }); - } else { - log.info("There are no seed nodes available for authentication. " + - "We try if there are reported peers available to authenticate."); - authenticateToRemainingReportedPeer(); - } + } + }); } else { log.info("We have already enough connections."); } @@ -330,7 +340,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { } else { log.info("We don't have seed nodes or reported peers available. We will try again after a random pause."); UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 1, 2, TimeUnit.MINUTES); + 10, 20, TimeUnit.SECONDS); } } else { log.info("We have already enough connections."); @@ -346,38 +356,43 @@ public class PeerGroup implements MessageListener, ConnectionListener { Log.traceCall(); if (!maxConnectionsForAuthReached()) { if (reportedPeersAvailable()) { - Address peerAddress = getAndRemoveRandomReportedPeer(new ArrayList<>(reportedPeers)).address; - removeFromReportedPeers(peerAddress); + if (getAndRemoveNotAuthenticatingReportedPeer().isPresent()) { + Address peerAddress = getAndRemoveNotAuthenticatingReportedPeer().get().address; + removeFromReportedPeers(peerAddress); + log.info("We try to authenticate to peer {}.", peerAddress); + authenticate(peerAddress, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.info("We got a peer authenticated. " + + "We try if there are more reported peers available to authenticate."); - log.info("We try to authenticate to peer {}.", peerAddress); - authenticate(peerAddress, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.info("We got a peer authenticated. " + - "We try if there are more reported peers available to authenticate."); + addAuthenticatedPeer(connection, peerAddress); + authenticateToRemainingReportedPeer(); + } - addAuthenticatedPeer(connection, peerAddress); - authenticateToRemainingReportedPeer(); - } + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Authentication to " + peerAddress + " failed." + + "\nThat is expected if the peer is offline." + + "\nException:" + throwable.getMessage()); - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Authentication to " + peerAddress + " failed." + - "\nThat is expected if the peer is offline." + - "\nException:" + throwable.getMessage()); + removePeer(peerAddress); - removePeer(peerAddress); - - log.info("We try another random seed node for authentication."); - authenticateToRemainingReportedPeer(); - } - }); + log.info("We try another random seed node for authentication."); + authenticateToRemainingReportedPeer(); + } + }); + } else { + log.info("We don't have a reported peers available (maybe one is authenticating already). We will try again after a random pause."); + UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), + 10, 20, TimeUnit.SECONDS); + } } else if (remainingSeedNodesAvailable()) { authenticateToRemainingSeedNode(); } else { log.info("We don't have seed nodes or reported peers available. We will try again after a random pause."); UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 1, 2, TimeUnit.MINUTES); + 30, 40, TimeUnit.SECONDS); } } else { log.info("We have already enough connections."); @@ -442,6 +457,10 @@ public class PeerGroup implements MessageListener, ConnectionListener { private void addAuthenticatedPeer(Connection connection, Address peerAddress) { Log.traceCall(peerAddress.getFullAddress()); + + connection.setPeerAddress(peerAddress); + connection.setAuthenticated(); + if (authenticationHandshakes.containsKey(peerAddress)) authenticationHandshakes.remove(peerAddress); @@ -452,15 +471,14 @@ public class PeerGroup implements MessageListener, ConnectionListener { + "\npeerAddress= " + peerAddress + "\n############################################################\n"); - authenticatedPeers.put(peerAddress, new Peer(connection)); + authenticatedPeers.put(peerAddress, new Peer(connection, peerAddress)); removeFromReportedPeers(peerAddress); if (!checkIfConnectedPeersExceeds()) printAuthenticatedPeers(); - connection.setAuthenticated(peerAddress); //TODO check if address is set already - authenticationListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection)); + authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection)); } void removePeer(@Nullable Address peerAddress) { @@ -524,6 +542,7 @@ public class PeerGroup implements MessageListener, ConnectionListener { if (size > PeerGroup.MAX_CONNECTIONS_HIGH_PRIORITY) { authenticatedConnections = allConnections.stream() .filter(e -> e.isAuthenticated()) + .filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST) .collect(Collectors.toList()); } } @@ -536,10 +555,10 @@ public class PeerGroup implements MessageListener, ConnectionListener { Connection connection = authenticatedConnections.remove(0); log.info("We are going to shut down the oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); - connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 100, 500, TimeUnit.MILLISECONDS)); + connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 10, 50, TimeUnit.MILLISECONDS)); return true; } else { - log.warn("authenticatedConnections.size() == 0. That must never happen here. (checkIfConnectedPeersExceeds)"); + log.debug("authenticatedConnections.size() == 0. That might happen in rare cases. (checkIfConnectedPeersExceeds)"); return false; } } else { @@ -564,13 +583,17 @@ public class PeerGroup implements MessageListener, ConnectionListener { return all; } + public boolean isInAuthenticationProcess(Address address) { + return authenticationHandshakes.containsKey(address); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Reported peers /////////////////////////////////////////////////////////////////////////////////////////// void addToReportedPeers(HashSet reportedPeersToAdd, Connection connection) { - Log.traceCall(); + Log.traceCall("reportedPeersToAdd = " + reportedPeersToAdd); // we disconnect misbehaving nodes trying to send too many peers // reported peers include the authenticated peers which is normally max. 8 but we give some headroom // for safety @@ -640,6 +663,18 @@ public class PeerGroup implements MessageListener, ConnectionListener { return list.remove(new Random().nextInt(list.size())); } + private Optional getAndRemoveNotAuthenticatingReportedPeer() { + Optional reportedPeer = Optional.empty(); + List list = new ArrayList<>(reportedPeers); + if (!list.isEmpty()) { + do { + reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list)); + } + while (!list.isEmpty() && authenticationHandshakes.containsKey(reportedPeer.get().address)); + } + return reportedPeer; + } + private Address getAndRemoveRandomAddress(List
list) { checkArgument(!list.isEmpty(), "List must not be empty"); return list.remove(new Random().nextInt(list.size())); @@ -666,5 +701,4 @@ public class PeerGroup implements MessageListener, ConnectionListener { result.append("\n------------------------------------------------------------\n"); log.info(result.toString()); } - } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java index 9f47bfe321..89f140f50d 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java @@ -8,12 +8,13 @@ import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.ConnectionPriority; import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.peers.messages.data.DataRequest; +import io.bitsquare.p2p.peers.messages.data.DataResponse; import io.bitsquare.p2p.storage.P2PDataStorage; import io.bitsquare.p2p.storage.data.ProtectedData; -import io.bitsquare.p2p.storage.messages.GetDataRequest; -import io.bitsquare.p2p.storage.messages.GetDataResponse; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -40,6 +41,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen private final NetworkNode networkNode; private final P2PDataStorage dataStorage; + private final PeerGroup peerGroup; private final Listener listener; private Optional
optionalConnectedSeedNodeAddress = Optional.empty(); @@ -50,9 +52,10 @@ public class RequestDataManager implements MessageListener, AuthenticationListen // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, Listener listener) { + public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerGroup peerGroup, Listener listener) { this.networkNode = networkNode; this.dataStorage = dataStorage; + this.peerGroup = peerGroup; this.listener = listener; networkNode.addMessageListener(this); @@ -70,39 +73,46 @@ public class RequestDataManager implements MessageListener, AuthenticationListen if (!seedNodeAddresses.isEmpty()) { List
remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses); Collections.shuffle(remainingSeedNodeAddresses); - Address candidate = remainingSeedNodeAddresses.remove(0); + Address candidate = remainingSeedNodeAddresses.get(0); + if (!peerGroup.isInAuthenticationProcess(candidate)) { + // We only remove it if it is not in the process of authentication + remainingSeedNodeAddresses.remove(0); + log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate); - log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate); + SettableFuture future = networkNode.sendMessage(candidate, new DataRequest()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.info("Send GetAllDataMessage to " + candidate + " succeeded."); + checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen."); + optionalConnectedSeedNodeAddress = Optional.of(candidate); + } - SettableFuture future = networkNode.sendMessage(candidate, new GetDataRequest()); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.info("Send GetAllDataMessage to " + candidate + " succeeded."); - checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen."); - optionalConnectedSeedNodeAddress = Optional.of(candidate); - } + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Send GetAllDataMessage to " + candidate + " failed. " + + "That is expected if the seed node is offline. " + + "Exception:" + throwable.getMessage()); + if (!remainingSeedNodeAddresses.isEmpty()) + log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses); - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Send GetAllDataMessage to " + candidate + " failed. " + - "That is expected if the seed node is offline. " + - "Exception:" + throwable.getMessage()); - if (!remainingSeedNodeAddresses.isEmpty()) - log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses); - - requestData(remainingSeedNodeAddresses); - } - }); + requestData(remainingSeedNodeAddresses); + } + }); + } else { + log.info("The seed node ({}) is in the process of authentication.\n" + + "We will try again after a pause of 3-5 sec.", candidate); + listener.onNoSeedNodeAvailable(); + UserThread.runAfterRandomDelay(() -> requestData(remainingSeedNodeAddresses), + 3, 5, TimeUnit.SECONDS); + } } else { log.info("There is no seed node available for requesting data. " + "That is expected if no seed node is online.\n" + - "We will try again after a pause of 20-30 sec."); + "We will try again after a pause of 10-20 sec."); listener.onNoSeedNodeAvailable(); - - // We re try after 20-30 sec. UserThread.runAfterRandomDelay(() -> requestData(optionalSeedNodeAddresses.get()), - 20, 30, TimeUnit.SECONDS); + 10, 20, TimeUnit.SECONDS); } } @@ -113,18 +123,18 @@ public class RequestDataManager implements MessageListener, AuthenticationListen @Override public void onMessage(Message message, Connection connection) { - if (message instanceof GetDataRequest) { + if (message instanceof DataRequest) { // We are a seed node and receive that msg from a new node Log.traceCall(message.toString()); - networkNode.sendMessage(connection, new GetDataResponse(new HashSet<>(dataStorage.getMap().values()))); - } else if (message instanceof GetDataResponse) { + networkNode.sendMessage(connection, new DataResponse(new HashSet<>(dataStorage.getMap().values()))); + } else if (message instanceof DataResponse) { // We are the new node which has requested the data Log.traceCall(message.toString()); - GetDataResponse getDataResponse = (GetDataResponse) message; - HashSet set = getDataResponse.set; + DataResponse dataResponse = (DataResponse) message; + HashSet set = dataResponse.set; // we keep that connection open as the bootstrapping peer will use that for the authentication // as we are not authenticated yet the data adding will not be broadcasted - set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); + connection.getPeerAddress().ifPresent(peerAddress -> set.stream().forEach(e -> dataStorage.add(e, peerAddress))); optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> listener.onDataReceived(connectedSeedNodeAddress)); } } @@ -135,10 +145,13 @@ public class RequestDataManager implements MessageListener, AuthenticationListen /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + public void onPeerAuthenticated(Address peerAddress, Connection connection) { optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> { - if (connectedSeedNodeAddress.equals(peerAddress)) - requestDataFromAuthenticatedSeedNode(peerAddress, connection); + // We only request the data again if we have initiated the authentication (ConnectionPriority.ACTIVE) + // We delay a bit to be sure that the authentication state is applied to all threads + if (connection.getConnectionPriority() == ConnectionPriority.ACTIVE && connectedSeedNodeAddress.equals(peerAddress)) + UserThread.runAfter(() + -> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS); }); } @@ -147,7 +160,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) { Log.traceCall(peerAddress.toString()); // We have to request the data again as we might have missed pushed data in the meantime - SettableFuture future = networkNode.sendMessage(connection, new GetDataRequest()); + SettableFuture future = networkNode.sendMessage(connection, new DataRequest()); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationChallenge.java similarity index 59% rename from network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationChallenge.java index 3d40fc89a2..41693c1af8 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationChallenge.java @@ -6,24 +6,26 @@ import io.bitsquare.p2p.peers.ReportedPeer; import java.util.HashSet; -public final class GetPeersAuthRequest extends AuthenticationMessage { +public final class AuthenticationChallenge extends AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; + public final long requesterNonce; public final long responderNonce; public final HashSet reportedPeers; - public GetPeersAuthRequest(Address address, long responderNonce, HashSet reportedPeers) { - super(address); + public AuthenticationChallenge(Address senderAddress, long requesterNonce, long responderNonce, HashSet reportedPeers) { + super(senderAddress); + this.requesterNonce = requesterNonce; this.responderNonce = responderNonce; this.reportedPeers = reportedPeers; } @Override public String toString() { - return "GetPeersAuthRequest{" + - "address=" + address + - ", challengerNonce=" + responderNonce + + return "AuthenticationChallenge{" + + ", requesterNonce=" + requesterNonce + + ", responderNonce=" + responderNonce + ", reportedPeers=" + reportedPeers + super.toString() + "} "; } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java index bef49305e5..9a1e9eba9e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java @@ -7,10 +7,10 @@ import io.bitsquare.p2p.Message; public abstract class AuthenticationMessage implements Message { private final int networkId = Version.NETWORK_ID; - public final Address address; + public final Address senderAddress; - public AuthenticationMessage(Address address) { - this.address = address; + public AuthenticationMessage(Address senderAddress) { + this.senderAddress = senderAddress; } @Override @@ -20,7 +20,7 @@ public abstract class AuthenticationMessage implements Message { @Override public String toString() { - return ", address=" + address.toString() + + return ", address=" + senderAddress.toString() + ", networkId=" + networkId + '}'; } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java index c810597164..97f2d53237 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java @@ -7,18 +7,14 @@ public final class AuthenticationRejection extends AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - public final long requesterNonce; - - public AuthenticationRejection(Address address, long requesterNonce) { - super(address); - this.requesterNonce = requesterNonce; + public AuthenticationRejection(Address senderAddress) { + super(senderAddress); } @Override public String toString() { return "AuthenticationReject{" + - "address=" + address + - ", requesterNonce=" + requesterNonce + + "address=" + senderAddress + super.toString() + "} "; } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java index e5744cd4dd..0e78d01135 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java @@ -9,16 +9,16 @@ public final class AuthenticationRequest extends AuthenticationMessage { public final long requesterNonce; - public AuthenticationRequest(Address address, long requesterNonce) { - super(address); + public AuthenticationRequest(Address senderAddress, long requesterNonce) { + super(senderAddress); this.requesterNonce = requesterNonce; } @Override public String toString() { return "AuthenticationRequest{" + - "address=" + address + - ", nonce=" + requesterNonce + + "senderAddress=" + senderAddress + + ", requesterNonce=" + requesterNonce + super.toString() + "} "; } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java index 15bd2218d8..966beb6f49 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java @@ -2,26 +2,29 @@ package io.bitsquare.p2p.peers.messages.auth; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; +import io.bitsquare.p2p.peers.ReportedPeer; + +import java.util.HashSet; public final class AuthenticationResponse extends AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - public final long requesterNonce; public final long responderNonce; + public final HashSet reportedPeers; - public AuthenticationResponse(Address address, long requesterNonce, long responderNonce) { - super(address); - this.requesterNonce = requesterNonce; + public AuthenticationResponse(Address senderAddress, long responderNonce, HashSet reportedPeers) { + super(senderAddress); this.responderNonce = responderNonce; + this.reportedPeers = reportedPeers; } @Override public String toString() { return "AuthenticationResponse{" + - "address=" + address + - ", requesterNonce=" + requesterNonce + - ", challengerNonce=" + responderNonce + + "address=" + senderAddress + + ", responderNonce=" + responderNonce + + ", reportedPeers=" + reportedPeers + super.toString() + "} "; } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java deleted file mode 100644 index 58a7e4c7a0..0000000000 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java +++ /dev/null @@ -1,27 +0,0 @@ -package io.bitsquare.p2p.peers.messages.auth; - -import io.bitsquare.app.Version; -import io.bitsquare.p2p.Address; -import io.bitsquare.p2p.peers.ReportedPeer; - -import java.util.HashSet; - -public final class GetPeersAuthResponse extends AuthenticationMessage { - // That object is sent over the wire, so we need to take care of version compatibility. - private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - - public final HashSet reportedPeers; - - public GetPeersAuthResponse(Address address, HashSet reportedPeers) { - super(address); - this.reportedPeers = reportedPeers; - } - - @Override - public String toString() { - return "GetPeersAuthResponse{" + - "address=" + address + - ", reportedPeers=" + reportedPeers + - super.toString() + "} "; - } -} diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java similarity index 80% rename from network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java index f70432c1f9..9dd5ef2386 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java @@ -1,15 +1,15 @@ -package io.bitsquare.p2p.storage.messages; +package io.bitsquare.p2p.peers.messages.data; import io.bitsquare.app.Version; import io.bitsquare.p2p.Message; -public final class GetDataRequest implements Message { +public final class DataRequest implements Message { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private final int networkId = Version.NETWORK_ID; - - public GetDataRequest() { + + public DataRequest() { } @Override diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java similarity index 78% rename from network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java index 0380c5c92a..4e6b007d94 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.storage.messages; +package io.bitsquare.p2p.peers.messages.data; import io.bitsquare.app.Version; import io.bitsquare.p2p.Message; @@ -6,14 +6,14 @@ import io.bitsquare.p2p.storage.data.ProtectedData; import java.util.HashSet; -public final class GetDataResponse implements Message { +public final class DataResponse implements Message { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private final int networkId = Version.NETWORK_ID; public final HashSet set; - public GetDataResponse(HashSet set) { + public DataResponse(HashSet set) { this.set = set; } @@ -25,9 +25,9 @@ public final class GetDataResponse implements Message { @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof GetDataResponse)) return false; + if (!(o instanceof DataResponse)) return false; - GetDataResponse that = (GetDataResponse) o; + DataResponse that = (DataResponse) o; return !(set != null ? !set.equals(that.set) : that.set != null); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java similarity index 71% rename from network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersRequest.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java index 9d9dba3158..08d754da4d 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peers.messages.peerexchange; +package io.bitsquare.p2p.peers.messages.peers; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; @@ -10,18 +10,18 @@ public final class GetPeersRequest extends PeerExchangeMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - public final Address address; + public final Address senderAddress; public final HashSet reportedPeers; - public GetPeersRequest(Address address, HashSet reportedPeers) { - this.address = address; + public GetPeersRequest(Address senderAddress, HashSet reportedPeers) { + this.senderAddress = senderAddress; this.reportedPeers = reportedPeers; } @Override public String toString() { return "GetPeersRequest{" + - "address=" + address + + "senderAddress=" + senderAddress + ", reportedPeers=" + reportedPeers + super.toString() + "} "; } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java similarity index 92% rename from network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersResponse.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java index fda16b5a73..bd5d3e5bca 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersResponse.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peers.messages.peerexchange; +package io.bitsquare.p2p.peers.messages.peers; import io.bitsquare.app.Version; import io.bitsquare.p2p.peers.ReportedPeer; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/PeerExchangeMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/PeerExchangeMessage.java similarity index 87% rename from network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/PeerExchangeMessage.java rename to network/src/main/java/io/bitsquare/p2p/peers/messages/peers/PeerExchangeMessage.java index fba7f2bb35..d0e8de7449 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/PeerExchangeMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/PeerExchangeMessage.java @@ -1,4 +1,4 @@ -package io.bitsquare.p2p.peers.messages.peerexchange; +package io.bitsquare.p2p.peers.messages.peers; import io.bitsquare.app.Version; import io.bitsquare.p2p.Message; diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index 65e69dd4b1..099fff53cb 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -116,17 +116,18 @@ public class P2PDataStorage implements MessageListener { Log.traceCall(message.toString()); if (connection.isAuthenticated()) { log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection); - if (message instanceof AddDataMessage) { - add(((AddDataMessage) message).data, connection.getPeerAddress()); - } else if (message instanceof RemoveDataMessage) { - remove(((RemoveDataMessage) message).data, connection.getPeerAddress()); - } else if (message instanceof RemoveMailboxDataMessage) { - removeMailboxData(((RemoveMailboxDataMessage) message).data, connection.getPeerAddress()); - } + connection.getPeerAddress().ifPresent(peerAddress -> { + if (message instanceof AddDataMessage) { + add(((AddDataMessage) message).data, peerAddress); + } else if (message instanceof RemoveDataMessage) { + remove(((RemoveDataMessage) message).data, peerAddress); + } else if (message instanceof RemoveMailboxDataMessage) { + removeMailboxData(((RemoveMailboxDataMessage) message).data, peerAddress); + } + }); } else { log.warn("Connection is not authenticated yet. " + - "We don't accept storage operations from non-authenticated nodes."); - log.trace("Connection = " + connection); + "We don't accept storage operations from non-authenticated nodes. connection=", connection); connection.reportIllegalRequest(IllegalRequest.NotAuthenticated); } }