Stable verison

This commit is contained in:
Manfred Karrer 2015-12-26 21:23:39 +01:00
parent c7678df00c
commit 4e2cfa7804
12 changed files with 257 additions and 161 deletions

View File

@ -1,22 +1,9 @@
package io.bitsquare.p2p;
public class AuthenticationException extends RuntimeException {
public AuthenticationException() {
}
public class AuthenticationException extends Exception {
public AuthenticationException(String message) {
super(message);
}
public AuthenticationException(String message, Throwable cause) {
super(message, cause);
}
public AuthenticationException(Throwable cause) {
super(cause);
}
public AuthenticationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -176,7 +176,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// we remove ourselves from the list of seed nodes
seedNodeAddresses.remove(mySeedNodeAddress);
peerGroup.setIsSeedNode(true);
requestDataManager.setIsSeedNode(true);
start(listener);
}
@ -287,7 +289,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
connection.getPeerAddress().ifPresent(peerAddresses -> authenticatedPeerAddresses.remove(peerAddresses));
connection.getPeerAddressOptional().ifPresent(peerAddresses -> authenticatedPeerAddresses.remove(peerAddresses));
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
@ -336,7 +338,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
connection.setConnectionPriority(ConnectionPriority.DIRECT_MSG);
log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey);
connection.getPeerAddress().ifPresent(peerAddresses ->
connection.getPeerAddressOptional().ifPresent(peerAddresses ->
decryptedMailListeners.stream().forEach(
e -> e.onMailMessage(decryptedMsgWithPubKey, peerAddresses)));
} else {
@ -374,14 +376,18 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
SendMailMessageListener sendMailMessageListener) {
Log.traceCall();
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailMessage)");
checkAuthentication();
if (!authenticatedPeerAddresses.contains(peerAddress))
peerGroup.authenticateToDirectMessagePeer(peerAddress,
() -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener),
() -> sendMailMessageListener.onFault());
else
doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener);
try {
checkAuthentication();
if (!authenticatedPeerAddresses.contains(peerAddress))
peerGroup.authenticateToDirectMessagePeer(peerAddress,
() -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener),
() -> sendMailMessageListener.onFault());
else
doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener);
} catch (AuthenticationException e) {
log.error(e.getMessage());
throw new RuntimeException(e);
}
}
private void doSendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message,
@ -458,17 +464,22 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(!optionalKeyRing.get().getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
checkAuthentication();
if (authenticatedPeerAddresses.contains(peerAddress)) {
trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
} else {
peerGroup.authenticateToDirectMessagePeer(peerAddress,
() -> trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener),
() -> {
log.info("We cannot authenticate to peer. Peer might be offline. We will store message in mailbox.");
trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
});
try {
checkAuthentication();
if (authenticatedPeerAddresses.contains(peerAddress)) {
trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
} else {
peerGroup.authenticateToDirectMessagePeer(peerAddress,
() -> trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener),
() -> {
log.info("We cannot authenticate to peer. Peer might be offline. We will store message in mailbox.");
trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
});
}
} catch (AuthenticationException e) {
log.error(e.getMessage());
//TODO check if boolean return type can avoid throwing an exception
throw new RuntimeException(e);
}
}
@ -513,13 +524,17 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkAuthentication();
try {
checkAuthentication();
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPublicKey);
dataStorage.add(protectedMailboxData, networkNode.getAddress());
} catch (AuthenticationException e) {
log.error(e.getMessage());
//TODO check if boolean return type can avoid throwing an exception
throw new RuntimeException(e);
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
@ -528,30 +543,36 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkAuthentication();
if (mailboxMap.containsKey(decryptedMsgWithPubKey)) {
ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload;
PublicKey receiversPubKey = mailboxData.receiversPubKey;
checkArgument(receiversPubKey.equals(optionalKeyRing.get().getSignatureKeyPair().getPublic()),
"receiversPubKey is not matching with our key. That must not happen.");
try {
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPubKey);
dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
try {
checkAuthentication();
if (mailboxMap.containsKey(decryptedMsgWithPubKey)) {
ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload;
PublicKey receiversPubKey = mailboxData.receiversPubKey;
checkArgument(receiversPubKey.equals(optionalKeyRing.get().getSignatureKeyPair().getPublic()),
"receiversPubKey is not matching with our key. That must not happen.");
try {
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPubKey);
dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
mailboxMap.remove(decryptedMsgWithPubKey);
log.trace("Removed successfully decryptedMsgWithPubKey.");
mailboxMap.remove(decryptedMsgWithPubKey);
log.trace("Removed successfully decryptedMsgWithPubKey.");
}
} else {
log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." +
"\ndecryptedMsgWithPubKey={}\nmailboxMap={}", decryptedMsgWithPubKey, mailboxMap);
}
} else {
log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." +
"\ndecryptedMsgWithPubKey={}\nmailboxMap={}", decryptedMsgWithPubKey, mailboxMap);
} catch (AuthenticationException e) {
log.error(e.getMessage());
//TODO check if boolean return type can avoid throwing an exception
throw new RuntimeException(e);
}
}
@ -573,13 +594,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private boolean doAddData(ExpirablePayload expirablePayload, boolean rePublish) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkAuthentication();
try {
checkAuthentication();
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, optionalKeyRing.get().getSignatureKeyPair());
if (rePublish)
return dataStorage.rePublish(protectedData, networkNode.getAddress());
else
return dataStorage.add(protectedData, networkNode.getAddress());
} catch (AuthenticationException e) {
log.error(e.getMessage());
return false;
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@ -589,10 +613,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public boolean removeData(ExpirablePayload expirablePayload) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkAuthentication();
try {
checkAuthentication();
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, optionalKeyRing.get().getSignatureKeyPair());
return dataStorage.remove(protectedData, networkNode.getAddress());
} catch (AuthenticationException e) {
log.error(e.getMessage());
return false;
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@ -681,7 +708,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
private void checkAuthentication() {
private void checkAuthentication() throws AuthenticationException {
Log.traceCall();
if (authenticatedPeerAddresses.isEmpty())
throw new AuthenticationException("You must be authenticated before adding data to the P2P network.");

View File

@ -193,11 +193,11 @@ public class Connection implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
@Nullable
public synchronized Address getPeerAddress1() {
public synchronized Address getPeerAddress() {
return peerAddressOptional.isPresent() ? peerAddressOptional.get() : null;
}
public synchronized Optional<Address> getPeerAddress() {
public synchronized Optional<Address> getPeerAddressOptional() {
return peerAddressOptional;
}
@ -258,7 +258,7 @@ public class Connection implements MessageListener {
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid);
Log.traceCall("sendCloseConnectionMessage");
try {
sendMessage(new CloseConnectionMessage(peerAddressOptional.isPresent() ? peerAddressOptional.get() : null));
sendMessage(new CloseConnectionMessage());
setStopFlags();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@ -557,8 +557,7 @@ public class Connection implements MessageListener {
sharedSpace.updateLastActivityDate();
if (message instanceof CloseConnectionMessage) {
log.info("Close connection message received from peer {}",
((CloseConnectionMessage) message).peerAddress);
log.info("CloseConnectionMessage received on connection {}", sharedSpace.connection);
stopped = true;
sharedSpace.shutDown(false);
} else if (!stopped) {

View File

@ -216,7 +216,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
public void addSetupListener(SetupListener setupListener) {
Log.traceCall();
setupListeners.add(setupListener);
boolean isNewEntry = setupListeners.add(setupListener);
if (!isNewEntry)
log.warn("Try to add a setupListener which was already added.");
}
@ -262,21 +264,24 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
public void addConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
boolean newEntry = connectionListeners.add(connectionListener);
if (!newEntry)
boolean isNewEntry = connectionListeners.add(connectionListener);
if (!isNewEntry)
log.warn("Try to add a connectionListener which was already added.\nconnectionListener={}\nconnectionListeners={}"
, connectionListener, connectionListeners);
}
public void removeConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
connectionListeners.remove(connectionListener);
boolean contained = connectionListeners.remove(connectionListener);
if (!contained)
log.debug("Try to remove a connectionListener which was never added. " +
"That might happen because of async behaviour of CopyOnWriteArraySet");
}
public void addMessageListener(MessageListener messageListener) {
Log.traceCall();
boolean newEntry = messageListeners.add(messageListener);
if (!newEntry)
boolean isNewEntry = messageListeners.add(messageListener);
if (!isNewEntry)
log.warn("Try to add a messageListener which was already added.");
}
@ -284,7 +289,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
Log.traceCall();
boolean contained = messageListeners.remove(messageListener);
if (!contained)
log.warn("Try to remove a messageListener which was never added.");
log.debug("Try to remove a messageListener which was never added. " +
"That might happen because of async behaviour of CopyOnWriteArraySet");
}
@ -330,13 +336,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private Optional<Connection> lookupOutboundConnection(Address peerAddress) {
// Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections);
return outBoundConnections.stream()
.filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
.filter(e -> e.getPeerAddressOptional().isPresent() && peerAddress.equals(e.getPeerAddressOptional().get())).findAny();
}
private Optional<Connection> lookupInboundConnection(Address peerAddress) {
// Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections);
return inBoundConnections.stream()
.filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
.filter(e -> e.getPeerAddressOptional().isPresent() && peerAddress.equals(e.getPeerAddressOptional().get())).findAny();
}
abstract protected Socket createSocket(Address peerAddress) throws IOException;

View File

@ -4,8 +4,6 @@ import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import javax.annotation.Nullable;
public final class CloseConnectionMessage implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
@ -13,8 +11,7 @@ public final class CloseConnectionMessage implements Message {
private final int networkId = Version.NETWORK_ID;
public Address peerAddress;
public CloseConnectionMessage(@Nullable Address peerAddress) {
this.peerAddress = peerAddress;
public CloseConnectionMessage() {
}
@Override
@ -25,7 +22,6 @@ public final class CloseConnectionMessage implements Message {
@Override
public String toString() {
return "CloseConnectionMessage{" +
"peerAddress=" + peerAddress +
", networkId=" + networkId +
'}';
}

View File

@ -1,10 +1,6 @@
package io.bitsquare.p2p.peers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AuthenticationException extends Exception {
private static final Logger log = LoggerFactory.getLogger(AuthenticationException.class);
public AuthenticationException(String message) {
super(message);

View File

@ -54,9 +54,9 @@ public class AuthenticationHandshake implements MessageListener {
Address peerAddress,
Supplier<Set<ReportedPeer>> authenticatedAndReportedPeersSupplier,
BiConsumer<HashSet<ReportedPeer>, Connection> addReportedPeersConsumer) {
Log.traceCall("peerAddress " + peerAddress);
this.authenticatedAndReportedPeersSupplier = authenticatedAndReportedPeersSupplier;
this.addReportedPeersConsumer = addReportedPeersConsumer;
Log.traceCall("peerAddress " + peerAddress);
this.networkNode = networkNode;
this.myAddress = myAddress;
this.peerAddress = peerAddress;
@ -122,7 +122,7 @@ public class AuthenticationHandshake implements MessageListener {
// now we add the reported peers to our list
addReportedPeersConsumer.accept(authenticationChallenge.reportedPeers, connection);
} else {
log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationFinalResponse={}", authenticationChallenge, nonce, peerAddress);
log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationChallenge={}", nonce, peerAddress, authenticationChallenge);
failed(new Exception("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonceMap=" + nonce));
}
} else if (message instanceof AuthenticationFinalResponse) {
@ -137,7 +137,7 @@ public class AuthenticationHandshake implements MessageListener {
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
completed(connection);
} else {
log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationFinalResponse={}", authenticationFinalResponse, nonce, peerAddress);
log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationFinalResponse={}", nonce, peerAddress, authenticationFinalResponse);
failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationFinalResponse + " / nonce=" + nonce));
}
} else if (message instanceof AuthenticationRejection) {
@ -326,10 +326,11 @@ public class AuthenticationHandshake implements MessageListener {
private void shutDown() {
Log.traceCall("peerAddress = " + peerAddress);
networkNode.removeMessageListener(this);
stopped = true;
if (timeoutTimer != null)
timeoutTimer.cancel();
networkNode.removeMessageListener(this);
}
}

View File

@ -51,7 +51,7 @@ public class MaintenanceManager implements MessageListener {
public void shutDown() {
Log.traceCall();
if (sendPingTimer != null)
if (sendPingTimer != null)
sendPingTimer.cancel();
networkNode.removeMessageListener(this);
@ -77,11 +77,11 @@ public class MaintenanceManager implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PongMessage sending failed " + throwable.getMessage());
connection.getPeerAddress().ifPresent(peerAddress -> removePeerConsumer.accept(peerAddress));
connection.getPeerAddressOptional().ifPresent(peerAddress -> removePeerConsumer.accept(peerAddress));
}
});
} else if (message instanceof PongMessage) {
connection.getPeerAddress().ifPresent(peerAddress -> {
connection.getPeerAddressOptional().ifPresent(peerAddress -> {
Peer peer = authenticatedPeersSupplier.get().get(peerAddress);
if (peer != null) {
if (((PongMessage) message).nonce != peer.pingNonce) {

View File

@ -42,7 +42,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
static {
setMaxConnectionsLowPriority(18);
setMaxConnectionsLowPriority(10);
}
private static final int MAX_REPORTED_PEERS = 1000;
@ -62,6 +62,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>();
private final List<Address> remainingSeedNodes = new ArrayList<>();
private Optional<Set<Address>> seedNodeAddressesOptional = Optional.empty();
private Timer connectToSeedNodeTimer;
private boolean isSeedNode;
///////////////////////////////////////////////////////////////////////////////////////////
@ -84,6 +86,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
() -> getAuthenticatedPeers(),
address -> removePeer(address),
(newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection));
startConnectToSeedNodeTimer();
}
public void setIsSeedNode(boolean isSeedNode) {
this.isSeedNode = isSeedNode;
}
@ -99,12 +107,19 @@ public class PeerGroup implements MessageListener, ConnectionListener {
public void onDisconnect(Reason reason, Connection connection) {
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
connection.getPeerAddress().ifPresent(peerAddress -> {
connection.getPeerAddressOptional().ifPresent(peerAddress -> {
// We only remove the peer from the authenticationHandshakes and the reportedPeers
// if we are not in the authentication process
// Connection shut down is an expected step in the authentication process.
if (!authenticationHandshakes.containsKey(peerAddress))
// If the disconnect happens on an authenticated peer we remove the peer.
if (authenticatedPeers.containsKey(peerAddress) || !authenticationHandshakes.containsKey(peerAddress)) {
removePeer(peerAddress);
log.info("We got a disconnect. " +
"We will try again after a random pause to remaining reported peers.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
}
});
}
@ -172,6 +187,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
networkNode.removeMessageListener(this);
networkNode.removeConnectionListener(this);
if (connectToSeedNodeTimer != null)
connectToSeedNodeTimer.cancel();
}
public void addAuthenticationListener(AuthenticationListener listener) {
@ -258,45 +276,53 @@ public class PeerGroup implements MessageListener, ConnectionListener {
public void authenticateToSeedNode(Address peerAddress, Set<Address> seedNodeAddresses) {
Log.traceCall();
seedNodeAddressesOptional = Optional.of(seedNodeAddresses);
remainingSeedNodes.addAll(seedNodeAddresses);
remainingSeedNodes.remove(peerAddress);
remainingSeedNodes.addAll(seedNodeAddresses);
authenticateToFirstSeedNode(peerAddress);
}
private void authenticateToFirstSeedNode(Address peerAddress) {
Log.traceCall();
if (!maxConnectionsForAuthReached()) {
if (!enoughConnectionsForAuthReached()) {
log.info("We try to authenticate to seed node {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.info("We got our first seed node authenticated. " +
"We try if there are reported peers available to authenticate.");
addAuthenticatedPeer(connection, peerAddress);
authenticateToRemainingReportedPeer();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed at authenticateToFirstSeedNode." +
"\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.toString());
handleAuthenticationFailure(peerAddress, throwable);
Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
if (seedNodeOptional.isPresent()) {
log.info("We try another random seed node for first authentication attempt.");
authenticateToFirstSeedNode(seedNodeOptional.get());
} else {
log.info("There are no seed nodes available for authentication. " +
if (!authenticationHandshakes.containsKey(peerAddress)) {
log.info("We try to authenticate to seed node {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.info("We got our first seed node authenticated. " +
"We try if there are reported peers available to authenticate.");
addAuthenticatedPeer(connection, peerAddress);
authenticateToRemainingReportedPeer();
}
}
});
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed at authenticateToFirstSeedNode." +
"\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.toString());
handleAuthenticationFailure(peerAddress, throwable);
Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
if (seedNodeOptional.isPresent()) {
log.info("We try another random seed node for first authentication attempt.");
authenticateToFirstSeedNode(seedNodeOptional.get());
} else {
log.info("There are no seed nodes available for authentication. " +
"We try if there are reported peers available to authenticate.");
authenticateToRemainingReportedPeer();
}
}
});
} else {
log.warn("We got the first seed node already in the authenticationHandshakes. " +
"That might happen when we received an AuthenticationRequest before we start authenticating. " +
"We will try after a random pause to authenticate to the reported peers.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
20, 30, TimeUnit.SECONDS);
}
} else {
log.info("We have already enough connections.");
}
@ -304,7 +330,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void authenticateToRemainingSeedNode() {
Log.traceCall();
if (!maxConnectionsForAuthReached()) {
if (!enoughConnectionsForAuthReached()) {
Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
if (seedNodeOptional.isPresent()) {
Address peerAddress = seedNodeOptional.get();
@ -333,26 +359,27 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
);
} else if (reportedPeersAvailable()) {
} else if (reportedPeersAvailable() && !isSeedNode) {
authenticateToRemainingReportedPeer();
} else {
log.info("We don't have seed nodes or reported peers available. " +
"We try again after a random pause with the seed nodes which failed or if " +
"none available with the reported peers.");
if (seedNodeAddressesOptional.isPresent()) {
remainingSeedNodes.clear();
seedNodeAddressesOptional.get().stream()
.filter(e -> !authenticatedPeers.containsKey(e) && !authenticationHandshakes.containsKey(e))
.forEach(e -> remainingSeedNodes.add(e));
if (!remainingSeedNodes.isEmpty())
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
30, 60, TimeUnit.SECONDS);
else
resetRemainingSeedNodes();
if (remainingSeedNodes.isEmpty() && !isSeedNode) {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
} else {
} else {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
30, 60, TimeUnit.SECONDS);
}
} else if (!isSeedNode) {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
} else {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
30, 60, TimeUnit.SECONDS);
}
}
} else {
@ -360,13 +387,44 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
private void resetRemainingSeedNodes() {
if (seedNodeAddressesOptional.isPresent()) {
remainingSeedNodes.clear();
seedNodeAddressesOptional.get().stream()
.filter(e -> !authenticatedPeers.containsKey(e) && !authenticationHandshakes.containsKey(e))
.forEach(e -> remainingSeedNodes.add(e));
}
}
// We want to stay connected to at least one seed node from time to time to avoid to get isolated with a group of peers
private void startConnectToSeedNodeTimer() {
Log.traceCall();
if (connectToSeedNodeTimer != null)
connectToSeedNodeTimer.cancel();
connectToSeedNodeTimer = UserThread.runAfterRandomDelay(() -> {
connectToSeedNode();
startConnectToSeedNodeTimer();
}, 1, 2, TimeUnit.MINUTES);
}
private void connectToSeedNode() {
// remove enough connections first
checkIfConnectedPeersExceeds(MAX_CONNECTIONS_NORMAL_PRIORITY - 3);
UserThread.runAfter(() -> {
resetRemainingSeedNodes();
authenticateToRemainingSeedNode();
}, 500, TimeUnit.MILLISECONDS);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to reported peers
///////////////////////////////////////////////////////////////////////////////////////////
private void authenticateToRemainingReportedPeer() {
Log.traceCall();
if (!maxConnectionsForAuthReached()) {
if (!enoughConnectionsForAuthReached()) {
if (reportedPeersAvailable()) {
Optional<ReportedPeer> andRemoveNotAuthenticatingReportedPeer = getAndRemoveNotAuthenticatingReportedPeer();
if (andRemoveNotAuthenticatingReportedPeer.isPresent()) {
@ -410,6 +468,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
} else if (!remainingSeedNodes.isEmpty()) {
authenticateToRemainingSeedNode();
} else if (remainingSeedNodes.isEmpty()) {
UserThread.runAfterRandomDelay(() -> {
resetRemainingSeedNodes();
authenticateToRemainingSeedNode();
},
10, 20, TimeUnit.SECONDS);
} else {
log.info("We don't have seed nodes or reported peers available. " +
"We will try again after a random pause.");
@ -526,7 +591,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
removeFromReportedPeers(peerAddress);
if (!checkIfConnectedPeersExceeds())
if (!checkIfConnectedPeersExceeds(MAX_CONNECTIONS_LOW_PRIORITY))
printAuthenticatedPeers();
authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection));
@ -563,30 +628,35 @@ public class PeerGroup implements MessageListener, ConnectionListener {
printAuthenticatedPeers();
}
private boolean maxConnectionsForAuthReached() {
return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY;
private boolean enoughConnectionsForAuthReached() {
// We reduce the limit to avoid dangling connect/disconnect
return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY - 2;
}
private boolean reportedPeersAvailable() {
return !reportedPeers.isEmpty();
}
private boolean checkIfConnectedPeersExceeds() {
private boolean checkIfConnectedPeersExceeds(int limit) {
Log.traceCall();
int size = authenticatedPeers.size();
if (size > PeerGroup.MAX_CONNECTIONS_LOW_PRIORITY) {
if (size > limit) {
Set<Connection> allConnections = networkNode.getAllConnections();
int allConnectionsSize = allConnections.size();
log.info("We have {} connections open. Lets remove the passive connections" +
" which have not been active recently.", allConnectionsSize);
if (size != allConnectionsSize) {
log.info("We have {} connections open (authenticatedPeers={}). Lets remove the passive connections" +
" which have not been active recently.", allConnectionsSize, size);
// TODO Investigate inconsistency which between size and allConnectionsSize sometimes.
/* if (size != allConnectionsSize) {
log.warn("authenticatedPeers.size()!=allConnections.size(). There is some inconsistency.");
log.debug("authenticatedPeers={}", authenticatedPeers);
log.debug("networkNode.getAllConnections()={}", networkNode.getAllConnections());
}
}*/
// If we are a seed node we don't remove other seed nodes to keep the core network well connected
List<Connection> authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE)
.filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
if (authenticatedConnections.size() == 0) {
@ -596,6 +666,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE || e.getConnectionPriority() == ConnectionPriority.ACTIVE)
.filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
if (authenticatedConnections.size() == 0) {
@ -605,6 +676,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST)
.filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
}
}
@ -617,7 +689,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Connection connection = authenticatedConnections.remove(0);
log.info("We are going to shut down the oldest connection with last activity date="
+ connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 10, 50, TimeUnit.MILLISECONDS));
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(limit), 10, 50, TimeUnit.MILLISECONDS));
return true;
} else {
log.debug("authenticatedConnections.size() == 0. That might happen in rare cases. (checkIfConnectedPeersExceeds)");
@ -629,6 +701,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
private boolean isAuthConnectionSeedNode(Connection e) {
return e.getPeerAddressOptional().isPresent() && seedNodeAddressesOptional.isPresent() && seedNodeAddressesOptional.get().contains(e.getPeerAddressOptional().get());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
@ -728,6 +804,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Optional<ReportedPeer> reportedPeer = Optional.empty();
List<ReportedPeer> list = new ArrayList<>(reportedPeers);
authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
if (!list.isEmpty())
reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list));
@ -743,10 +820,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private Optional<Address> getAndRemoveNotAuthenticatingSeedNode() {
Log.traceCall();
Optional<Address> seedNode = Optional.empty();
List<Address> list = new ArrayList<>(remainingSeedNodes);
authenticationHandshakes.keySet().stream().forEach(e -> list.remove(e));
if (!list.isEmpty())
seedNode = Optional.of(getAndRemoveRandomAddress(list));
authenticationHandshakes.keySet().stream().forEach(e -> remainingSeedNodes.remove(e));
authenticatedPeers.keySet().stream().forEach(e -> remainingSeedNodes.remove(e));
if (!remainingSeedNodes.isEmpty())
seedNode = Optional.of(getAndRemoveRandomAddress(remainingSeedNodes));
return seedNode;
}

View File

@ -28,6 +28,7 @@ import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataManager implements MessageListener, AuthenticationListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class);
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
@ -46,6 +47,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
private Optional<Address> optionalConnectedSeedNodeAddress = Optional.empty();
private Optional<Collection<Address>> optionalSeedNodeAddresses = Optional.empty();
private boolean isSeedNode;
///////////////////////////////////////////////////////////////////////////////////////////
@ -72,6 +74,10 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void setIsSeedNode(boolean isSeedNode) {
this.isSeedNode = isSeedNode;
}
public void requestData(Collection<Address> seedNodeAddresses) {
if (!optionalSeedNodeAddresses.isPresent())
optionalSeedNodeAddresses = Optional.of(seedNodeAddresses);
@ -141,7 +147,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
HashSet<ProtectedData> set = dataResponse.set;
// we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted
connection.getPeerAddress().ifPresent(peerAddress -> set.stream().forEach(e -> dataStorage.add(e, peerAddress)));
connection.getPeerAddressOptional().ifPresent(peerAddress -> set.stream().forEach(e -> dataStorage.add(e, peerAddress)));
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> listener.onDataReceived(connectedSeedNodeAddress));
}
}
@ -153,16 +159,23 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
@Override
public void onPeerAuthenticated(Address peerAddress, Connection connection) {
if (isSeedNode && dataStorage.getMap().isEmpty()) {
// We are the seed node and entering the network we request the data from the peer
UserThread.runAfterRandomDelay(()
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 2, 5, TimeUnit.SECONDS);
}
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> {
// We only request the data again if we have initiated the authentication (ConnectionPriority.ACTIVE)
// We delay a bit to be sure that the authentication state is applied to all threads
if (connection.getConnectionPriority() == ConnectionPriority.ACTIVE && connectedSeedNodeAddress.equals(peerAddress))
if (connectedSeedNodeAddress.equals(peerAddress) && connection.getConnectionPriority() == ConnectionPriority.ACTIVE) {
// We are the node (can be a seed node as well) which requested the authentication
UserThread.runAfter(()
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS);
}
});
}
// 5. Step after authentication to first seed node we request again the data
private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {
Log.traceCall(peerAddress.toString());

View File

@ -14,7 +14,6 @@ public final class AuthenticationRejection extends AuthenticationMessage {
@Override
public String toString() {
return "AuthenticationReject{" +
"address=" + senderAddress +
super.toString() + "} ";
}
}

View File

@ -44,7 +44,6 @@ public class P2PDataStorage implements MessageListener {
private HashMap<ByteArray, Integer> sequenceNumberMap = new HashMap<>();
private final Storage<HashMap> storage;
private final Timer timer = new Timer();
private volatile boolean shutDownInProgress;
///////////////////////////////////////////////////////////////////////////////////////////
@ -116,7 +115,7 @@ public class P2PDataStorage implements MessageListener {
Log.traceCall(message.toString());
if (connection.isAuthenticated()) {
log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection);
connection.getPeerAddress().ifPresent(peerAddress -> {
connection.getPeerAddressOptional().ifPresent(peerAddress -> {
if (message instanceof AddDataMessage) {
add(((AddDataMessage) message).data, peerAddress);
} else if (message instanceof RemoveDataMessage) {
@ -140,11 +139,7 @@ public class P2PDataStorage implements MessageListener {
public void shutDown() {
Log.traceCall();
if (!shutDownInProgress) {
shutDownInProgress = true;
timer.cancel();
peerGroup.shutDown();
}
timer.cancel();
}
public boolean add(ProtectedData protectedData, @Nullable Address sender) {