mirror of
https://github.com/haveno-dex/haveno.git
synced 2025-07-23 23:20:42 -04:00
Cancel/Reject auth process in case of race conditions
This commit is contained in:
parent
cf30a7ef01
commit
0c40d7154f
5 changed files with 126 additions and 122 deletions
|
@ -258,7 +258,7 @@ public class Connection implements MessageListener {
|
||||||
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid);
|
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid);
|
||||||
Log.traceCall("sendCloseConnectionMessage");
|
Log.traceCall("sendCloseConnectionMessage");
|
||||||
try {
|
try {
|
||||||
sendMessage(new CloseConnectionMessage(peerAddressOptional));
|
sendMessage(new CloseConnectionMessage(peerAddressOptional.isPresent() ? peerAddressOptional.get() : null));
|
||||||
setStopFlags();
|
setStopFlags();
|
||||||
|
|
||||||
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
|
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
|
||||||
|
@ -411,7 +411,6 @@ public class Connection implements MessageListener {
|
||||||
|
|
||||||
public void handleConnectionException(Exception e) {
|
public void handleConnectionException(Exception e) {
|
||||||
Log.traceCall(e.toString());
|
Log.traceCall(e.toString());
|
||||||
log.debug("connection=" + this);
|
|
||||||
if (e instanceof SocketException) {
|
if (e instanceof SocketException) {
|
||||||
if (socket.isClosed())
|
if (socket.isClosed())
|
||||||
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED;
|
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED;
|
||||||
|
@ -419,13 +418,13 @@ public class Connection implements MessageListener {
|
||||||
shutDownReason = ConnectionListener.Reason.RESET;
|
shutDownReason = ConnectionListener.Reason.RESET;
|
||||||
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
|
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
|
||||||
shutDownReason = ConnectionListener.Reason.TIMEOUT;
|
shutDownReason = ConnectionListener.Reason.TIMEOUT;
|
||||||
log.warn("TimeoutException at connection with port " + socket.getLocalPort());
|
log.warn("TimeoutException at socket " + socket.toString());
|
||||||
log.debug("connection={}" + this);
|
log.debug("connection={}" + this);
|
||||||
} else if (e instanceof EOFException) {
|
} else if (e instanceof EOFException) {
|
||||||
shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED;
|
shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED;
|
||||||
} else {
|
} else {
|
||||||
shutDownReason = ConnectionListener.Reason.UNKNOWN;
|
shutDownReason = ConnectionListener.Reason.UNKNOWN;
|
||||||
log.warn("Exception at connection with port " + socket.getLocalPort());
|
log.warn("Exception at socket " + socket.toString());
|
||||||
log.debug("connection={}" + this);
|
log.debug("connection={}" + this);
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -559,7 +558,7 @@ public class Connection implements MessageListener {
|
||||||
sharedSpace.updateLastActivityDate();
|
sharedSpace.updateLastActivityDate();
|
||||||
if (message instanceof CloseConnectionMessage) {
|
if (message instanceof CloseConnectionMessage) {
|
||||||
log.info("Close connection message received from peer {}",
|
log.info("Close connection message received from peer {}",
|
||||||
((CloseConnectionMessage) message).peerAddressOptional);
|
((CloseConnectionMessage) message).peerAddress);
|
||||||
stopped = true;
|
stopped = true;
|
||||||
sharedSpace.shutDown(false);
|
sharedSpace.shutDown(false);
|
||||||
} else if (!stopped) {
|
} else if (!stopped) {
|
||||||
|
|
|
@ -4,17 +4,17 @@ import io.bitsquare.app.Version;
|
||||||
import io.bitsquare.p2p.Address;
|
import io.bitsquare.p2p.Address;
|
||||||
import io.bitsquare.p2p.Message;
|
import io.bitsquare.p2p.Message;
|
||||||
|
|
||||||
import java.util.Optional;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
public final class CloseConnectionMessage implements Message {
|
public final class CloseConnectionMessage implements Message {
|
||||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||||
|
|
||||||
private final int networkId = Version.NETWORK_ID;
|
private final int networkId = Version.NETWORK_ID;
|
||||||
public Optional<Address> peerAddressOptional;
|
public Address peerAddress;
|
||||||
|
|
||||||
public CloseConnectionMessage(Optional<Address> peerAddressOptional) {
|
public CloseConnectionMessage(@Nullable Address peerAddress) {
|
||||||
this.peerAddressOptional = peerAddressOptional;
|
this.peerAddress = peerAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -25,7 +25,7 @@ public final class CloseConnectionMessage implements Message {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "CloseConnectionMessage{" +
|
return "CloseConnectionMessage{" +
|
||||||
"peerAddressOptional=" + peerAddressOptional +
|
"peerAddress=" + peerAddress +
|
||||||
", networkId=" + networkId +
|
", networkId=" + networkId +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
package io.bitsquare.p2p.peers;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class AuthenticationException extends Exception {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(AuthenticationException.class);
|
||||||
|
|
||||||
|
public AuthenticationException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -11,10 +11,7 @@ import io.bitsquare.p2p.network.Connection;
|
||||||
import io.bitsquare.p2p.network.ConnectionPriority;
|
import io.bitsquare.p2p.network.ConnectionPriority;
|
||||||
import io.bitsquare.p2p.network.MessageListener;
|
import io.bitsquare.p2p.network.MessageListener;
|
||||||
import io.bitsquare.p2p.network.NetworkNode;
|
import io.bitsquare.p2p.network.NetworkNode;
|
||||||
import io.bitsquare.p2p.peers.messages.auth.AuthenticationChallenge;
|
import io.bitsquare.p2p.peers.messages.auth.*;
|
||||||
import io.bitsquare.p2p.peers.messages.auth.AuthenticationFinalResponse;
|
|
||||||
import io.bitsquare.p2p.peers.messages.auth.AuthenticationMessage;
|
|
||||||
import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest;
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -48,7 +45,6 @@ public class AuthenticationHandshake implements MessageListener {
|
||||||
private long nonce = 0;
|
private long nonce = 0;
|
||||||
private boolean stopped;
|
private boolean stopped;
|
||||||
private Optional<SettableFuture<Connection>> resultFutureOptional = Optional.empty();
|
private Optional<SettableFuture<Connection>> resultFutureOptional = Optional.empty();
|
||||||
private boolean ownRequestCanceled;
|
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -78,24 +74,17 @@ public class AuthenticationHandshake implements MessageListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Message message, Connection connection) {
|
public void onMessage(Message message, Connection connection) {
|
||||||
if (stopped) {
|
// called from other thread but mapped to user thread. That can cause async behaviour.
|
||||||
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got onMessage called. That must not happen.", peerAddress);
|
// Example: We got the AuthenticationHandshake shut down and the message listener
|
||||||
log.warn("message={}", message);
|
// has been already removed but we still get the onMessage called as the Platform.runLater get called at the next
|
||||||
log.warn("connection={}", connection);
|
// cycle. So we need to protect a late call with the stopped flag.
|
||||||
return;
|
if (!stopped) {
|
||||||
}
|
|
||||||
|
|
||||||
if (message instanceof AuthenticationMessage) {
|
if (message instanceof AuthenticationMessage) {
|
||||||
// We are listening on all connections, so we need to filter out only our peer
|
// We are listening on all connections, so we need to filter out only our peer
|
||||||
if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) {
|
if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) {
|
||||||
Log.traceCall(message.toString());
|
Log.traceCall(message.toString());
|
||||||
if (message instanceof AuthenticationChallenge) {
|
if (message instanceof AuthenticationChallenge) {
|
||||||
// Requesting peer
|
// Requesting peer
|
||||||
if (ownRequestCanceled) {
|
|
||||||
log.info("Our own request has been canceled because of a race condition. " +
|
|
||||||
"\nWe ignore that message and go on with the protocol from the other peers request. " +
|
|
||||||
"\nThat might happen in rare cases.");
|
|
||||||
} else {
|
|
||||||
AuthenticationChallenge authenticationChallenge = (AuthenticationChallenge) message;
|
AuthenticationChallenge authenticationChallenge = (AuthenticationChallenge) message;
|
||||||
// We need to set the address to the connection, otherwise we will not find the connection when sending
|
// We need to set the address to the connection, otherwise we will not find the connection when sending
|
||||||
// the next message and we would create a new outbound connection instead using the inbound.
|
// the next message and we would create a new outbound connection instead using the inbound.
|
||||||
|
@ -134,7 +123,6 @@ public class AuthenticationHandshake implements MessageListener {
|
||||||
log.warn("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonce=" + nonce);
|
log.warn("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonce=" + nonce);
|
||||||
failed(new Exception("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonceMap=" + nonce));
|
failed(new Exception("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonceMap=" + nonce));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else if (message instanceof AuthenticationFinalResponse) {
|
} else if (message instanceof AuthenticationFinalResponse) {
|
||||||
// Responding peer
|
// Responding peer
|
||||||
AuthenticationFinalResponse authenticationFinalResponse = (AuthenticationFinalResponse) message;
|
AuthenticationFinalResponse authenticationFinalResponse = (AuthenticationFinalResponse) message;
|
||||||
|
@ -150,9 +138,21 @@ public class AuthenticationHandshake implements MessageListener {
|
||||||
log.warn("Verification of nonce failed. authenticationResponse=" + authenticationFinalResponse + " / nonce=" + nonce);
|
log.warn("Verification of nonce failed. authenticationResponse=" + authenticationFinalResponse + " / nonce=" + nonce);
|
||||||
failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationFinalResponse + " / nonce=" + nonce));
|
failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationFinalResponse + " / nonce=" + nonce));
|
||||||
}
|
}
|
||||||
|
} else if (message instanceof AuthenticationRejection) {
|
||||||
|
failed(new AuthenticationException("Authentication to peer "
|
||||||
|
+ ((AuthenticationRejection) message).senderAddress
|
||||||
|
+ " rejected because of a race conditions."));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// TODO leave that for debugging for now, but remove it once the network is tested sufficiently
|
||||||
|
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got onMessage called. " +
|
||||||
|
"That can happen because of Thread mapping.", peerAddress);
|
||||||
|
log.warn("message={}", message);
|
||||||
|
log.warn("connection={}", connection);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -165,6 +165,7 @@ public class AuthenticationHandshake implements MessageListener {
|
||||||
// Requesting peer
|
// Requesting peer
|
||||||
|
|
||||||
if (stopped) {
|
if (stopped) {
|
||||||
|
// TODO leave that for debugging for now, but remove it once the network is tested sufficiently
|
||||||
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got requestAuthentication called. That must not happen.", peerAddress);
|
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got requestAuthentication called. That must not happen.", peerAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,6 +203,7 @@ public class AuthenticationHandshake implements MessageListener {
|
||||||
// Responding peer
|
// Responding peer
|
||||||
|
|
||||||
if (stopped) {
|
if (stopped) {
|
||||||
|
// TODO leave that for debugging for now, but remove it once the network is tested sufficiently
|
||||||
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got respondToAuthenticationRequest called. That must not happen.", peerAddress);
|
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got respondToAuthenticationRequest called. That must not happen.", peerAddress);
|
||||||
log.warn("authenticationRequest={}", authenticationRequest);
|
log.warn("authenticationRequest={}", authenticationRequest);
|
||||||
log.warn("connection={}", connection);
|
log.warn("connection={}", connection);
|
||||||
|
@ -251,14 +253,14 @@ public class AuthenticationHandshake implements MessageListener {
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// Cancel if we send reject message
|
// Cancel
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
public void setOwnRequestCanceled() {
|
public void cancel(Address peerAddress) {
|
||||||
Log.traceCall();
|
Log.traceCall();
|
||||||
nonce = 0;
|
failed(new AuthenticationException("Authentication to peer "
|
||||||
stopped = false;
|
+ peerAddress
|
||||||
ownRequestCanceled = true;
|
+ " canceled because of a race conditions."));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -100,8 +100,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
|
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
|
||||||
|
|
||||||
connection.getPeerAddress().ifPresent(peerAddress -> {
|
connection.getPeerAddress().ifPresent(peerAddress -> {
|
||||||
// We only remove it if we are nto in the authentication process
|
// We only remove the peer from the authenticationHandshakes and the reportedPeers
|
||||||
// Connection shut down is a step in the authentication process.
|
// if we are not in the authentication process
|
||||||
|
// Connection shut down is an expected step in the authentication process.
|
||||||
if (!authenticationHandshakes.containsKey(peerAddress))
|
if (!authenticationHandshakes.containsKey(peerAddress))
|
||||||
removePeer(peerAddress);
|
removePeer(peerAddress);
|
||||||
});
|
});
|
||||||
|
@ -120,8 +121,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
public void onMessage(Message message, Connection connection) {
|
public void onMessage(Message message, Connection connection) {
|
||||||
if (message instanceof AuthenticationRequest)
|
if (message instanceof AuthenticationRequest)
|
||||||
processAuthenticationRequest((AuthenticationRequest) message, connection);
|
processAuthenticationRequest((AuthenticationRequest) message, connection);
|
||||||
else if (message instanceof AuthenticationRejection)
|
|
||||||
processAuthenticationRejection((AuthenticationRejection) message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -208,43 +207,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
(newReportedPeers, connection1) -> addToReportedPeers(newReportedPeers, connection1)
|
(newReportedPeers, connection1) -> addToReportedPeers(newReportedPeers, connection1)
|
||||||
);
|
);
|
||||||
authenticationHandshakes.put(peerAddress, authenticationHandshake);
|
authenticationHandshakes.put(peerAddress, authenticationHandshake);
|
||||||
doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake);
|
|
||||||
} else {
|
|
||||||
log.info("We got an incoming AuthenticationRequest but we have started ourselves already " +
|
|
||||||
"an authentication handshake for that peerAddress ({})", peerAddress);
|
|
||||||
log.debug("We avoid such race conditions by rejecting the request if the hashCode of our address ({}) is " +
|
|
||||||
"smaller then the hashCode of the peers address ({}). Result = {}", getMyAddress().hashCode(),
|
|
||||||
message.senderAddress.hashCode(), (getMyAddress().hashCode() < peerAddress.hashCode()));
|
|
||||||
|
|
||||||
authenticationHandshake = authenticationHandshakes.get(peerAddress);
|
|
||||||
|
|
||||||
if (getMyAddress().hashCode() < peerAddress.hashCode()) {
|
|
||||||
log.info("We reject the authentication request and keep our own request alive.");
|
|
||||||
rejectAuthenticationRequest(peerAddress);
|
|
||||||
} else {
|
|
||||||
log.info("We accept the authentication request but cancel our own request.");
|
|
||||||
cancelOwnAuthenticationRequest(peerAddress);
|
|
||||||
|
|
||||||
doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.info("We got an incoming AuthenticationRequest but we are already authenticated to that peer " +
|
|
||||||
"with peerAddress {}.\n" +
|
|
||||||
"That might happen in some race conditions. We reject the request.", peerAddress);
|
|
||||||
rejectAuthenticationRequest(peerAddress);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processAuthenticationRejection(AuthenticationRejection message) {
|
|
||||||
Log.traceCall(message.toString());
|
|
||||||
Address peerAddress = message.senderAddress;
|
|
||||||
cancelOwnAuthenticationRequest(peerAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection,
|
|
||||||
Address peerAddress, AuthenticationHandshake authenticationHandshake) {
|
|
||||||
Log.traceCall(message.toString());
|
|
||||||
SettableFuture<Connection> future = authenticationHandshake.respondToAuthenticationRequest(message, connection);
|
SettableFuture<Connection> future = authenticationHandshake.respondToAuthenticationRequest(message, connection);
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -257,16 +219,29 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
log.info("Authentication with peer who requested authentication failed.\n" +
|
log.info("Authentication with peer who requested authentication failed.\n" +
|
||||||
"That can happen if the peer went offline. " + throwable.getMessage());
|
"That can happen if the peer went offline. " + throwable.getMessage());
|
||||||
removePeer(peerAddress);
|
handleAuthenticationFailure(peerAddress, throwable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
} else {
|
||||||
|
log.info("We got an incoming AuthenticationRequest but we have started ourselves already " +
|
||||||
|
"an authentication handshake for that peerAddress ({}).\n" +
|
||||||
|
"We terminate such race conditions by rejecting and cancelling the authentication on both " +
|
||||||
|
"peers.", peerAddress);
|
||||||
|
|
||||||
|
rejectAuthenticationRequest(peerAddress);
|
||||||
|
authenticationHandshakes.get(peerAddress).cancel(peerAddress);
|
||||||
|
authenticationHandshakes.remove(peerAddress);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.info("We got an incoming AuthenticationRequest but we are already authenticated to peer {}.\n" +
|
||||||
|
"That should not happen. We reject the request.", peerAddress);
|
||||||
|
rejectAuthenticationRequest(peerAddress);
|
||||||
|
|
||||||
private void cancelOwnAuthenticationRequest(Address peerAddress) {
|
|
||||||
Log.traceCall();
|
|
||||||
if (authenticationHandshakes.containsKey(peerAddress)) {
|
if (authenticationHandshakes.containsKey(peerAddress)) {
|
||||||
authenticationHandshakes.get(peerAddress).setOwnRequestCanceled();
|
authenticationHandshakes.get(peerAddress).cancel(peerAddress);
|
||||||
|
authenticationHandshakes.remove(peerAddress);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,7 +284,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
"\nThat is expected if seed nodes are offline." +
|
"\nThat is expected if seed nodes are offline." +
|
||||||
"\nException:" + throwable.toString());
|
"\nException:" + throwable.toString());
|
||||||
|
|
||||||
removePeer(peerAddress);
|
handleAuthenticationFailure(peerAddress, throwable);
|
||||||
|
|
||||||
if (remainingSeedNodesAvailable()) {
|
if (remainingSeedNodesAvailable()) {
|
||||||
log.info("We try another random seed node for first authentication attempt.");
|
log.info("We try another random seed node for first authentication attempt.");
|
||||||
|
@ -348,7 +323,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
"\nThat is expected if the seed node is offline." +
|
"\nThat is expected if the seed node is offline." +
|
||||||
"\nException:" + throwable.toString());
|
"\nException:" + throwable.toString());
|
||||||
|
|
||||||
removePeer(peerAddress);
|
handleAuthenticationFailure(peerAddress, throwable);
|
||||||
|
|
||||||
log.info("We try another random seed node for authentication.");
|
log.info("We try another random seed node for authentication.");
|
||||||
authenticateToRemainingSeedNode();
|
authenticateToRemainingSeedNode();
|
||||||
|
@ -384,8 +359,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
if (reportedPeersAvailable()) {
|
if (reportedPeersAvailable()) {
|
||||||
if (getAndRemoveNotAuthenticatingReportedPeer().isPresent()) {
|
if (getAndRemoveNotAuthenticatingReportedPeer().isPresent()) {
|
||||||
Address peerAddress = getAndRemoveNotAuthenticatingReportedPeer().get().address;
|
Address peerAddress = getAndRemoveNotAuthenticatingReportedPeer().get().address;
|
||||||
|
if (authenticationHandshakes.containsKey(peerAddress))
|
||||||
|
log.warn("getAndRemoveNotAuthenticatingReportedPeer delivered peer which is already in authenticationHandshakes");
|
||||||
removeFromReportedPeers(peerAddress);
|
removeFromReportedPeers(peerAddress);
|
||||||
log.info("We try to authenticate to peer {}.", peerAddress);
|
log.info("We try to authenticate to peer {}.", peerAddress);
|
||||||
|
if (authenticationHandshakes.containsKey(peerAddress))
|
||||||
|
log.warn("peer already in authenticationHandshakes");
|
||||||
authenticate(peerAddress, new FutureCallback<Connection>() {
|
authenticate(peerAddress, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Connection connection) {
|
public void onSuccess(Connection connection) {
|
||||||
|
@ -402,7 +381,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
"\nThat is expected if the peer is offline." +
|
"\nThat is expected if the peer is offline." +
|
||||||
"\nException:" + throwable.toString());
|
"\nException:" + throwable.toString());
|
||||||
|
|
||||||
removePeer(peerAddress);
|
handleAuthenticationFailure(peerAddress, throwable);
|
||||||
|
|
||||||
log.info("We try another random seed node for authentication.");
|
log.info("We try another random seed node for authentication.");
|
||||||
authenticateToRemainingReportedPeer();
|
authenticateToRemainingReportedPeer();
|
||||||
|
@ -480,7 +459,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
log.error("Authentication to " + peerAddress + " for sending a private message failed at authenticateToDirectMessagePeer." +
|
log.error("Authentication to " + peerAddress + " for sending a private message failed at authenticateToDirectMessagePeer." +
|
||||||
"\nSeems that the peer is offline." +
|
"\nSeems that the peer is offline." +
|
||||||
"\nException:" + throwable.toString());
|
"\nException:" + throwable.toString());
|
||||||
removePeer(peerAddress);
|
handleAuthenticationFailure(peerAddress, throwable);
|
||||||
if (faultHandler != null)
|
if (faultHandler != null)
|
||||||
faultHandler.run();
|
faultHandler.run();
|
||||||
}
|
}
|
||||||
|
@ -517,8 +496,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
connection.setPeerAddress(peerAddress);
|
connection.setPeerAddress(peerAddress);
|
||||||
connection.setAuthenticated();
|
connection.setAuthenticated();
|
||||||
|
|
||||||
if (authenticationHandshakes.containsKey(peerAddress))
|
removeFromAuthenticationHandshakes(peerAddress);
|
||||||
authenticationHandshakes.remove(peerAddress);
|
|
||||||
|
|
||||||
log.info("\n\n############################################################\n" +
|
log.info("\n\n############################################################\n" +
|
||||||
"We are authenticated to:" +
|
"We are authenticated to:" +
|
||||||
|
@ -537,17 +515,19 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection));
|
authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void handleAuthenticationFailure(@Nullable Address peerAddress, Throwable throwable) {
|
||||||
|
if (throwable instanceof AuthenticationException)
|
||||||
|
removeFromAuthenticationHandshakes(peerAddress);
|
||||||
|
else
|
||||||
|
removePeer(peerAddress);
|
||||||
|
}
|
||||||
|
|
||||||
void removePeer(@Nullable Address peerAddress) {
|
void removePeer(@Nullable Address peerAddress) {
|
||||||
Log.traceCall("peerAddress=" + peerAddress);
|
Log.traceCall("peerAddress=" + peerAddress);
|
||||||
if (peerAddress != null) {
|
if (peerAddress != null) {
|
||||||
if (authenticationHandshakes.containsKey(peerAddress))
|
removeFromAuthenticationHandshakes(peerAddress);
|
||||||
authenticationHandshakes.remove(peerAddress);
|
|
||||||
|
|
||||||
removeFromReportedPeers(peerAddress);
|
removeFromReportedPeers(peerAddress);
|
||||||
|
removeFromAuthenticatedPeers(peerAddress);
|
||||||
Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
|
|
||||||
if (disconnectedPeer != null)
|
|
||||||
printAuthenticatedPeers();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -555,6 +535,17 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
||||||
reportedPeers.remove(new ReportedPeer(peerAddress));
|
reportedPeers.remove(new ReportedPeer(peerAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeFromAuthenticationHandshakes(@Nullable Address peerAddress) {
|
||||||
|
if (authenticationHandshakes.containsKey(peerAddress))
|
||||||
|
authenticationHandshakes.remove(peerAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeFromAuthenticatedPeers(@Nullable Address peerAddress) {
|
||||||
|
if (authenticatedPeers.containsKey(peerAddress))
|
||||||
|
authenticatedPeers.remove(peerAddress);
|
||||||
|
printAuthenticatedPeers();
|
||||||
|
}
|
||||||
|
|
||||||
private boolean maxConnectionsForAuthReached() {
|
private boolean maxConnectionsForAuthReached() {
|
||||||
return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY;
|
return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue