retry with failed seednodes after pause

This commit is contained in:
Manfred Karrer 2015-12-25 18:40:49 +01:00
parent 0c40d7154f
commit c7678df00c
2 changed files with 122 additions and 71 deletions

View file

@ -16,10 +16,7 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.HashSet; import java.util.*;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -29,7 +26,7 @@ import java.util.function.Supplier;
// seedNode: close connection // seedNode: close connection
// seedNode: send AuthenticationChallenge to client on a new connection to test if address is correct // seedNode: send AuthenticationChallenge to client on a new connection to test if address is correct
// client: authentication to seedNode done if nonce verification is ok // client: authentication to seedNode done if nonce verification is ok
// client: AuthenticationResponse to seedNode // client: AuthenticationFinalResponse to seedNode
// seedNode: authentication to client done if nonce verification is ok // seedNode: authentication to client done if nonce verification is ok
public class AuthenticationHandshake implements MessageListener { public class AuthenticationHandshake implements MessageListener {
@ -45,6 +42,7 @@ public class AuthenticationHandshake implements MessageListener {
private long nonce = 0; private long nonce = 0;
private boolean stopped; private boolean stopped;
private Optional<SettableFuture<Connection>> resultFutureOptional = Optional.empty(); private Optional<SettableFuture<Connection>> resultFutureOptional = Optional.empty();
private Timer timeoutTimer;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -83,6 +81,10 @@ public class AuthenticationHandshake implements MessageListener {
// We are listening on all connections, so we need to filter out only our peer // We are listening on all connections, so we need to filter out only our peer
if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) { if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
if (timeoutTimer != null)
timeoutTimer.cancel();
if (message instanceof AuthenticationChallenge) { if (message instanceof AuthenticationChallenge) {
// Requesting peer // Requesting peer
AuthenticationChallenge authenticationChallenge = (AuthenticationChallenge) message; AuthenticationChallenge authenticationChallenge = (AuthenticationChallenge) message;
@ -91,18 +93,18 @@ public class AuthenticationHandshake implements MessageListener {
connection.setPeerAddress(authenticationChallenge.senderAddress); connection.setPeerAddress(authenticationChallenge.senderAddress);
// We use the active connectionType if we started the authentication request to another peer // We use the active connectionType if we started the authentication request to another peer
connection.setConnectionPriority(ConnectionPriority.ACTIVE); connection.setConnectionPriority(ConnectionPriority.ACTIVE);
log.trace("Received authenticationResponse from " + peerAddress); log.trace("Received authenticationChallenge from " + peerAddress);
boolean verified = nonce != 0 && nonce == authenticationChallenge.requesterNonce; boolean verified = nonce != 0 && nonce == authenticationChallenge.requesterNonce;
if (verified) { if (verified) {
AuthenticationFinalResponse authenticationFinalResponse = new AuthenticationFinalResponse(myAddress, AuthenticationFinalResponse authenticationFinalResponse = new AuthenticationFinalResponse(myAddress,
authenticationChallenge.responderNonce, authenticationChallenge.responderNonce,
new HashSet<>(authenticatedAndReportedPeersSupplier.get())); new HashSet<>(authenticatedAndReportedPeersSupplier.get()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationFinalResponse); SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationFinalResponse);
log.trace("Sent GetPeersAuthRequest {} to {}", authenticationFinalResponse, peerAddress); log.trace("Sent AuthenticationFinalResponse {} to {}", authenticationFinalResponse, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.trace("Successfully sent GetPeersAuthRequest to {}", peerAddress); log.trace("Successfully sent AuthenticationFinalResponse to {}", peerAddress);
log.info("AuthenticationComplete: Peer with address " + peerAddress log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took " + " authenticated (" + connection.getUid() + "). Took "
@ -112,7 +114,7 @@ public class AuthenticationHandshake implements MessageListener {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersAuthRequest sending failed " + throwable.getMessage()); log.info("AuthenticationFinalResponse sending failed " + throwable.getMessage());
failed(throwable); failed(throwable);
} }
}); });
@ -120,13 +122,13 @@ public class AuthenticationHandshake implements MessageListener {
// now we add the reported peers to our list // now we add the reported peers to our list
addReportedPeersConsumer.accept(authenticationChallenge.reportedPeers, connection); addReportedPeersConsumer.accept(authenticationChallenge.reportedPeers, connection);
} else { } else {
log.warn("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonce=" + nonce); log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationFinalResponse={}", authenticationChallenge, nonce, peerAddress);
failed(new Exception("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonceMap=" + nonce)); failed(new Exception("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonceMap=" + nonce));
} }
} else if (message instanceof AuthenticationFinalResponse) { } else if (message instanceof AuthenticationFinalResponse) {
// Responding peer // Responding peer
AuthenticationFinalResponse authenticationFinalResponse = (AuthenticationFinalResponse) message; AuthenticationFinalResponse authenticationFinalResponse = (AuthenticationFinalResponse) message;
log.trace("Received GetPeersAuthRequest from " + peerAddress + " at " + myAddress); log.trace("Received AuthenticationFinalResponse from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == authenticationFinalResponse.responderNonce; boolean verified = nonce != 0 && nonce == authenticationFinalResponse.responderNonce;
if (verified) { if (verified) {
addReportedPeersConsumer.accept(authenticationFinalResponse.reportedPeers, connection); addReportedPeersConsumer.accept(authenticationFinalResponse.reportedPeers, connection);
@ -135,10 +137,11 @@ public class AuthenticationHandshake implements MessageListener {
+ (System.currentTimeMillis() - startAuthTs) + " ms."); + (System.currentTimeMillis() - startAuthTs) + " ms.");
completed(connection); completed(connection);
} else { } else {
log.warn("Verification of nonce failed. authenticationResponse=" + authenticationFinalResponse + " / nonce=" + nonce); log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationFinalResponse={}", authenticationFinalResponse, nonce, peerAddress);
failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationFinalResponse + " / nonce=" + nonce)); failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationFinalResponse + " / nonce=" + nonce));
} }
} else if (message instanceof AuthenticationRejection) { } else if (message instanceof AuthenticationRejection) {
// Any peer
failed(new AuthenticationException("Authentication to peer " failed(new AuthenticationException("Authentication to peer "
+ ((AuthenticationRejection) message).senderAddress + ((AuthenticationRejection) message).senderAddress
+ " rejected because of a race conditions.")); + " rejected because of a race conditions."));
@ -147,10 +150,10 @@ public class AuthenticationHandshake implements MessageListener {
} }
} else { } else {
// TODO leave that for debugging for now, but remove it once the network is tested sufficiently // TODO leave that for debugging for now, but remove it once the network is tested sufficiently
log.warn("AuthenticationHandshake (peerAddress={}) already shut down but still got onMessage called. " + log.info("AuthenticationHandshake (peerAddress={}) already shut down but still got onMessage called. " +
"That can happen because of Thread mapping.", peerAddress); "That can happen because of Thread mapping.", peerAddress);
log.warn("message={}", message); log.debug("message={}", message);
log.warn("connection={}", connection); log.debug("connection={}", connection);
return; return;
} }
} }
@ -189,6 +192,13 @@ public class AuthenticationHandshake implements MessageListener {
} }
}); });
timeoutTimer = UserThread.runAfter(() -> {
failed(new AuthenticationException("Authentication to peer "
+ peerAddress
+ " failed because of a timeout. " +
"We did not get an AuthenticationChallenge message responded after 30 sec."));
}, 30, TimeUnit.SECONDS);
return resultFutureOptional.get(); return resultFutureOptional.get();
} }
@ -230,7 +240,7 @@ public class AuthenticationHandshake implements MessageListener {
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.trace("AuthenticationResponse successfully sent"); log.trace("AuthenticationChallenge successfully sent");
// We use passive connectionType for connections created from received authentication // We use passive connectionType for connections created from received authentication
// requests from other peers // requests from other peers
@ -239,14 +249,24 @@ public class AuthenticationHandshake implements MessageListener {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.warn("Failure at sending AuthenticationResponse. It might be that the peer went offline." + throwable.getMessage()); log.warn("Failure at sending AuthenticationChallenge to {}. It might be that the peer went offline. Exception={}", peerAddress, throwable.getMessage());
failed(throwable); failed(throwable);
} }
}); });
timeoutTimer = UserThread.runAfter(() -> {
failed(new AuthenticationException("Authentication of peer "
+ peerAddress
+ " failed because of a timeout. " +
"We did not get an AuthenticationFinalResponse message responded after 30 sec.\n" +
""));
}, 30, TimeUnit.SECONDS);
} else { } else {
log.info("AuthenticationHandshake (peerAddress={}) already shut down before we could sent AuthenticationResponse. That might happen in rare cases.", peerAddress); log.info("AuthenticationHandshake (peerAddress={}) already shut down before we could sent " +
"AuthenticationChallenge. That might happen in rare cases.", peerAddress);
} }
}, 1000, TimeUnit.MILLISECONDS); // Don't set the delay too short as the CloseConnectionMessage might arrive too late at the peer }, 2000, TimeUnit.MILLISECONDS); // Don't set the delay too short as the CloseConnectionMessage might arrive too late at the peer
}); });
return resultFutureOptional.get(); return resultFutureOptional.get();
} }
@ -308,5 +328,8 @@ public class AuthenticationHandshake implements MessageListener {
Log.traceCall("peerAddress = " + peerAddress); Log.traceCall("peerAddress = " + peerAddress);
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
stopped = true; stopped = true;
if (timeoutTimer != null)
timeoutTimer.cancel();
} }
} }

View file

@ -59,8 +59,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private final CopyOnWriteArraySet<AuthenticationListener> authenticationListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<AuthenticationListener> authenticationListeners = new CopyOnWriteArraySet<>();
private final Map<Address, Peer> authenticatedPeers = new HashMap<>(); private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>(); private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private final List<Address> remainingSeedNodes = new ArrayList<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>(); private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>();
private final List<Address> remainingSeedNodes = new ArrayList<>();
private Optional<Set<Address>> seedNodeAddressesOptional = Optional.empty(); private Optional<Set<Address>> seedNodeAddressesOptional = Optional.empty();
@ -286,9 +286,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
handleAuthenticationFailure(peerAddress, throwable); handleAuthenticationFailure(peerAddress, throwable);
if (remainingSeedNodesAvailable()) { Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
if (seedNodeOptional.isPresent()) {
log.info("We try another random seed node for first authentication attempt."); log.info("We try another random seed node for first authentication attempt.");
authenticateToFirstSeedNode(getAndRemoveRandomAddress(remainingSeedNodes)); authenticateToFirstSeedNode(seedNodeOptional.get());
} else { } else {
log.info("There are no seed nodes available for authentication. " + log.info("There are no seed nodes available for authentication. " +
"We try if there are reported peers available to authenticate."); "We try if there are reported peers available to authenticate.");
@ -304,8 +305,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void authenticateToRemainingSeedNode() { private void authenticateToRemainingSeedNode() {
Log.traceCall(); Log.traceCall();
if (!maxConnectionsForAuthReached()) { if (!maxConnectionsForAuthReached()) {
if (remainingSeedNodesAvailable()) { Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
Address peerAddress = getAndRemoveRandomAddress(remainingSeedNodes); if (seedNodeOptional.isPresent()) {
Address peerAddress = seedNodeOptional.get();
log.info("We try to authenticate to seed node {}.", peerAddress); log.info("We try to authenticate to seed node {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticate(peerAddress, new FutureCallback<Connection>() {
@Override @Override
@ -334,21 +336,30 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} else if (reportedPeersAvailable()) { } else if (reportedPeersAvailable()) {
authenticateToRemainingReportedPeer(); authenticateToRemainingReportedPeer();
} else { } else {
log.info("We don't have seed nodes or reported peers available. We will try again after a random pause."); log.info("We don't have seed nodes or reported peers available. " +
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), "We try again after a random pause with the seed nodes which failed or if " +
10, 20, TimeUnit.SECONDS); "none available with the reported peers.");
if (seedNodeAddressesOptional.isPresent()) {
remainingSeedNodes.clear();
seedNodeAddressesOptional.get().stream()
.filter(e -> !authenticatedPeers.containsKey(e) && !authenticationHandshakes.containsKey(e))
.forEach(e -> remainingSeedNodes.add(e));
if (!remainingSeedNodes.isEmpty())
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
30, 60, TimeUnit.SECONDS);
else
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
} else {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
}
} }
} else { } else {
log.info("We have already enough connections."); log.info("We have already enough connections.");
} }
} }
private Address getAndRemoveRandomAddress(List<Address> list) {
checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size()));
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to reported peers // Authentication to reported peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -357,45 +368,51 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall(); Log.traceCall();
if (!maxConnectionsForAuthReached()) { if (!maxConnectionsForAuthReached()) {
if (reportedPeersAvailable()) { if (reportedPeersAvailable()) {
if (getAndRemoveNotAuthenticatingReportedPeer().isPresent()) { Optional<ReportedPeer> andRemoveNotAuthenticatingReportedPeer = getAndRemoveNotAuthenticatingReportedPeer();
Address peerAddress = getAndRemoveNotAuthenticatingReportedPeer().get().address; if (andRemoveNotAuthenticatingReportedPeer.isPresent()) {
if (authenticationHandshakes.containsKey(peerAddress)) Address peerAddress = andRemoveNotAuthenticatingReportedPeer.get().address;
log.warn("getAndRemoveNotAuthenticatingReportedPeer delivered peer which is already in authenticationHandshakes");
removeFromReportedPeers(peerAddress); removeFromReportedPeers(peerAddress);
log.info("We try to authenticate to peer {}.", peerAddress); if (!authenticationHandshakes.containsKey(peerAddress)) {
if (authenticationHandshakes.containsKey(peerAddress)) log.info("We try to authenticate to peer {}.", peerAddress);
log.warn("peer already in authenticationHandshakes"); authenticate(peerAddress, new FutureCallback<Connection>() {
authenticate(peerAddress, new FutureCallback<Connection>() { @Override
@Override public void onSuccess(Connection connection) {
public void onSuccess(Connection connection) { log.info("We got a peer authenticated. " +
log.info("We got a peer authenticated. " + "We try if there are more reported peers available to authenticate.");
"We try if there are more reported peers available to authenticate.");
addAuthenticatedPeer(connection, peerAddress); addAuthenticatedPeer(connection, peerAddress);
authenticateToRemainingReportedPeer(); authenticateToRemainingReportedPeer();
} }
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed at authenticateToRemainingReportedPeer." + log.info("Authentication to " + peerAddress + " failed at authenticateToRemainingReportedPeer." +
"\nThat is expected if the peer is offline." + "\nThat is expected if the peer is offline." +
"\nException:" + throwable.toString()); "\nException:" + throwable.toString());
handleAuthenticationFailure(peerAddress, throwable); handleAuthenticationFailure(peerAddress, throwable);
log.info("We try another random seed node for authentication."); log.info("We try another random seed node for authentication.");
authenticateToRemainingReportedPeer(); authenticateToRemainingReportedPeer();
} }
}); });
} else {
log.warn("We got the selected peer in the authenticationHandshakes. That should not happen. " +
"We will try again after a short random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
1, 2, TimeUnit.SECONDS);
}
} else { } else {
log.info("We don't have a reported peers available (maybe one is authenticating already). We will try again after a random pause."); log.info("We don't have a reported peers available (maybe one is authenticating already). " +
"We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS); 10, 20, TimeUnit.SECONDS);
} }
} else if (remainingSeedNodesAvailable()) { } else if (!remainingSeedNodes.isEmpty()) {
authenticateToRemainingSeedNode(); authenticateToRemainingSeedNode();
} else { } else {
log.info("We don't have seed nodes or reported peers available. We will try again after a random pause."); log.info("We don't have seed nodes or reported peers available. " +
"We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
30, 40, TimeUnit.SECONDS); 30, 40, TimeUnit.SECONDS);
} }
@ -550,10 +567,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY; return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY;
} }
private boolean remainingSeedNodesAvailable() {
return !remainingSeedNodes.isEmpty();
}
private boolean reportedPeersAvailable() { private boolean reportedPeersAvailable() {
return !reportedPeers.isEmpty(); return !reportedPeers.isEmpty();
} }
@ -711,17 +724,32 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} }
private Optional<ReportedPeer> getAndRemoveNotAuthenticatingReportedPeer() { private Optional<ReportedPeer> getAndRemoveNotAuthenticatingReportedPeer() {
Log.traceCall();
Optional<ReportedPeer> reportedPeer = Optional.empty(); Optional<ReportedPeer> reportedPeer = Optional.empty();
List<ReportedPeer> list = new ArrayList<>(reportedPeers); List<ReportedPeer> list = new ArrayList<>(reportedPeers);
if (!list.isEmpty()) { authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
do { if (!list.isEmpty())
reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list)); reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list));
}
while (!list.isEmpty() && authenticationHandshakes.containsKey(reportedPeer.get().address));
}
return reportedPeer; return reportedPeer;
} }
private Address getAndRemoveRandomAddress(List<Address> list) {
checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size()));
}
private Optional<Address> getAndRemoveNotAuthenticatingSeedNode() {
Log.traceCall();
Optional<Address> seedNode = Optional.empty();
List<Address> list = new ArrayList<>(remainingSeedNodes);
authenticationHandshakes.keySet().stream().forEach(e -> list.remove(e));
if (!list.isEmpty())
seedNode = Optional.of(getAndRemoveRandomAddress(list));
return seedNode;
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Utils // Utils