diff --git a/network/src/main/java/io/bitsquare/p2p/AuthenticationException.java b/network/src/main/java/io/bitsquare/p2p/AuthenticationException.java
index 84135a7ba8..cc20230268 100644
--- a/network/src/main/java/io/bitsquare/p2p/AuthenticationException.java
+++ b/network/src/main/java/io/bitsquare/p2p/AuthenticationException.java
@@ -1,22 +1,9 @@
package io.bitsquare.p2p;
-public class AuthenticationException extends RuntimeException {
- public AuthenticationException() {
- }
+public class AuthenticationException extends Exception {
public AuthenticationException(String message) {
super(message);
}
- public AuthenticationException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public AuthenticationException(Throwable cause) {
- super(cause);
- }
-
- public AuthenticationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
}
diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java
index 7fa13dea57..52f805f1b9 100644
--- a/network/src/main/java/io/bitsquare/p2p/P2PService.java
+++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java
@@ -176,7 +176,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// we remove ourselves from the list of seed nodes
seedNodeAddresses.remove(mySeedNodeAddress);
-
+ peerGroup.setIsSeedNode(true);
+ requestDataManager.setIsSeedNode(true);
+
start(listener);
}
@@ -287,7 +289,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
- connection.getPeerAddress().ifPresent(peerAddresses -> authenticatedPeerAddresses.remove(peerAddresses));
+ connection.getPeerAddressOptional().ifPresent(peerAddresses -> authenticatedPeerAddresses.remove(peerAddresses));
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
@@ -336,7 +338,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
connection.setConnectionPriority(ConnectionPriority.DIRECT_MSG);
log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey);
- connection.getPeerAddress().ifPresent(peerAddresses ->
+ connection.getPeerAddressOptional().ifPresent(peerAddresses ->
decryptedMailListeners.stream().forEach(
e -> e.onMailMessage(decryptedMsgWithPubKey, peerAddresses)));
} else {
@@ -374,14 +376,18 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
SendMailMessageListener sendMailMessageListener) {
Log.traceCall();
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailMessage)");
- checkAuthentication();
-
- if (!authenticatedPeerAddresses.contains(peerAddress))
- peerGroup.authenticateToDirectMessagePeer(peerAddress,
- () -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener),
- () -> sendMailMessageListener.onFault());
- else
- doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener);
+ try {
+ checkAuthentication();
+ if (!authenticatedPeerAddresses.contains(peerAddress))
+ peerGroup.authenticateToDirectMessagePeer(peerAddress,
+ () -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener),
+ () -> sendMailMessageListener.onFault());
+ else
+ doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener);
+ } catch (AuthenticationException e) {
+ log.error(e.getMessage());
+ throw new RuntimeException(e);
+ }
}
private void doSendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message,
@@ -458,17 +464,22 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(!optionalKeyRing.get().getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
- checkAuthentication();
-
- if (authenticatedPeerAddresses.contains(peerAddress)) {
- trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
- } else {
- peerGroup.authenticateToDirectMessagePeer(peerAddress,
- () -> trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener),
- () -> {
- log.info("We cannot authenticate to peer. Peer might be offline. We will store message in mailbox.");
- trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
- });
+ try {
+ checkAuthentication();
+ if (authenticatedPeerAddresses.contains(peerAddress)) {
+ trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
+ } else {
+ peerGroup.authenticateToDirectMessagePeer(peerAddress,
+ () -> trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener),
+ () -> {
+ log.info("We cannot authenticate to peer. Peer might be offline. We will store message in mailbox.");
+ trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
+ });
+ }
+ } catch (AuthenticationException e) {
+ log.error(e.getMessage());
+ //TODO check if boolean return type can avoid throwing an exception
+ throw new RuntimeException(e);
}
}
@@ -513,13 +524,17 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
- checkAuthentication();
try {
+ checkAuthentication();
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPublicKey);
dataStorage.add(protectedMailboxData, networkNode.getAddress());
+ } catch (AuthenticationException e) {
+ log.error(e.getMessage());
+ //TODO check if boolean return type can avoid throwing an exception
+ throw new RuntimeException(e);
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
@@ -528,30 +543,36 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
- checkAuthentication();
- if (mailboxMap.containsKey(decryptedMsgWithPubKey)) {
- ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
- if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
- ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload;
- PublicKey receiversPubKey = mailboxData.receiversPubKey;
- checkArgument(receiversPubKey.equals(optionalKeyRing.get().getSignatureKeyPair().getPublic()),
- "receiversPubKey is not matching with our key. That must not happen.");
- try {
- ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
- expirableMailboxPayload,
- optionalKeyRing.get().getSignatureKeyPair(),
- receiversPubKey);
- dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress());
- } catch (CryptoException e) {
- log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
- }
+ try {
+ checkAuthentication();
+ if (mailboxMap.containsKey(decryptedMsgWithPubKey)) {
+ ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
+ if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
+ ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload;
+ PublicKey receiversPubKey = mailboxData.receiversPubKey;
+ checkArgument(receiversPubKey.equals(optionalKeyRing.get().getSignatureKeyPair().getPublic()),
+ "receiversPubKey is not matching with our key. That must not happen.");
+ try {
+ ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
+ expirableMailboxPayload,
+ optionalKeyRing.get().getSignatureKeyPair(),
+ receiversPubKey);
+ dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress());
+ } catch (CryptoException e) {
+ log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
+ }
- mailboxMap.remove(decryptedMsgWithPubKey);
- log.trace("Removed successfully decryptedMsgWithPubKey.");
+ mailboxMap.remove(decryptedMsgWithPubKey);
+ log.trace("Removed successfully decryptedMsgWithPubKey.");
+ }
+ } else {
+ log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." +
+ "\ndecryptedMsgWithPubKey={}\nmailboxMap={}", decryptedMsgWithPubKey, mailboxMap);
}
- } else {
- log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." +
- "\ndecryptedMsgWithPubKey={}\nmailboxMap={}", decryptedMsgWithPubKey, mailboxMap);
+ } catch (AuthenticationException e) {
+ log.error(e.getMessage());
+ //TODO check if boolean return type can avoid throwing an exception
+ throw new RuntimeException(e);
}
}
@@ -573,13 +594,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private boolean doAddData(ExpirablePayload expirablePayload, boolean rePublish) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
- checkAuthentication();
try {
+ checkAuthentication();
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, optionalKeyRing.get().getSignatureKeyPair());
if (rePublish)
return dataStorage.rePublish(protectedData, networkNode.getAddress());
else
return dataStorage.add(protectedData, networkNode.getAddress());
+ } catch (AuthenticationException e) {
+ log.error(e.getMessage());
+ return false;
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@@ -589,10 +613,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public boolean removeData(ExpirablePayload expirablePayload) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
- checkAuthentication();
try {
+ checkAuthentication();
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, optionalKeyRing.get().getSignatureKeyPair());
return dataStorage.remove(protectedData, networkNode.getAddress());
+ } catch (AuthenticationException e) {
+ log.error(e.getMessage());
+ return false;
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@@ -681,7 +708,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
- private void checkAuthentication() {
+ private void checkAuthentication() throws AuthenticationException {
Log.traceCall();
if (authenticatedPeerAddresses.isEmpty())
throw new AuthenticationException("You must be authenticated before adding data to the P2P network.");
diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java
index 4750ccce91..c3133f7dbb 100644
--- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java
+++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java
@@ -193,11 +193,11 @@ public class Connection implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
@Nullable
- public synchronized Address getPeerAddress1() {
+ public synchronized Address getPeerAddress() {
return peerAddressOptional.isPresent() ? peerAddressOptional.get() : null;
}
- public synchronized Optional
getPeerAddress() {
+ public synchronized Optional getPeerAddressOptional() {
return peerAddressOptional;
}
@@ -258,7 +258,7 @@ public class Connection implements MessageListener {
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid);
Log.traceCall("sendCloseConnectionMessage");
try {
- sendMessage(new CloseConnectionMessage(peerAddressOptional.isPresent() ? peerAddressOptional.get() : null));
+ sendMessage(new CloseConnectionMessage());
setStopFlags();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -557,8 +557,7 @@ public class Connection implements MessageListener {
sharedSpace.updateLastActivityDate();
if (message instanceof CloseConnectionMessage) {
- log.info("Close connection message received from peer {}",
- ((CloseConnectionMessage) message).peerAddress);
+ log.info("CloseConnectionMessage received on connection {}", sharedSpace.connection);
stopped = true;
sharedSpace.shutDown(false);
} else if (!stopped) {
diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java
index 938092b89e..ee79ceeebe 100644
--- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java
+++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java
@@ -216,7 +216,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
public void addSetupListener(SetupListener setupListener) {
Log.traceCall();
- setupListeners.add(setupListener);
+ boolean isNewEntry = setupListeners.add(setupListener);
+ if (!isNewEntry)
+ log.warn("Try to add a setupListener which was already added.");
}
@@ -262,21 +264,24 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
public void addConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
- boolean newEntry = connectionListeners.add(connectionListener);
- if (!newEntry)
+ boolean isNewEntry = connectionListeners.add(connectionListener);
+ if (!isNewEntry)
log.warn("Try to add a connectionListener which was already added.\nconnectionListener={}\nconnectionListeners={}"
, connectionListener, connectionListeners);
}
public void removeConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
- connectionListeners.remove(connectionListener);
+ boolean contained = connectionListeners.remove(connectionListener);
+ if (!contained)
+ log.debug("Try to remove a connectionListener which was never added. " +
+ "That might happen because of async behaviour of CopyOnWriteArraySet");
}
public void addMessageListener(MessageListener messageListener) {
Log.traceCall();
- boolean newEntry = messageListeners.add(messageListener);
- if (!newEntry)
+ boolean isNewEntry = messageListeners.add(messageListener);
+ if (!isNewEntry)
log.warn("Try to add a messageListener which was already added.");
}
@@ -284,7 +289,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
Log.traceCall();
boolean contained = messageListeners.remove(messageListener);
if (!contained)
- log.warn("Try to remove a messageListener which was never added.");
+ log.debug("Try to remove a messageListener which was never added. " +
+ "That might happen because of async behaviour of CopyOnWriteArraySet");
}
@@ -330,13 +336,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private Optional lookupOutboundConnection(Address peerAddress) {
// Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections);
return outBoundConnections.stream()
- .filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
+ .filter(e -> e.getPeerAddressOptional().isPresent() && peerAddress.equals(e.getPeerAddressOptional().get())).findAny();
}
private Optional lookupInboundConnection(Address peerAddress) {
// Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections);
return inBoundConnections.stream()
- .filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
+ .filter(e -> e.getPeerAddressOptional().isPresent() && peerAddress.equals(e.getPeerAddressOptional().get())).findAny();
}
abstract protected Socket createSocket(Address peerAddress) throws IOException;
diff --git a/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java b/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java
index ccbf87264e..48966afc74 100644
--- a/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java
+++ b/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java
@@ -4,8 +4,6 @@ import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
-import javax.annotation.Nullable;
-
public final class CloseConnectionMessage implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
@@ -13,8 +11,7 @@ public final class CloseConnectionMessage implements Message {
private final int networkId = Version.NETWORK_ID;
public Address peerAddress;
- public CloseConnectionMessage(@Nullable Address peerAddress) {
- this.peerAddress = peerAddress;
+ public CloseConnectionMessage() {
}
@Override
@@ -25,7 +22,6 @@ public final class CloseConnectionMessage implements Message {
@Override
public String toString() {
return "CloseConnectionMessage{" +
- "peerAddress=" + peerAddress +
", networkId=" + networkId +
'}';
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationException.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationException.java
index 16ef724788..ca57bb69cb 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationException.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationException.java
@@ -1,10 +1,6 @@
package io.bitsquare.p2p.peers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class AuthenticationException extends Exception {
- private static final Logger log = LoggerFactory.getLogger(AuthenticationException.class);
public AuthenticationException(String message) {
super(message);
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
index eeca6bd206..e803bc5abd 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
@@ -54,9 +54,9 @@ public class AuthenticationHandshake implements MessageListener {
Address peerAddress,
Supplier> authenticatedAndReportedPeersSupplier,
BiConsumer, Connection> addReportedPeersConsumer) {
+ Log.traceCall("peerAddress " + peerAddress);
this.authenticatedAndReportedPeersSupplier = authenticatedAndReportedPeersSupplier;
this.addReportedPeersConsumer = addReportedPeersConsumer;
- Log.traceCall("peerAddress " + peerAddress);
this.networkNode = networkNode;
this.myAddress = myAddress;
this.peerAddress = peerAddress;
@@ -122,7 +122,7 @@ public class AuthenticationHandshake implements MessageListener {
// now we add the reported peers to our list
addReportedPeersConsumer.accept(authenticationChallenge.reportedPeers, connection);
} else {
- log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationFinalResponse={}", authenticationChallenge, nonce, peerAddress);
+ log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationChallenge={}", nonce, peerAddress, authenticationChallenge);
failed(new Exception("Verification of nonce failed. AuthenticationChallenge=" + authenticationChallenge + " / nonceMap=" + nonce));
}
} else if (message instanceof AuthenticationFinalResponse) {
@@ -137,7 +137,7 @@ public class AuthenticationHandshake implements MessageListener {
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
completed(connection);
} else {
- log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationFinalResponse={}", authenticationFinalResponse, nonce, peerAddress);
+ log.warn("Verification of nonce failed. nonce={} / peerAddress={} / authenticationFinalResponse={}", nonce, peerAddress, authenticationFinalResponse);
failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationFinalResponse + " / nonce=" + nonce));
}
} else if (message instanceof AuthenticationRejection) {
@@ -326,10 +326,11 @@ public class AuthenticationHandshake implements MessageListener {
private void shutDown() {
Log.traceCall("peerAddress = " + peerAddress);
- networkNode.removeMessageListener(this);
stopped = true;
if (timeoutTimer != null)
timeoutTimer.cancel();
+
+ networkNode.removeMessageListener(this);
}
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java
index e05aecb252..c769b08faa 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java
@@ -51,7 +51,7 @@ public class MaintenanceManager implements MessageListener {
public void shutDown() {
Log.traceCall();
- if (sendPingTimer != null)
+ if (sendPingTimer != null)
sendPingTimer.cancel();
networkNode.removeMessageListener(this);
@@ -77,11 +77,11 @@ public class MaintenanceManager implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PongMessage sending failed " + throwable.getMessage());
- connection.getPeerAddress().ifPresent(peerAddress -> removePeerConsumer.accept(peerAddress));
+ connection.getPeerAddressOptional().ifPresent(peerAddress -> removePeerConsumer.accept(peerAddress));
}
});
} else if (message instanceof PongMessage) {
- connection.getPeerAddress().ifPresent(peerAddress -> {
+ connection.getPeerAddressOptional().ifPresent(peerAddress -> {
Peer peer = authenticatedPeersSupplier.get().get(peerAddress);
if (peer != null) {
if (((PongMessage) message).nonce != peer.pingNonce) {
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
index b7a51b4159..512a7ec0e5 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
@@ -42,7 +42,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
static {
- setMaxConnectionsLowPriority(18);
+ setMaxConnectionsLowPriority(10);
}
private static final int MAX_REPORTED_PEERS = 1000;
@@ -62,6 +62,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private final Map authenticationHandshakes = new HashMap<>();
private final List remainingSeedNodes = new ArrayList<>();
private Optional> seedNodeAddressesOptional = Optional.empty();
+ private Timer connectToSeedNodeTimer;
+ private boolean isSeedNode;
///////////////////////////////////////////////////////////////////////////////////////////
@@ -84,6 +86,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
() -> getAuthenticatedPeers(),
address -> removePeer(address),
(newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection));
+
+ startConnectToSeedNodeTimer();
+ }
+
+ public void setIsSeedNode(boolean isSeedNode) {
+ this.isSeedNode = isSeedNode;
}
@@ -99,12 +107,19 @@ public class PeerGroup implements MessageListener, ConnectionListener {
public void onDisconnect(Reason reason, Connection connection) {
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
- connection.getPeerAddress().ifPresent(peerAddress -> {
+ connection.getPeerAddressOptional().ifPresent(peerAddress -> {
// We only remove the peer from the authenticationHandshakes and the reportedPeers
// if we are not in the authentication process
// Connection shut down is an expected step in the authentication process.
- if (!authenticationHandshakes.containsKey(peerAddress))
+ // If the disconnect happens on an authenticated peer we remove the peer.
+ if (authenticatedPeers.containsKey(peerAddress) || !authenticationHandshakes.containsKey(peerAddress)) {
removePeer(peerAddress);
+
+ log.info("We got a disconnect. " +
+ "We will try again after a random pause to remaining reported peers.");
+ UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
+ 10, 20, TimeUnit.SECONDS);
+ }
});
}
@@ -172,6 +187,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
networkNode.removeMessageListener(this);
networkNode.removeConnectionListener(this);
+
+ if (connectToSeedNodeTimer != null)
+ connectToSeedNodeTimer.cancel();
}
public void addAuthenticationListener(AuthenticationListener listener) {
@@ -258,45 +276,53 @@ public class PeerGroup implements MessageListener, ConnectionListener {
public void authenticateToSeedNode(Address peerAddress, Set seedNodeAddresses) {
Log.traceCall();
seedNodeAddressesOptional = Optional.of(seedNodeAddresses);
- remainingSeedNodes.addAll(seedNodeAddresses);
remainingSeedNodes.remove(peerAddress);
+ remainingSeedNodes.addAll(seedNodeAddresses);
authenticateToFirstSeedNode(peerAddress);
}
private void authenticateToFirstSeedNode(Address peerAddress) {
Log.traceCall();
- if (!maxConnectionsForAuthReached()) {
+ if (!enoughConnectionsForAuthReached()) {
- log.info("We try to authenticate to seed node {}.", peerAddress);
- authenticate(peerAddress, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.info("We got our first seed node authenticated. " +
- "We try if there are reported peers available to authenticate.");
-
- addAuthenticatedPeer(connection, peerAddress);
- authenticateToRemainingReportedPeer();
- }
-
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("Authentication to " + peerAddress + " failed at authenticateToFirstSeedNode." +
- "\nThat is expected if seed nodes are offline." +
- "\nException:" + throwable.toString());
-
- handleAuthenticationFailure(peerAddress, throwable);
-
- Optional seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
- if (seedNodeOptional.isPresent()) {
- log.info("We try another random seed node for first authentication attempt.");
- authenticateToFirstSeedNode(seedNodeOptional.get());
- } else {
- log.info("There are no seed nodes available for authentication. " +
+ if (!authenticationHandshakes.containsKey(peerAddress)) {
+ log.info("We try to authenticate to seed node {}.", peerAddress);
+ authenticate(peerAddress, new FutureCallback() {
+ @Override
+ public void onSuccess(Connection connection) {
+ log.info("We got our first seed node authenticated. " +
"We try if there are reported peers available to authenticate.");
+
+ addAuthenticatedPeer(connection, peerAddress);
authenticateToRemainingReportedPeer();
}
- }
- });
+
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ log.info("Authentication to " + peerAddress + " failed at authenticateToFirstSeedNode." +
+ "\nThat is expected if seed nodes are offline." +
+ "\nException:" + throwable.toString());
+
+ handleAuthenticationFailure(peerAddress, throwable);
+
+ Optional seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
+ if (seedNodeOptional.isPresent()) {
+ log.info("We try another random seed node for first authentication attempt.");
+ authenticateToFirstSeedNode(seedNodeOptional.get());
+ } else {
+ log.info("There are no seed nodes available for authentication. " +
+ "We try if there are reported peers available to authenticate.");
+ authenticateToRemainingReportedPeer();
+ }
+ }
+ });
+ } else {
+ log.warn("We got the first seed node already in the authenticationHandshakes. " +
+ "That might happen when we received an AuthenticationRequest before we start authenticating. " +
+ "We will try after a random pause to authenticate to the reported peers.");
+ UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
+ 20, 30, TimeUnit.SECONDS);
+ }
} else {
log.info("We have already enough connections.");
}
@@ -304,7 +330,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void authenticateToRemainingSeedNode() {
Log.traceCall();
- if (!maxConnectionsForAuthReached()) {
+ if (!enoughConnectionsForAuthReached()) {
Optional seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
if (seedNodeOptional.isPresent()) {
Address peerAddress = seedNodeOptional.get();
@@ -333,26 +359,27 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
);
- } else if (reportedPeersAvailable()) {
+ } else if (reportedPeersAvailable() && !isSeedNode) {
authenticateToRemainingReportedPeer();
} else {
log.info("We don't have seed nodes or reported peers available. " +
"We try again after a random pause with the seed nodes which failed or if " +
"none available with the reported peers.");
if (seedNodeAddressesOptional.isPresent()) {
- remainingSeedNodes.clear();
- seedNodeAddressesOptional.get().stream()
- .filter(e -> !authenticatedPeers.containsKey(e) && !authenticationHandshakes.containsKey(e))
- .forEach(e -> remainingSeedNodes.add(e));
- if (!remainingSeedNodes.isEmpty())
- UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
- 30, 60, TimeUnit.SECONDS);
- else
+ resetRemainingSeedNodes();
+ if (remainingSeedNodes.isEmpty() && !isSeedNode) {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
- } else {
+ } else {
+ UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
+ 30, 60, TimeUnit.SECONDS);
+ }
+ } else if (!isSeedNode) {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
+ } else {
+ UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
+ 30, 60, TimeUnit.SECONDS);
}
}
} else {
@@ -360,13 +387,44 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
+ private void resetRemainingSeedNodes() {
+ if (seedNodeAddressesOptional.isPresent()) {
+ remainingSeedNodes.clear();
+ seedNodeAddressesOptional.get().stream()
+ .filter(e -> !authenticatedPeers.containsKey(e) && !authenticationHandshakes.containsKey(e))
+ .forEach(e -> remainingSeedNodes.add(e));
+ }
+ }
+
+ // We want to stay connected to at least one seed node from time to time to avoid to get isolated with a group of peers
+ private void startConnectToSeedNodeTimer() {
+ Log.traceCall();
+ if (connectToSeedNodeTimer != null)
+ connectToSeedNodeTimer.cancel();
+
+ connectToSeedNodeTimer = UserThread.runAfterRandomDelay(() -> {
+ connectToSeedNode();
+ startConnectToSeedNodeTimer();
+ }, 1, 2, TimeUnit.MINUTES);
+ }
+
+ private void connectToSeedNode() {
+ // remove enough connections first
+ checkIfConnectedPeersExceeds(MAX_CONNECTIONS_NORMAL_PRIORITY - 3);
+ UserThread.runAfter(() -> {
+ resetRemainingSeedNodes();
+ authenticateToRemainingSeedNode();
+ }, 500, TimeUnit.MILLISECONDS);
+ }
+
+
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to reported peers
///////////////////////////////////////////////////////////////////////////////////////////
private void authenticateToRemainingReportedPeer() {
Log.traceCall();
- if (!maxConnectionsForAuthReached()) {
+ if (!enoughConnectionsForAuthReached()) {
if (reportedPeersAvailable()) {
Optional andRemoveNotAuthenticatingReportedPeer = getAndRemoveNotAuthenticatingReportedPeer();
if (andRemoveNotAuthenticatingReportedPeer.isPresent()) {
@@ -410,6 +468,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
} else if (!remainingSeedNodes.isEmpty()) {
authenticateToRemainingSeedNode();
+ } else if (remainingSeedNodes.isEmpty()) {
+ UserThread.runAfterRandomDelay(() -> {
+ resetRemainingSeedNodes();
+ authenticateToRemainingSeedNode();
+ },
+ 10, 20, TimeUnit.SECONDS);
+
} else {
log.info("We don't have seed nodes or reported peers available. " +
"We will try again after a random pause.");
@@ -526,7 +591,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
removeFromReportedPeers(peerAddress);
- if (!checkIfConnectedPeersExceeds())
+ if (!checkIfConnectedPeersExceeds(MAX_CONNECTIONS_LOW_PRIORITY))
printAuthenticatedPeers();
authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection));
@@ -563,30 +628,35 @@ public class PeerGroup implements MessageListener, ConnectionListener {
printAuthenticatedPeers();
}
- private boolean maxConnectionsForAuthReached() {
- return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY;
+ private boolean enoughConnectionsForAuthReached() {
+ // We reduce the limit to avoid dangling connect/disconnect
+ return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY - 2;
}
private boolean reportedPeersAvailable() {
return !reportedPeers.isEmpty();
}
- private boolean checkIfConnectedPeersExceeds() {
+ private boolean checkIfConnectedPeersExceeds(int limit) {
Log.traceCall();
int size = authenticatedPeers.size();
- if (size > PeerGroup.MAX_CONNECTIONS_LOW_PRIORITY) {
+ if (size > limit) {
Set allConnections = networkNode.getAllConnections();
int allConnectionsSize = allConnections.size();
- log.info("We have {} connections open. Lets remove the passive connections" +
- " which have not been active recently.", allConnectionsSize);
- if (size != allConnectionsSize) {
+ log.info("We have {} connections open (authenticatedPeers={}). Lets remove the passive connections" +
+ " which have not been active recently.", allConnectionsSize, size);
+ // TODO Investigate inconsistency which between size and allConnectionsSize sometimes.
+ /* if (size != allConnectionsSize) {
log.warn("authenticatedPeers.size()!=allConnections.size(). There is some inconsistency.");
log.debug("authenticatedPeers={}", authenticatedPeers);
log.debug("networkNode.getAllConnections()={}", networkNode.getAllConnections());
- }
+ }*/
+
+ // If we are a seed node we don't remove other seed nodes to keep the core network well connected
List authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE)
+ .filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
if (authenticatedConnections.size() == 0) {
@@ -596,6 +666,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE || e.getConnectionPriority() == ConnectionPriority.ACTIVE)
+ .filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
if (authenticatedConnections.size() == 0) {
@@ -605,6 +676,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST)
+ .filter(e -> !isSeedNode || !isAuthConnectionSeedNode(e))
.collect(Collectors.toList());
}
}
@@ -617,7 +689,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Connection connection = authenticatedConnections.remove(0);
log.info("We are going to shut down the oldest connection with last activity date="
+ connection.getLastActivityDate() + " / connection=" + connection);
- connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 10, 50, TimeUnit.MILLISECONDS));
+ connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(limit), 10, 50, TimeUnit.MILLISECONDS));
return true;
} else {
log.debug("authenticatedConnections.size() == 0. That might happen in rare cases. (checkIfConnectedPeersExceeds)");
@@ -629,6 +701,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
+ private boolean isAuthConnectionSeedNode(Connection e) {
+ return e.getPeerAddressOptional().isPresent() && seedNodeAddressesOptional.isPresent() && seedNodeAddressesOptional.get().contains(e.getPeerAddressOptional().get());
+ }
+
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
@@ -728,6 +804,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Optional reportedPeer = Optional.empty();
List list = new ArrayList<>(reportedPeers);
authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
+ authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
if (!list.isEmpty())
reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list));
@@ -743,10 +820,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private Optional getAndRemoveNotAuthenticatingSeedNode() {
Log.traceCall();
Optional seedNode = Optional.empty();
- List list = new ArrayList<>(remainingSeedNodes);
- authenticationHandshakes.keySet().stream().forEach(e -> list.remove(e));
- if (!list.isEmpty())
- seedNode = Optional.of(getAndRemoveRandomAddress(list));
+ authenticationHandshakes.keySet().stream().forEach(e -> remainingSeedNodes.remove(e));
+ authenticatedPeers.keySet().stream().forEach(e -> remainingSeedNodes.remove(e));
+ if (!remainingSeedNodes.isEmpty())
+ seedNode = Optional.of(getAndRemoveRandomAddress(remainingSeedNodes));
return seedNode;
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
index 17e5f47697..e5d583095d 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
@@ -28,6 +28,7 @@ import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataManager implements MessageListener, AuthenticationListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class);
+
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
@@ -46,6 +47,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
private Optional optionalConnectedSeedNodeAddress = Optional.empty();
private Optional> optionalSeedNodeAddresses = Optional.empty();
+ private boolean isSeedNode;
///////////////////////////////////////////////////////////////////////////////////////////
@@ -72,6 +74,10 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
// API
///////////////////////////////////////////////////////////////////////////////////////////
+ public void setIsSeedNode(boolean isSeedNode) {
+ this.isSeedNode = isSeedNode;
+ }
+
public void requestData(Collection seedNodeAddresses) {
if (!optionalSeedNodeAddresses.isPresent())
optionalSeedNodeAddresses = Optional.of(seedNodeAddresses);
@@ -141,7 +147,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
HashSet set = dataResponse.set;
// we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted
- connection.getPeerAddress().ifPresent(peerAddress -> set.stream().forEach(e -> dataStorage.add(e, peerAddress)));
+ connection.getPeerAddressOptional().ifPresent(peerAddress -> set.stream().forEach(e -> dataStorage.add(e, peerAddress)));
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> listener.onDataReceived(connectedSeedNodeAddress));
}
}
@@ -153,16 +159,23 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
@Override
public void onPeerAuthenticated(Address peerAddress, Connection connection) {
+ if (isSeedNode && dataStorage.getMap().isEmpty()) {
+ // We are the seed node and entering the network we request the data from the peer
+ UserThread.runAfterRandomDelay(()
+ -> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 2, 5, TimeUnit.SECONDS);
+ }
+
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> {
// We only request the data again if we have initiated the authentication (ConnectionPriority.ACTIVE)
// We delay a bit to be sure that the authentication state is applied to all threads
- if (connection.getConnectionPriority() == ConnectionPriority.ACTIVE && connectedSeedNodeAddress.equals(peerAddress))
+ if (connectedSeedNodeAddress.equals(peerAddress) && connection.getConnectionPriority() == ConnectionPriority.ACTIVE) {
+ // We are the node (can be a seed node as well) which requested the authentication
UserThread.runAfter(()
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS);
+ }
});
}
-
// 5. Step after authentication to first seed node we request again the data
private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {
Log.traceCall(peerAddress.toString());
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java
index 97f2d53237..e4a0387a27 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java
@@ -14,7 +14,6 @@ public final class AuthenticationRejection extends AuthenticationMessage {
@Override
public String toString() {
return "AuthenticationReject{" +
- "address=" + senderAddress +
super.toString() + "} ";
}
}
diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java
index 099fff53cb..21c7a9912a 100644
--- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java
+++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java
@@ -44,7 +44,6 @@ public class P2PDataStorage implements MessageListener {
private HashMap sequenceNumberMap = new HashMap<>();
private final Storage storage;
private final Timer timer = new Timer();
- private volatile boolean shutDownInProgress;
///////////////////////////////////////////////////////////////////////////////////////////
@@ -116,7 +115,7 @@ public class P2PDataStorage implements MessageListener {
Log.traceCall(message.toString());
if (connection.isAuthenticated()) {
log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection);
- connection.getPeerAddress().ifPresent(peerAddress -> {
+ connection.getPeerAddressOptional().ifPresent(peerAddress -> {
if (message instanceof AddDataMessage) {
add(((AddDataMessage) message).data, peerAddress);
} else if (message instanceof RemoveDataMessage) {
@@ -140,11 +139,7 @@ public class P2PDataStorage implements MessageListener {
public void shutDown() {
Log.traceCall();
- if (!shutDownInProgress) {
- shutDownInProgress = true;
- timer.cancel();
- peerGroup.shutDown();
- }
+ timer.cancel();
}
public boolean add(ProtectedData protectedData, @Nullable Address sender) {