diff --git a/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java b/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java
index bb1115ec52..0bd7e0dff0 100644
--- a/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java
+++ b/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java
@@ -134,13 +134,15 @@ public class OfferAvailabilityProtocol {
}
private void startTimeout() {
- stopTimeout();
-
- timeoutTimer = UserThread.runAfter(() -> {
- log.warn("Timeout reached");
- model.offer.setState(Offer.State.OFFERER_OFFLINE);
- errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded.");
- }, TIMEOUT_SEC);
+ if (timeoutTimer == null) {
+ timeoutTimer = UserThread.runAfter(() -> {
+ log.warn("Timeout reached at " + this);
+ model.offer.setState(Offer.State.OFFERER_OFFLINE);
+ errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded.");
+ }, TIMEOUT_SEC);
+ } else {
+ log.warn("timeoutTimer already created. That must not happen.");
+ }
}
private void stopTimeout() {
diff --git a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml
index 7d31c43261..eefcb5963f 100644
--- a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml
+++ b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml
@@ -38,8 +38,7 @@
-
+ GridPane.vgrow="ALWAYS" editable="false" focusTraversable="false"/>
@@ -64,7 +63,7 @@
+ GridPane.vgrow="ALWAYS" editable="false" focusTraversable="false"/>
diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java
index f60ecd8b6f..cfa9853e2e 100644
--- a/network/src/main/java/io/bitsquare/p2p/P2PService.java
+++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java
@@ -110,12 +110,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Log.traceCall();
connectionNodeAddressListener = (observable, oldValue, newValue) -> {
- Set nodeAddressesOfConfirmedConnections = networkNode.getNodeAddressesOfConfirmedConnections();
- Set allConfirmedConnections = networkNode.getConfirmedConnections();
- log.info("nodeAddressesOfConfirmedConnections=" + nodeAddressesOfConfirmedConnections);
- log.info("allConfirmedConnections=" + allConfirmedConnections);
- log.info("Nr of connections: {} / {} (nodeAddressesOfConfirmedConnections / allConfirmedConnections)", nodeAddressesOfConfirmedConnections.size(), allConfirmedConnections.size());
- UserThread.execute(() -> numConnectedPeers.set(nodeAddressesOfConfirmedConnections.size()));
+ UserThread.execute(() -> numConnectedPeers.set(networkNode.getNodeAddressesOfConfirmedConnections().size()));
};
networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir);
@@ -265,7 +260,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Optional seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest();
checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(),
"seedNodeOfPreliminaryDataRequest must be present");
- peerExchangeManager.requestReportedPeers(seedNodeOfPreliminaryDataRequest.get());
+ peerExchangeManager.requestReportedPeersFromSeedNodes(seedNodeOfPreliminaryDataRequest.get());
isBootstrapped = true;
p2pServiceListeners.stream().forEach(P2PServiceListener::onBootstrapComplete);
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeHandshake.java
new file mode 100644
index 0000000000..816b37794e
--- /dev/null
+++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeHandshake.java
@@ -0,0 +1,208 @@
+package io.bitsquare.p2p.peers;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import io.bitsquare.app.Log;
+import io.bitsquare.common.UserThread;
+import io.bitsquare.p2p.Message;
+import io.bitsquare.p2p.NodeAddress;
+import io.bitsquare.p2p.network.Connection;
+import io.bitsquare.p2p.network.MessageListener;
+import io.bitsquare.p2p.network.NetworkNode;
+import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
+import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class PeerExchangeHandshake implements MessageListener {
+ private static final Logger log = LoggerFactory.getLogger(PeerExchangeHandshake.class);
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // Listener
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ public interface Listener {
+ void onComplete();
+
+ void onFault(String errorMessage);
+ }
+
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // Class fields
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ private final NetworkNode networkNode;
+ private final PeerManager peerManager;
+ private final Listener listener;
+ private final long nonce = new Random().nextLong();
+ private Timer timeoutTimer;
+
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // Constructor
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ public PeerExchangeHandshake(NetworkNode networkNode, PeerManager peerManager, Listener listener) {
+ this.networkNode = networkNode;
+ this.peerManager = peerManager;
+ this.listener = listener;
+
+ networkNode.addMessageListener(this);
+ }
+
+ public void shutDown() {
+ Log.traceCall();
+ networkNode.removeMessageListener(this);
+ stopTimeoutTimer();
+ }
+
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // API
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ public void requestReportedPeers(NodeAddress nodeAddress, List remainingNodeAddresses) {
+ Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
+ checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
+ checkArgument(timeoutTimer == null, "requestData must not be called twice.");
+
+ timeoutTimer = UserThread.runAfter(() -> {
+ log.info("timeoutTimer called");
+ peerManager.shutDownConnection(nodeAddress);
+ shutDown();
+ listener.onFault("A timeout occurred");
+ },
+ 20, TimeUnit.SECONDS);
+
+ GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce,
+ getReportedPeers(nodeAddress));
+ SettableFuture future = networkNode.sendMessage(nodeAddress,
+ getPeersRequest);
+ Futures.addCallback(future, new FutureCallback() {
+ @Override
+ public void onSuccess(Connection connection) {
+ log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded.");
+ }
+
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ String errorMessage = "Sending getPeersRequest to " + nodeAddress +
+ " failed. That is expected if the peer is offline. getPeersRequest=" + getPeersRequest + "." +
+ "Exception: " + throwable.getMessage();
+ log.info(errorMessage);
+
+ peerManager.shutDownConnection(nodeAddress);
+ shutDown();
+ listener.onFault(errorMessage);
+ }
+ });
+ }
+
+ public void onGetPeersRequest(GetPeersRequest message, final Connection connection) {
+ checkArgument(timeoutTimer == null, "requestData must not be called twice.");
+
+ timeoutTimer = UserThread.runAfter(() -> {
+ log.info("timeoutTimer called");
+ peerManager.shutDownConnection(connection);
+ shutDown();
+ listener.onFault("A timeout occurred");
+ },
+ 20, TimeUnit.SECONDS);
+
+ GetPeersRequest getPeersRequest = message;
+ HashSet reportedPeers = getPeersRequest.reportedPeers;
+ StringBuilder result = new StringBuilder("Received peers:");
+ reportedPeers.stream().forEach(e -> result.append("\n").append(e));
+ log.trace(result.toString());
+
+ checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
+ "The peers address must have been already set at the moment");
+ SettableFuture future = networkNode.sendMessage(connection,
+ new GetPeersResponse(getPeersRequest.nonce,
+ getReportedPeers(connection.getPeersNodeAddressOptional().get())));
+ Futures.addCallback(future, new FutureCallback() {
+ @Override
+ public void onSuccess(Connection connection) {
+ log.trace("GetPeersResponse sent successfully");
+ shutDown();
+ listener.onComplete();
+ }
+
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ String errorMessage = "Sending getPeersRequest to " + connection +
+ " failed. That is expected if the peer is offline. getPeersRequest=" + getPeersRequest + "." +
+ "Exception: " + throwable.getMessage();
+ log.info(errorMessage);
+
+ peerManager.shutDownConnection(connection);
+ shutDown();
+ listener.onFault(errorMessage);
+ }
+ });
+ peerManager.addToReportedPeers(reportedPeers, connection);
+ }
+
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // MessageListener implementation
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public void onMessage(Message message, Connection connection) {
+ if (message instanceof GetPeersResponse) {
+ Log.traceCall(message.toString() + " / connection=" + connection);
+ GetPeersResponse getPeersResponse = (GetPeersResponse) message;
+ if (getPeersResponse.requestNonce == nonce) {
+ stopTimeoutTimer();
+
+ HashSet reportedPeers = getPeersResponse.reportedPeers;
+ StringBuilder result = new StringBuilder("Received peers:");
+ reportedPeers.stream().forEach(e -> result.append("\n").append(e));
+ log.trace(result.toString());
+ peerManager.addToReportedPeers(reportedPeers, connection);
+
+ shutDown();
+ listener.onComplete();
+ } else {
+ log.debug("Nonce not matching. That happens if we get a response after a canceled handshake " +
+ "(timeout). We drop that message. nonce={} / requestNonce={}",
+ nonce, getPeersResponse.requestNonce);
+ }
+ }
+ }
+
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // Private
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+ private HashSet getReportedPeers(NodeAddress receiverNodeAddress) {
+ return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream()
+ .filter(e -> !peerManager.isSeedNode(e) &&
+ !peerManager.isSelf(e) &&
+ !e.nodeAddress.equals(receiverNodeAddress)
+ )
+ .collect(Collectors.toSet()));
+ }
+
+ private void stopTimeoutTimer() {
+ if (timeoutTimer != null) {
+ timeoutTimer.cancel();
+ timeoutTimer = null;
+ }
+ }
+}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java
index d890fb00dc..3a3851ae61 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java
@@ -1,9 +1,6 @@
package io.bitsquare.p2p.peers;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
@@ -11,9 +8,6 @@ import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
-import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse;
-import io.bitsquare.p2p.peers.messages.peers.PeerExchangeMessage;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +26,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private final PeerManager peerManager;
private final Set seedNodeAddresses;
private final ScheduledThreadPoolExecutor executor;
- private Timer connectToMorePeersTimer, timeoutTimer, maintainConnectionsTimer;
+ private final Map peerExchangeHandshakeMap = new HashMap<>();
+ private Timer connectToMorePeersTimer, maintainConnectionsTimer;
///////////////////////////////////////////////////////////////////////////////////////////
@@ -55,7 +50,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
networkNode.removeMessageListener(this);
stopConnectToMorePeersTimer();
stopMaintainConnectionsTimer();
- stopTimeoutTimer();
+ peerExchangeHandshakeMap.values().stream().forEach(PeerExchangeHandshake::shutDown);
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
}
@@ -64,12 +59,13 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
// API
///////////////////////////////////////////////////////////////////////////////////////////
- public void requestReportedPeers(NodeAddress nodeAddress) {
+ public void requestReportedPeersFromSeedNodes(NodeAddress nodeAddress) {
+ checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
ArrayList remainingNodeAddresses = new ArrayList<>(seedNodeAddresses);
remainingNodeAddresses.remove(nodeAddress);
requestReportedPeers(nodeAddress, remainingNodeAddresses);
- long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min.
+ int delay = new Random().nextInt(60) + 60 * 4; // 4-5 min
executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections),
delay, delay, TimeUnit.SECONDS);
}
@@ -87,11 +83,11 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
public void onDisconnect(Reason reason, Connection connection) {
// We use a timer to throttle if we get a series of disconnects
// The more connections we have the more relaxed we are with a checkConnections
- if (maintainConnectionsTimer == null)
- maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections,
- networkNode.getAllConnections().size() * 10, TimeUnit.SECONDS);
-
-
+ stopMaintainConnectionsTimer();
+ int size = networkNode.getAllConnections().size();
+ int delay = 10 + 2 * size * size; // 12 sec - 210 sec (3.5 min)
+ maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections,
+ delay, TimeUnit.SECONDS);
}
@Override
@@ -105,45 +101,23 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
@Override
public void onMessage(Message message, Connection connection) {
- if (message instanceof PeerExchangeMessage) {
- Log.traceCall(message.toString());
- if (message instanceof GetPeersRequest) {
- HashSet reportedPeers = ((GetPeersRequest) message).reportedPeers;
+ if (message instanceof GetPeersRequest) {
+ PeerExchangeHandshake peerExchangeHandshake = new PeerExchangeHandshake(networkNode,
+ peerManager,
+ new PeerExchangeHandshake.Listener() {
+ @Override
+ public void onComplete() {
+ log.trace("PeerExchangeHandshake of inbound connection complete. Connection= {}",
+ connection);
+ }
- StringBuilder result = new StringBuilder("Received peers:");
- reportedPeers.stream().forEach(e -> result.append("\n").append(e));
- log.trace(result.toString());
-
- checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
- "The peers address must have been already set at the moment");
- SettableFuture future = networkNode.sendMessage(connection,
- new GetPeersResponse(getReportedPeersHashSet(connection.getPeersNodeAddressOptional().get())));
- Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.trace("GetPeersResponse sent successfully");
- }
-
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("GetPeersResponse sending failed " + throwable.getMessage() +
- " Maybe the peer went offline.");
- }
- });
- peerManager.addToReportedPeers(reportedPeers, connection);
- } else if (message instanceof GetPeersResponse) {
- stopTimeoutTimer();
-
- HashSet reportedPeers = ((GetPeersResponse) message).reportedPeers;
-
- StringBuilder result = new StringBuilder("Received peers:");
- reportedPeers.stream().forEach(e -> result.append("\n").append(e));
- log.trace(result.toString());
-
- peerManager.addToReportedPeers(reportedPeers, connection);
-
- connectToMorePeers();
- }
+ @Override
+ public void onFault(String errorMessage) {
+ log.trace("PeerExchangeHandshake of outbound connection failed. {} connection= {}",
+ errorMessage, connection);
+ }
+ });
+ peerExchangeHandshake.onGetPeersRequest((GetPeersRequest) message, connection);
}
}
@@ -154,82 +128,44 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private void requestReportedPeers(NodeAddress nodeAddress, List remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
- checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
+ if (!peerExchangeHandshakeMap.containsKey(nodeAddress)) {
+ PeerExchangeHandshake peerExchangeHandshake = new PeerExchangeHandshake(networkNode,
+ peerManager,
+ new PeerExchangeHandshake.Listener() {
+ @Override
+ public void onComplete() {
+ log.trace("PeerExchangeHandshake of outbound connection complete. nodeAddress= {}",
+ nodeAddress);
+ peerExchangeHandshakeMap.remove(nodeAddress);
+ connectToMorePeers();
+ }
- stopConnectToMorePeersTimer();
- stopTimeoutTimer();
+ @Override
+ public void onFault(String errorMessage) {
+ log.trace("PeerExchangeHandshake of outbound connection failed. {} nodeAddress= {}",
+ errorMessage, nodeAddress);
- timeoutTimer = UserThread.runAfter(() -> {
- log.info("timeoutTimer called");
- handleError(nodeAddress, remainingNodeAddresses);
- },
- 20, TimeUnit.SECONDS);
-
- SettableFuture future = networkNode.sendMessage(nodeAddress,
- new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress)));
- Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.trace("GetPeersRequest sent successfully");
- }
-
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("Sending GetPeersRequest to " + nodeAddress + " failed. " +
- "That is expected if the peer is offline. " +
- "Exception:" + throwable.getMessage());
- handleError(nodeAddress, remainingNodeAddresses);
- }
- });
- }
-
- private void connectToMorePeers() {
- Log.traceCall();
-
- stopConnectToMorePeersTimer();
-
- if (!peerManager.hasSufficientConnections()) {
- // We want to keep it sorted but avoid duplicates
- List list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>()));
- list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
- list.addAll(seedNodeAddresses.stream()
- .filter(e -> !list.contains(e) &&
- !peerManager.isSelf(e) &&
- !peerManager.isConfirmed(e))
- .collect(Collectors.toSet()));
- log.trace("Sorted and filtered list: list=" + list);
- if (!list.isEmpty()) {
- NodeAddress nextCandidate = list.get(0);
- list.remove(nextCandidate);
- requestReportedPeers(nextCandidate, list);
- } else {
- log.info("No more peers are available for requestReportedPeers.");
- }
+ peerExchangeHandshakeMap.remove(nodeAddress);
+ if (!remainingNodeAddresses.isEmpty()) {
+ log.info("There are remaining nodes available for requesting peers. " +
+ "We will try getReportedPeers again.");
+ requestReportedPeersFromRandomPeer(remainingNodeAddresses);
+ } else {
+ log.info("There is no remaining node available for requesting peers. " +
+ "That is expected if no other node is online.\n" +
+ "We will try again after a random pause.");
+ if (connectToMorePeersTimer == null)
+ connectToMorePeersTimer = UserThread.runAfterRandomDelay(
+ PeerExchangeManager.this::connectToMorePeers, 20, 30);
+ }
+ }
+ });
+ peerExchangeHandshakeMap.put(nodeAddress, peerExchangeHandshake);
+ peerExchangeHandshake.requestReportedPeers(nodeAddress, remainingNodeAddresses);
} else {
- log.info("We have already sufficient connections.");
- }
- }
-
- private void handleError(NodeAddress peersNodeAddress, List remainingNodeAddresses) {
- Log.traceCall("peersNodeAddress=" + peersNodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
-
- stopTimeoutTimer();
-
- // In case a shutdown was not triggered already by the error we close that connection
- // if it is not a DIRECT_MSG_PEER
- peerManager.shutDownConnection(peersNodeAddress);
-
- if (!remainingNodeAddresses.isEmpty()) {
- log.info("There are remaining nodes available for requesting peers. " +
- "We will try getReportedPeers again.");
- requestReportedPeersFromRandomPeer(remainingNodeAddresses);
- } else {
- log.info("There is no remaining node available for requesting peers. " +
- "That is expected if no other node is online.\n" +
- "We will try to use reported peers (if no available we use persisted peers) " +
- "and try again to request peers from our seed nodes after a random pause.");
- if (connectToMorePeersTimer == null)
- connectToMorePeersTimer = UserThread.runAfterRandomDelay(this::connectToMorePeers, 20, 30);
+ log.trace("We have started already a peerExchangeHandshake to peer. " +
+ "That can happen by the timers calls. We ignore that call. " +
+ "nodeAddress=" + nodeAddress);
}
}
@@ -256,20 +192,48 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
connectToMorePeers();
}
- // Use all outbound connections for updating reported peers and make sure we keep the connection alive
+ // Use all outbound connections older than 5 min. for updating reported peers and make sure we keep the connection alive
// Inbound connections should be maintained be the requesting peer
confirmedConnections.stream()
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
- c instanceof OutboundConnection).
- forEach(c -> UserThread.runAfterRandomDelay(() ->
- requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>())
+ c instanceof OutboundConnection &&
+ new Date().getTime() - c.getLastActivityDate().getTime() > 5 * 60 * 1000)
+ .forEach(c -> UserThread.runAfterRandomDelay(() -> {
+ log.trace("Call requestReportedPeers from maintainConnections");
+ requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>());
+ }
, 3, 5));
}
+ private void connectToMorePeers() {
+ Log.traceCall();
- ///////////////////////////////////////////////////////////////////////////////////////////
- // Utils
- ///////////////////////////////////////////////////////////////////////////////////////////
+ stopConnectToMorePeersTimer();
+
+ if (!peerManager.hasSufficientConnections()) {
+ // We create a new list of not connected candidates
+ // 1. reported sorted by most recent lastActivityDate
+ // 2. persisted sorted by most recent lastActivityDate
+ // 3. seenNodes
+ List list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>()));
+ list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
+ list.addAll(seedNodeAddresses.stream()
+ .filter(e -> !list.contains(e) &&
+ !peerManager.isSelf(e) &&
+ !peerManager.isConfirmed(e))
+ .collect(Collectors.toSet()));
+ log.trace("Sorted and filtered list: list=" + list);
+ if (!list.isEmpty()) {
+ NodeAddress nextCandidate = list.get(0);
+ list.remove(nextCandidate);
+ requestReportedPeers(nextCandidate, list);
+ } else {
+ log.info("No more peers are available for requestReportedPeers.");
+ }
+ } else {
+ log.info("We have already sufficient connections.");
+ }
+ }
// sorted by most recent lastActivityDate
private List getFilteredAndSortedList(Set set, List list) {
@@ -291,15 +255,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
requestReportedPeers(nextCandidate, remainingNodeAddresses);
}
- private HashSet getReportedPeersHashSet(NodeAddress receiverNodeAddress) {
- return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream()
- .filter(e -> !peerManager.isSeedNode(e) &&
- !peerManager.isSelf(e) &&
- !e.nodeAddress.equals(receiverNodeAddress)
- )
- .collect(Collectors.toSet()));
- }
-
private void stopConnectToMorePeersTimer() {
if (connectToMorePeersTimer != null) {
connectToMorePeersTimer.cancel();
@@ -313,11 +268,4 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
maintainConnectionsTimer = null;
}
}
-
- private void stopTimeoutTimer() {
- if (timeoutTimer != null) {
- timeoutTimer.cancel();
- timeoutTimer = null;
- }
- }
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java
index 9a1e76aa7e..21c8a7e0e3 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java
@@ -178,9 +178,8 @@ public class PeerManager implements ConnectionListener, MessageListener {
"MAX_CONNECTIONS_HIGH_PRIORITY limit of {}", MAX_CONNECTIONS_EXTENDED_2);
if (size > MAX_CONNECTIONS_EXTENDED_2) {
log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER.");
- // All expect DIRECT_MSG_PEER type connections
+ // All connections
candidates = allConnections.stream()
- .filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
.collect(Collectors.toList());
}
}
@@ -436,10 +435,11 @@ public class PeerManager implements ConnectionListener, MessageListener {
}
private void printConnectedPeers() {
- if (!networkNode.getNodeAddressesOfConfirmedConnections().isEmpty()) {
+ if (!networkNode.getConfirmedConnections().isEmpty()) {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Connected peers for node " + networkNode.getNodeAddress() + ":");
- networkNode.getNodeAddressesOfConfirmedConnections().stream().forEach(e -> result.append("\n").append(e));
+ networkNode.getConfirmedConnections().stream().forEach(e -> result.append("\n")
+ .append(e.getPeersNodeAddressOptional().get()).append(" ").append(e.getPeerType()));
result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataHandshake.java
index b12f1154eb..ee71870aff 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataHandshake.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataHandshake.java
@@ -50,7 +50,7 @@ public class RequestDataHandshake implements MessageListener {
private final PeerManager peerManager;
private final Listener listener;
private Timer timeoutTimer;
- private long requestNonce;
+ private final long nonce = new Random().nextLong();
///////////////////////////////////////////////////////////////////////////////////////////
@@ -69,22 +69,19 @@ public class RequestDataHandshake implements MessageListener {
public void shutDown() {
Log.traceCall();
-
networkNode.removeMessageListener(this);
-
stopTimeoutTimer();
}
+
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestData(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress);
-
- stopTimeoutTimer();
-
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
+
timeoutTimer = UserThread.runAfter(() -> {
log.info("timeoutTimer called");
peerManager.shutDownConnection(nodeAddress);
@@ -94,11 +91,10 @@ public class RequestDataHandshake implements MessageListener {
10, TimeUnit.SECONDS);
Message dataRequest;
- requestNonce = new Random().nextLong();
if (networkNode.getNodeAddress() == null)
- dataRequest = new PreliminaryDataRequest(requestNonce);
+ dataRequest = new PreliminaryDataRequest(nonce);
else
- dataRequest = new UpdateDataRequest(networkNode.getNodeAddress(), requestNonce);
+ dataRequest = new UpdateDataRequest(networkNode.getNodeAddress(), nonce);
log.info("We send a {} to peer {}. ", dataRequest.getClass().getSimpleName(), nodeAddress);
@@ -106,14 +102,14 @@ public class RequestDataHandshake implements MessageListener {
Futures.addCallback(future, new FutureCallback() {
@Override
public void onSuccess(@Nullable Connection connection) {
- log.trace("Send DataRequest to " + nodeAddress + " succeeded.");
+ log.trace("Send " + dataRequest + " to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
- String errorMessage = "Sending " + dataRequest.getClass().getSimpleName() + " to " + nodeAddress +
- " failed. That is expected if the peer is offline. " +
- "Exception:" + throwable.getMessage();
+ String errorMessage = "Sending dataRequest to " + nodeAddress +
+ " failed. That is expected if the peer is offline. dataRequest=" + dataRequest + "." +
+ "Exception: " + throwable.getMessage();
log.info(errorMessage);
peerManager.shutDownConnection(nodeAddress);
@@ -123,43 +119,54 @@ public class RequestDataHandshake implements MessageListener {
});
}
+ public void onDataRequest(Message message, final Connection connection) {
+ Log.traceCall(message.toString() + " / connection=" + connection);
+
+ checkArgument(timeoutTimer == null, "requestData must not be called twice.");
+ timeoutTimer = UserThread.runAfter(() -> {
+ log.info("timeoutTimer called");
+ peerManager.shutDownConnection(connection);
+ shutDown();
+ listener.onFault("A timeout occurred");
+ },
+ 10, TimeUnit.SECONDS);
+
+ DataRequest dataRequest = (DataRequest) message;
+ DataResponse dataResponse = new DataResponse(new HashSet<>(dataStorage.getMap().values()), dataRequest.getNonce());
+ SettableFuture future = networkNode.sendMessage(connection, dataResponse);
+ Futures.addCallback(future, new FutureCallback() {
+ @Override
+ public void onSuccess(Connection connection) {
+ log.trace("Send DataResponse to {} succeeded. dataResponse={}",
+ connection.getPeersNodeAddressOptional(), dataResponse);
+ shutDown();
+ listener.onComplete();
+ }
+
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ String errorMessage = "Sending dataRequest to " + connection +
+ " failed. That is expected if the peer is offline. dataRequest=" + dataRequest + "." +
+ "Exception: " + throwable.getMessage();
+ log.info(errorMessage);
+
+ peerManager.shutDownConnection(connection);
+ shutDown();
+ listener.onFault(errorMessage);
+ }
+ });
+ }
+
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
- if (message instanceof DataRequest) {
- Log.traceCall(message.toString());
- DataRequest dataRequest = (DataRequest) message;
- DataResponse dataResponse = new DataResponse(new HashSet<>(dataStorage.getMap().values()), dataRequest.getNonce());
- SettableFuture future = networkNode.sendMessage(connection, dataResponse);
- Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.trace("Send DataResponse to {} succeeded. dataResponse={}",
- connection.getPeersNodeAddressOptional(), dataResponse);
- shutDown();
- listener.onComplete();
- }
-
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- String errorMessage = "Send DataResponse to " + connection.getPeersNodeAddressOptional() + " failed. " +
- "That is expected if the peer went offline. " +
- "Exception:" + throwable.getMessage();
- log.info(errorMessage);
-
- peerManager.shutDownConnection(connection);
- shutDown();
- listener.onFault(errorMessage);
- }
- });
- } else if (message instanceof DataResponse) {
+ if (message instanceof DataResponse) {
+ Log.traceCall(message.toString() + " / connection=" + connection);
DataResponse dataResponse = (DataResponse) message;
- if (dataResponse.requestNonce == requestNonce) {
- Log.traceCall(message.toString());
-
+ if (dataResponse.requestNonce == nonce) {
stopTimeoutTimer();
// connection.getPeersNodeAddressOptional() is not present at the first call
@@ -168,12 +175,21 @@ public class RequestDataHandshake implements MessageListener {
((DataResponse) message).dataSet.stream()
.forEach(e -> dataStorage.add(e, peersNodeAddress));
});
+
shutDown();
listener.onComplete();
+ } else {
+ log.debug("Nonce not matching. That happens if we get a response after a canceled handshake " +
+ "(timeout). We drop that message. nonce={} / requestNonce={}",
+ nonce, dataResponse.requestNonce);
}
}
}
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ // Private
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.cancel();
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
index 86203a2826..dd68552603 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
@@ -49,7 +49,6 @@ public class RequestDataManager implements MessageListener {
private final Listener listener;
private final Map requestDataHandshakeMap = new HashMap<>();
-
private Optional nodeOfPreliminaryDataRequest = Optional.empty();
private Timer requestDataTimer;
private boolean dataUpdateRequested;
@@ -64,20 +63,17 @@ public class RequestDataManager implements MessageListener {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
this.peerManager = peerManager;
- checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty.");
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
this.listener = listener;
+ checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty.");
networkNode.addMessageListener(this);
}
public void shutDown() {
Log.traceCall();
-
stopRequestDataTimer();
-
networkNode.removeMessageListener(this);
-
requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown);
}
@@ -88,7 +84,10 @@ public class RequestDataManager implements MessageListener {
public void requestPreliminaryData() {
Log.traceCall();
- requestDataFromRandomPeer(new ArrayList<>(seedNodeAddresses));
+ ArrayList nodeAddresses = new ArrayList<>(seedNodeAddresses);
+ NodeAddress nextCandidate = nodeAddresses.get(0);
+ nodeAddresses.remove(nextCandidate);
+ requestData(nextCandidate, nodeAddresses);
}
public void requestUpdatesData() {
@@ -113,8 +112,6 @@ public class RequestDataManager implements MessageListener {
@Override
public void onMessage(Message message, Connection connection) {
if (message instanceof DataRequest) {
- Log.traceCall(message.toString());
- log.trace("Received {} at {}", message.getClass().getSimpleName(), connection);
RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
new RequestDataHandshake.Listener() {
@Override
@@ -125,11 +122,11 @@ public class RequestDataManager implements MessageListener {
@Override
public void onFault(String errorMessage) {
- log.info("RequestDataHandshake of inbound connection failed. Connection= {}",
+ log.trace("RequestDataHandshake of inbound connection failed. {} Connection= {}",
errorMessage, connection);
}
});
- requestDataHandshake.onMessage(message, connection);
+ requestDataHandshake.onDataRequest(message, connection);
}
}
@@ -138,21 +135,15 @@ public class RequestDataManager implements MessageListener {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
- private void requestDataFromRandomPeer(List nodeAddresses) {
- Log.traceCall("remainingNodeAddresses=" + nodeAddresses);
- NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size()));
- nodeAddresses.remove(nextCandidate);
- requestData(nextCandidate, nodeAddresses);
- }
-
private void requestData(NodeAddress nodeAddress, List remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
-
if (!requestDataHandshakeMap.containsKey(nodeAddress)) {
RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
new RequestDataHandshake.Listener() {
@Override
public void onComplete() {
+ log.trace("RequestDataHandshake of outbound connection complete. nodeAddress= {}",
+ nodeAddress);
stopRequestDataTimer();
// need to remove before listeners are notified as they cause the update call
@@ -175,10 +166,14 @@ public class RequestDataManager implements MessageListener {
@Override
public void onFault(String errorMessage) {
+ log.trace("RequestDataHandshake of outbound connection failed. {} nodeAddress= {}",
+ errorMessage, nodeAddress);
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");
- requestDataFromRandomPeer(remainingNodeAddresses);
+ NodeAddress nextCandidate = remainingNodeAddresses.get(0);
+ remainingNodeAddresses.remove(nextCandidate);
+ requestData(nextCandidate, remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting data. " +
"That is expected if no other node is online.\n" +
@@ -219,7 +214,7 @@ public class RequestDataManager implements MessageListener {
requestDataHandshakeMap.put(nodeAddress, requestDataHandshake);
requestDataHandshake.requestData(nodeAddress);
} else {
- log.warn("We have started already a DataRequest request to peer. " + nodeAddress);
+ log.warn("We have started already a requestDataHandshake to peer. " + nodeAddress);
}
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java
index 667eedc7b9..31dfc322d4 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java
@@ -12,10 +12,12 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final NodeAddress senderNodeAddress;
+ public long nonce;
public final HashSet reportedPeers;
- public GetPeersRequest(NodeAddress senderNodeAddress, HashSet reportedPeers) {
+ public GetPeersRequest(NodeAddress senderNodeAddress, long nonce, HashSet reportedPeers) {
this.senderNodeAddress = senderNodeAddress;
+ this.nonce = nonce;
this.reportedPeers = reportedPeers;
}
@@ -28,6 +30,7 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender
public String toString() {
return "GetPeersRequest{" +
"senderNodeAddress=" + senderNodeAddress +
+ ", requestNonce=" + nonce +
", reportedPeers=" + reportedPeers +
super.toString() + "} ";
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java
index bd5d3e5bca..79d56a589d 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java
@@ -9,16 +9,19 @@ public final class GetPeersResponse extends PeerExchangeMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
+ public final long requestNonce;
public final HashSet reportedPeers;
- public GetPeersResponse(HashSet reportedPeers) {
+ public GetPeersResponse(long requestNonce, HashSet reportedPeers) {
+ this.requestNonce = requestNonce;
this.reportedPeers = reportedPeers;
}
@Override
public String toString() {
return "GetPeersResponse{" +
- "reportedPeers=" + reportedPeers +
+ "requestNonce=" + requestNonce +
+ ", reportedPeers=" + reportedPeers +
super.toString() + "} ";
}
}