diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java
index eb089e9efc..4750ccce91 100644
--- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java
+++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java
@@ -258,7 +258,7 @@ public class Connection implements MessageListener {
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid);
Log.traceCall("sendCloseConnectionMessage");
try {
- sendMessage(new CloseConnectionMessage(peerAddressOptional));
+ sendMessage(new CloseConnectionMessage(peerAddressOptional.isPresent() ? peerAddressOptional.get() : null));
setStopFlags();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -411,7 +411,6 @@ public class Connection implements MessageListener {
public void handleConnectionException(Exception e) {
Log.traceCall(e.toString());
- log.debug("connection=" + this);
if (e instanceof SocketException) {
if (socket.isClosed())
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED;
@@ -419,13 +418,13 @@ public class Connection implements MessageListener {
shutDownReason = ConnectionListener.Reason.RESET;
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
shutDownReason = ConnectionListener.Reason.TIMEOUT;
- log.warn("TimeoutException at connection with port " + socket.getLocalPort());
+ log.warn("TimeoutException at socket " + socket.toString());
log.debug("connection={}" + this);
} else if (e instanceof EOFException) {
shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED;
} else {
shutDownReason = ConnectionListener.Reason.UNKNOWN;
- log.warn("Exception at connection with port " + socket.getLocalPort());
+ log.warn("Exception at socket " + socket.toString());
log.debug("connection={}" + this);
e.printStackTrace();
}
@@ -559,7 +558,7 @@ public class Connection implements MessageListener {
sharedSpace.updateLastActivityDate();
if (message instanceof CloseConnectionMessage) {
log.info("Close connection message received from peer {}",
- ((CloseConnectionMessage) message).peerAddressOptional);
+ ((CloseConnectionMessage) message).peerAddress);
stopped = true;
sharedSpace.shutDown(false);
} else if (!stopped) {
diff --git a/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java b/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java
index e040e6b341..ccbf87264e 100644
--- a/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java
+++ b/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java
@@ -4,17 +4,17 @@ import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
-import java.util.Optional;
+import javax.annotation.Nullable;
public final class CloseConnectionMessage implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.NETWORK_ID;
- public Optional
peerAddressOptional;
+ public Address peerAddress;
- public CloseConnectionMessage(Optional peerAddressOptional) {
- this.peerAddressOptional = peerAddressOptional;
+ public CloseConnectionMessage(@Nullable Address peerAddress) {
+ this.peerAddress = peerAddress;
}
@Override
@@ -25,7 +25,7 @@ public final class CloseConnectionMessage implements Message {
@Override
public String toString() {
return "CloseConnectionMessage{" +
- "peerAddressOptional=" + peerAddressOptional +
+ "peerAddress=" + peerAddress +
", networkId=" + networkId +
'}';
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationException.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationException.java
new file mode 100644
index 0000000000..16ef724788
--- /dev/null
+++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationException.java
@@ -0,0 +1,12 @@
+package io.bitsquare.p2p.peers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuthenticationException extends Exception {
+ private static final Logger log = LoggerFactory.getLogger(AuthenticationException.class);
+
+ public AuthenticationException(String message) {
+ super(message);
+ }
+}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
index c4f6d2edad..1f35f37ce4 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
@@ -11,10 +11,7 @@ import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionPriority;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
-import io.bitsquare.p2p.peers.messages.auth.AuthenticationChallenge;
-import io.bitsquare.p2p.peers.messages.auth.AuthenticationFinalResponse;
-import io.bitsquare.p2p.peers.messages.auth.AuthenticationMessage;
-import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest;
+import io.bitsquare.p2p.peers.messages.auth.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +45,6 @@ public class AuthenticationHandshake implements MessageListener {
private long nonce = 0;
private boolean stopped;
private Optional> resultFutureOptional = Optional.empty();
- private boolean ownRequestCanceled;
///////////////////////////////////////////////////////////////////////////////////////////
@@ -78,24 +74,17 @@ public class AuthenticationHandshake implements MessageListener {
@Override
public void onMessage(Message message, Connection connection) {
- if (stopped) {
- log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got onMessage called. That must not happen.", peerAddress);
- log.warn("message={}", message);
- log.warn("connection={}", connection);
- return;
- }
-
- if (message instanceof AuthenticationMessage) {
- // We are listening on all connections, so we need to filter out only our peer
- if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) {
- Log.traceCall(message.toString());
- if (message instanceof AuthenticationChallenge) {
- // Requesting peer
- if (ownRequestCanceled) {
- log.info("Our own request has been canceled because of a race condition. " +
- "\nWe ignore that message and go on with the protocol from the other peers request. " +
- "\nThat might happen in rare cases.");
- } else {
+ // called from other thread but mapped to user thread. That can cause async behaviour.
+ // Example: We got the AuthenticationHandshake shut down and the message listener
+ // has been already removed but we still get the onMessage called as the Platform.runLater get called at the next
+ // cycle. So we need to protect a late call with the stopped flag.
+ if (!stopped) {
+ if (message instanceof AuthenticationMessage) {
+ // We are listening on all connections, so we need to filter out only our peer
+ if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) {
+ Log.traceCall(message.toString());
+ if (message instanceof AuthenticationChallenge) {
+ // Requesting peer
AuthenticationChallenge authenticationChallenge = (AuthenticationChallenge) message;
// We need to set the address to the connection, otherwise we will not find the connection when sending
// the next message and we would create a new outbound connection instead using the inbound.
@@ -134,24 +123,35 @@ public class AuthenticationHandshake implements MessageListener {
log.warn("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonce=" + nonce);
failed(new Exception("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonceMap=" + nonce));
}
- }
- } else if (message instanceof AuthenticationFinalResponse) {
- // Responding peer
- AuthenticationFinalResponse authenticationFinalResponse = (AuthenticationFinalResponse) message;
- log.trace("Received GetPeersAuthRequest from " + peerAddress + " at " + myAddress);
- boolean verified = nonce != 0 && nonce == authenticationFinalResponse.responderNonce;
- if (verified) {
- addReportedPeersConsumer.accept(authenticationFinalResponse.reportedPeers, connection);
- log.info("AuthenticationComplete: Peer with address " + peerAddress
- + " authenticated (" + connection.getUid() + "). Took "
- + (System.currentTimeMillis() - startAuthTs) + " ms.");
- completed(connection);
- } else {
- log.warn("Verification of nonce failed. authenticationResponse=" + authenticationFinalResponse + " / nonce=" + nonce);
- failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationFinalResponse + " / nonce=" + nonce));
+ } else if (message instanceof AuthenticationFinalResponse) {
+ // Responding peer
+ AuthenticationFinalResponse authenticationFinalResponse = (AuthenticationFinalResponse) message;
+ log.trace("Received GetPeersAuthRequest from " + peerAddress + " at " + myAddress);
+ boolean verified = nonce != 0 && nonce == authenticationFinalResponse.responderNonce;
+ if (verified) {
+ addReportedPeersConsumer.accept(authenticationFinalResponse.reportedPeers, connection);
+ log.info("AuthenticationComplete: Peer with address " + peerAddress
+ + " authenticated (" + connection.getUid() + "). Took "
+ + (System.currentTimeMillis() - startAuthTs) + " ms.");
+ completed(connection);
+ } else {
+ log.warn("Verification of nonce failed. authenticationResponse=" + authenticationFinalResponse + " / nonce=" + nonce);
+ failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationFinalResponse + " / nonce=" + nonce));
+ }
+ } else if (message instanceof AuthenticationRejection) {
+ failed(new AuthenticationException("Authentication to peer "
+ + ((AuthenticationRejection) message).senderAddress
+ + " rejected because of a race conditions."));
}
}
}
+ } else {
+ // TODO leave that for debugging for now, but remove it once the network is tested sufficiently
+ log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got onMessage called. " +
+ "That can happen because of Thread mapping.", peerAddress);
+ log.warn("message={}", message);
+ log.warn("connection={}", connection);
+ return;
}
}
@@ -165,6 +165,7 @@ public class AuthenticationHandshake implements MessageListener {
// Requesting peer
if (stopped) {
+ // TODO leave that for debugging for now, but remove it once the network is tested sufficiently
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got requestAuthentication called. That must not happen.", peerAddress);
}
@@ -202,6 +203,7 @@ public class AuthenticationHandshake implements MessageListener {
// Responding peer
if (stopped) {
+ // TODO leave that for debugging for now, but remove it once the network is tested sufficiently
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got respondToAuthenticationRequest called. That must not happen.", peerAddress);
log.warn("authenticationRequest={}", authenticationRequest);
log.warn("connection={}", connection);
@@ -251,14 +253,14 @@ public class AuthenticationHandshake implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
- // Cancel if we send reject message
+ // Cancel
///////////////////////////////////////////////////////////////////////////////////////////
- public void setOwnRequestCanceled() {
+ public void cancel(Address peerAddress) {
Log.traceCall();
- nonce = 0;
- stopped = false;
- ownRequestCanceled = true;
+ failed(new AuthenticationException("Authentication to peer "
+ + peerAddress
+ + " canceled because of a race conditions."));
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
index 22e4315cd1..9541f71eeb 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
@@ -100,8 +100,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
connection.getPeerAddress().ifPresent(peerAddress -> {
- // We only remove it if we are nto in the authentication process
- // Connection shut down is a step in the authentication process.
+ // We only remove the peer from the authenticationHandshakes and the reportedPeers
+ // if we are not in the authentication process
+ // Connection shut down is an expected step in the authentication process.
if (!authenticationHandshakes.containsKey(peerAddress))
removePeer(peerAddress);
});
@@ -120,8 +121,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
public void onMessage(Message message, Connection connection) {
if (message instanceof AuthenticationRequest)
processAuthenticationRequest((AuthenticationRequest) message, connection);
- else if (message instanceof AuthenticationRejection)
- processAuthenticationRejection((AuthenticationRejection) message);
}
@@ -208,65 +207,41 @@ public class PeerGroup implements MessageListener, ConnectionListener {
(newReportedPeers, connection1) -> addToReportedPeers(newReportedPeers, connection1)
);
authenticationHandshakes.put(peerAddress, authenticationHandshake);
- doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake);
+ SettableFuture future = authenticationHandshake.respondToAuthenticationRequest(message, connection);
+ Futures.addCallback(future, new FutureCallback() {
+ @Override
+ public void onSuccess(Connection connection) {
+ log.info("We got the peer ({}) who requested authentication authenticated.", peerAddress);
+ addAuthenticatedPeer(connection, peerAddress);
+ }
+
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ log.info("Authentication with peer who requested authentication failed.\n" +
+ "That can happen if the peer went offline. " + throwable.getMessage());
+ handleAuthenticationFailure(peerAddress, throwable);
+ }
+ }
+ );
} else {
log.info("We got an incoming AuthenticationRequest but we have started ourselves already " +
- "an authentication handshake for that peerAddress ({})", peerAddress);
- log.debug("We avoid such race conditions by rejecting the request if the hashCode of our address ({}) is " +
- "smaller then the hashCode of the peers address ({}). Result = {}", getMyAddress().hashCode(),
- message.senderAddress.hashCode(), (getMyAddress().hashCode() < peerAddress.hashCode()));
+ "an authentication handshake for that peerAddress ({}).\n" +
+ "We terminate such race conditions by rejecting and cancelling the authentication on both " +
+ "peers.", peerAddress);
- authenticationHandshake = authenticationHandshakes.get(peerAddress);
-
- if (getMyAddress().hashCode() < peerAddress.hashCode()) {
- log.info("We reject the authentication request and keep our own request alive.");
- rejectAuthenticationRequest(peerAddress);
- } else {
- log.info("We accept the authentication request but cancel our own request.");
- cancelOwnAuthenticationRequest(peerAddress);
-
- doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake);
- }
+ rejectAuthenticationRequest(peerAddress);
+ authenticationHandshakes.get(peerAddress).cancel(peerAddress);
+ authenticationHandshakes.remove(peerAddress);
}
} else {
- log.info("We got an incoming AuthenticationRequest but we are already authenticated to that peer " +
- "with peerAddress {}.\n" +
- "That might happen in some race conditions. We reject the request.", peerAddress);
+ log.info("We got an incoming AuthenticationRequest but we are already authenticated to peer {}.\n" +
+ "That should not happen. We reject the request.", peerAddress);
rejectAuthenticationRequest(peerAddress);
- }
- }
- private void processAuthenticationRejection(AuthenticationRejection message) {
- Log.traceCall(message.toString());
- Address peerAddress = message.senderAddress;
- cancelOwnAuthenticationRequest(peerAddress);
- }
-
- private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection,
- Address peerAddress, AuthenticationHandshake authenticationHandshake) {
- Log.traceCall(message.toString());
- SettableFuture future = authenticationHandshake.respondToAuthenticationRequest(message, connection);
- Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.info("We got the peer ({}) who requested authentication authenticated.", peerAddress);
- addAuthenticatedPeer(connection, peerAddress);
- }
-
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("Authentication with peer who requested authentication failed.\n" +
- "That can happen if the peer went offline. " + throwable.getMessage());
- removePeer(peerAddress);
- }
- }
- );
- }
-
- private void cancelOwnAuthenticationRequest(Address peerAddress) {
- Log.traceCall();
- if (authenticationHandshakes.containsKey(peerAddress)) {
- authenticationHandshakes.get(peerAddress).setOwnRequestCanceled();
+ if (authenticationHandshakes.containsKey(peerAddress)) {
+ authenticationHandshakes.get(peerAddress).cancel(peerAddress);
+ authenticationHandshakes.remove(peerAddress);
+ }
}
}
@@ -309,7 +284,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.toString());
- removePeer(peerAddress);
+ handleAuthenticationFailure(peerAddress, throwable);
if (remainingSeedNodesAvailable()) {
log.info("We try another random seed node for first authentication attempt.");
@@ -348,7 +323,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"\nThat is expected if the seed node is offline." +
"\nException:" + throwable.toString());
- removePeer(peerAddress);
+ handleAuthenticationFailure(peerAddress, throwable);
log.info("We try another random seed node for authentication.");
authenticateToRemainingSeedNode();
@@ -384,8 +359,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (reportedPeersAvailable()) {
if (getAndRemoveNotAuthenticatingReportedPeer().isPresent()) {
Address peerAddress = getAndRemoveNotAuthenticatingReportedPeer().get().address;
+ if (authenticationHandshakes.containsKey(peerAddress))
+ log.warn("getAndRemoveNotAuthenticatingReportedPeer delivered peer which is already in authenticationHandshakes");
removeFromReportedPeers(peerAddress);
log.info("We try to authenticate to peer {}.", peerAddress);
+ if (authenticationHandshakes.containsKey(peerAddress))
+ log.warn("peer already in authenticationHandshakes");
authenticate(peerAddress, new FutureCallback() {
@Override
public void onSuccess(Connection connection) {
@@ -402,7 +381,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"\nThat is expected if the peer is offline." +
"\nException:" + throwable.toString());
- removePeer(peerAddress);
+ handleAuthenticationFailure(peerAddress, throwable);
log.info("We try another random seed node for authentication.");
authenticateToRemainingReportedPeer();
@@ -480,7 +459,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.error("Authentication to " + peerAddress + " for sending a private message failed at authenticateToDirectMessagePeer." +
"\nSeems that the peer is offline." +
"\nException:" + throwable.toString());
- removePeer(peerAddress);
+ handleAuthenticationFailure(peerAddress, throwable);
if (faultHandler != null)
faultHandler.run();
}
@@ -517,8 +496,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
connection.setPeerAddress(peerAddress);
connection.setAuthenticated();
- if (authenticationHandshakes.containsKey(peerAddress))
- authenticationHandshakes.remove(peerAddress);
+ removeFromAuthenticationHandshakes(peerAddress);
log.info("\n\n############################################################\n" +
"We are authenticated to:" +
@@ -537,17 +515,19 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection));
}
+ void handleAuthenticationFailure(@Nullable Address peerAddress, Throwable throwable) {
+ if (throwable instanceof AuthenticationException)
+ removeFromAuthenticationHandshakes(peerAddress);
+ else
+ removePeer(peerAddress);
+ }
+
void removePeer(@Nullable Address peerAddress) {
Log.traceCall("peerAddress=" + peerAddress);
if (peerAddress != null) {
- if (authenticationHandshakes.containsKey(peerAddress))
- authenticationHandshakes.remove(peerAddress);
-
+ removeFromAuthenticationHandshakes(peerAddress);
removeFromReportedPeers(peerAddress);
-
- Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
- if (disconnectedPeer != null)
- printAuthenticatedPeers();
+ removeFromAuthenticatedPeers(peerAddress);
}
}
@@ -555,6 +535,17 @@ public class PeerGroup implements MessageListener, ConnectionListener {
reportedPeers.remove(new ReportedPeer(peerAddress));
}
+ private void removeFromAuthenticationHandshakes(@Nullable Address peerAddress) {
+ if (authenticationHandshakes.containsKey(peerAddress))
+ authenticationHandshakes.remove(peerAddress);
+ }
+
+ private void removeFromAuthenticatedPeers(@Nullable Address peerAddress) {
+ if (authenticatedPeers.containsKey(peerAddress))
+ authenticatedPeers.remove(peerAddress);
+ printAuthenticatedPeers();
+ }
+
private boolean maxConnectionsForAuthReached() {
return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY;
}