From 04b32a35a2dfe0a3519590ec8b1352d899b35a61 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Thu, 11 Feb 2016 13:18:20 +0100 Subject: [PATCH] Add maintenance package, use connected peers for reported peers (WIP) --- .../java/io/bitsquare/app/BitsquareApp.java | 2 +- .../p2p/peers/MaintenanceHandshake.java | 220 ++++++++++++++ .../p2p/peers/MaintenanceManager.java | 286 ++++++++++++++++++ .../p2p/peers/PeerExchangeHandshake.java | 25 +- .../p2p/peers/PeerExchangeManager.java | 70 +---- .../io/bitsquare/p2p/peers/PeerManager.java | 6 +- .../maintenance/MaintenanceMessage.java | 20 ++ .../messages/maintenance/PingRequest.java | 37 +++ .../messages/maintenance/PongResponse.java | 27 ++ 9 files changed, 609 insertions(+), 84 deletions(-) create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/MaintenanceHandshake.java create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/MaintenanceMessage.java create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PingRequest.java create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PongResponse.java diff --git a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java index 6c11c5f251..1f73cac543 100644 --- a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java +++ b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java @@ -76,7 +76,7 @@ import static io.bitsquare.app.BitsquareEnvironment.APP_NAME_KEY; public class BitsquareApp extends Application { private static final Logger log = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(BitsquareApp.class); - public static final boolean DEV_MODE = false; + public static final boolean DEV_MODE = true; public static final boolean IS_RELEASE_VERSION = !DEV_MODE && true; private static Environment env; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceHandshake.java new file mode 100644 index 0000000000..b1727bd2e3 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceHandshake.java @@ -0,0 +1,220 @@ +package io.bitsquare.p2p.peers; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import io.bitsquare.app.Log; +import io.bitsquare.common.UserThread; +import io.bitsquare.p2p.Message; +import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.CloseConnectionReason; +import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.MessageListener; +import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest; +import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.HashSet; +import java.util.Random; +import java.util.Timer; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +public class MaintenanceHandshake implements MessageListener { + private static final Logger log = LoggerFactory.getLogger(MaintenanceHandshake.class); + + /////////////////////////////////////////////////////////////////////////////////////////// + // Listener + /////////////////////////////////////////////////////////////////////////////////////////// + + public interface Listener { + void onComplete(); + + void onFault(String errorMessage, @Nullable Connection connection); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Class fields + /////////////////////////////////////////////////////////////////////////////////////////// + + private final NetworkNode networkNode; + private final PeerManager peerManager; + private final Listener listener; + private final long nonce = new Random().nextLong(); + private Timer timeoutTimer; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + public MaintenanceHandshake(NetworkNode networkNode, PeerManager peerManager, Listener listener) { + this.networkNode = networkNode; + this.peerManager = peerManager; + this.listener = listener; + + networkNode.addMessageListener(this); + } + + public void shutDown() { + networkNode.removeMessageListener(this); + stopTimeoutTimer(); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + + public void requestReportedPeers(NodeAddress nodeAddress) { + Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this); + checkNotNull(networkNode.getNodeAddress(), "PeerExchangeHandshake.requestReportedPeers: My node address must " + + "not be null at requestReportedPeers"); + GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, getConnectedPeers(nodeAddress)); + SettableFuture future = networkNode.sendMessage(nodeAddress, getPeersRequest); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Sending getPeersRequest to " + nodeAddress + + " failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest + + ".\n\tException=" + throwable.getMessage(); + log.info(errorMessage); + + peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); + shutDown(); + listener.onFault(errorMessage, null); + } + }); + + checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice."); + timeoutTimer = UserThread.runAfter(() -> { + String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress; + log.info(errorMessage + " / PeerExchangeHandshake=" + + MaintenanceHandshake.this); + + log.info("timeoutTimer called on " + this); + peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); + shutDown(); + listener.onFault(errorMessage, null); + }, + 20, TimeUnit.SECONDS); + } + + public void onGetPeersRequest(GetPeersRequest getPeersRequest, final Connection connection) { + Log.traceCall("getPeersRequest=" + getPeersRequest + "\n\tconnection=" + connection + "\n\tthis=" + this); + + HashSet reportedPeers = getPeersRequest.reportedPeers; + + /* StringBuilder result = new StringBuilder("Received peers:"); + reportedPeers.stream().forEach(e -> result.append("\n\t").append(e)); + log.trace(result.toString());*/ + log.trace("reportedPeers.size=" + reportedPeers.size()); + + checkArgument(connection.getPeersNodeAddressOptional().isPresent(), + "The peers address must have been already set at the moment"); + GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.nonce, + getConnectedPeers(connection.getPeersNodeAddressOptional().get())); + SettableFuture future = networkNode.sendMessage(connection, + getPeersResponse); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("GetPeersResponse sent successfully"); + shutDown(); + listener.onComplete(); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Sending getPeersRequest to " + connection + + " failed. That is expected if the peer is offline. getPeersRequest=" + getPeersRequest + "." + + "Exception: " + throwable.getMessage(); + log.info(errorMessage); + + peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); + shutDown(); + listener.onFault(errorMessage, connection); + } + }); + + checkArgument(timeoutTimer == null, "onGetPeersRequest must not be called twice."); + timeoutTimer = UserThread.runAfter(() -> { + String errorMessage = "A timeout occurred at sending getPeersResponse:" + getPeersResponse + " on connection:" + connection; + log.info(errorMessage + " / PeerExchangeHandshake=" + + MaintenanceHandshake.this); + + log.info("timeoutTimer called. this=" + this); + peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT); + shutDown(); + listener.onFault(errorMessage, connection); + }, + 20, TimeUnit.SECONDS); + + peerManager.addToReportedPeers(reportedPeers, connection); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { + if (message instanceof GetPeersResponse) { + Log.traceCall(message.toString() + "\n\tconnection=" + connection); + Log.traceCall("this=" + this); + GetPeersResponse getPeersResponse = (GetPeersResponse) message; + if (getPeersResponse.requestNonce == nonce) { + stopTimeoutTimer(); + + HashSet reportedPeers = getPeersResponse.reportedPeers; + StringBuilder result = new StringBuilder("Received peers:"); + reportedPeers.stream().forEach(e -> result.append("\n\t").append(e)); + log.trace(result.toString()); + peerManager.addToReportedPeers(reportedPeers, connection); + + shutDown(); + listener.onComplete(); + } else { + log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled handshake " + + "(timeout causes connection close but peer might have sent a msg before connection " + + "was closed).\n\tWe drop that message. nonce={} / requestNonce={}", + nonce, getPeersResponse.requestNonce); + } + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private HashSet getConnectedPeers(NodeAddress receiverNodeAddress) { + return new HashSet<>(peerManager.getConnectedPeers().stream() + .filter(e -> !peerManager.isSeedNode(e) && + !peerManager.isSelf(e) && + !e.nodeAddress.equals(receiverNodeAddress) + ) + .collect(Collectors.toSet())); + } + + private void stopTimeoutTimer() { + if (timeoutTimer != null) { + timeoutTimer.cancel(); + timeoutTimer = null; + } + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java new file mode 100644 index 0000000000..f0d4ebcca5 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java @@ -0,0 +1,286 @@ +package io.bitsquare.p2p.peers; + +import com.google.common.util.concurrent.MoreExecutors; +import io.bitsquare.app.Log; +import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; +import io.bitsquare.p2p.Message; +import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.*; +import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +public class MaintenanceManager implements MessageListener, ConnectionListener { + private static final Logger log = LoggerFactory.getLogger(MaintenanceManager.class); + + private final NetworkNode networkNode; + private final PeerManager peerManager; + private final Set seedNodeAddresses; + private final ScheduledThreadPoolExecutor executor; + private final Map peerExchangeHandshakeMap = new HashMap<>(); + private Timer connectToMorePeersTimer, maintainConnectionsTimer; + private boolean shutDownInProgress; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + public MaintenanceManager(NetworkNode networkNode, PeerManager peerManager, Set seedNodeAddresses) { + this.networkNode = networkNode; + this.peerManager = peerManager; + checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty"); + this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); + + executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5); + networkNode.addMessageListener(this); + } + + public void shutDown() { + Log.traceCall(); + shutDownInProgress = true; + + networkNode.removeMessageListener(this); + stopConnectToMorePeersTimer(); + stopMaintainConnectionsTimer(); + peerExchangeHandshakeMap.values().stream().forEach(PeerExchangeHandshake::closeHandshake); + MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + + public void requestReportedPeersFromSeedNodes(NodeAddress nodeAddress) { + checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers"); + ArrayList remainingNodeAddresses = new ArrayList<>(seedNodeAddresses); + remainingNodeAddresses.remove(nodeAddress); + Collections.shuffle(remainingNodeAddresses); + requestReportedPeers(nodeAddress, remainingNodeAddresses); + + int delay = new Random().nextInt(60) + 60 * 3; // 3-4 min + executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections), + delay, delay, TimeUnit.SECONDS); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // ConnectionListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onConnection(Connection connection) { + } + + @Override + public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { + // We use a timer to throttle if we get a series of disconnects + // The more connections we have the more relaxed we are with a checkConnections + stopMaintainConnectionsTimer(); + int size = networkNode.getAllConnections().size(); + int delay = 10 + 2 * size * size; // 12 sec - 210 sec (3.5 min) + maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections, + delay, TimeUnit.SECONDS); + } + + @Override + public void onError(Throwable throwable) { + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { + if (message instanceof GetPeersRequest) { + Log.traceCall(message.toString() + "\n\tconnection=" + connection); + PeerExchangeHandshake peerExchangeHandshake = new PeerExchangeHandshake(networkNode, + peerManager, + new PeerExchangeHandshake.Listener() { + @Override + public void onComplete() { + log.trace("PeerExchangeHandshake of inbound connection complete.\n\tConnection={}", connection); + } + + @Override + public void onFault(String errorMessage, @Nullable Connection connection) { + log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" + + "connection={}", errorMessage, connection); + peerManager.handleConnectionFault(connection); + } + }); + peerExchangeHandshake.onGetPeersRequest((GetPeersRequest) message, connection); + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private void requestReportedPeers(NodeAddress nodeAddress, List remainingNodeAddresses) { + Log.traceCall("nodeAddress=" + nodeAddress); + if (!peerExchangeHandshakeMap.containsKey(nodeAddress)) { + PeerExchangeHandshake peerExchangeHandshake = new PeerExchangeHandshake(networkNode, + peerManager, + new PeerExchangeHandshake.Listener() { + @Override + public void onComplete() { + log.trace("PeerExchangeHandshake of outbound connection complete. nodeAddress={}", nodeAddress); + peerExchangeHandshakeMap.remove(nodeAddress); + connectToMorePeers(); + } + + @Override + public void onFault(String errorMessage, @Nullable Connection connection) { + log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" + + "nodeAddress={}", errorMessage, nodeAddress); + + peerExchangeHandshakeMap.remove(nodeAddress); + peerManager.handleConnectionFault(nodeAddress, connection); + if (!shutDownInProgress) { + if (!remainingNodeAddresses.isEmpty()) { + log.info("There are remaining nodes available for requesting peers. " + + "We will try getReportedPeers again."); + requestReportedPeersFromRandomPeer(remainingNodeAddresses); + } else { + log.info("There is no remaining node available for requesting peers. " + + "That is expected if no other node is online.\n\t" + + "We will try again after a random pause."); + if (connectToMorePeersTimer == null) + connectToMorePeersTimer = UserThread.runAfterRandomDelay( + MaintenanceManager.this::connectToMorePeers, 20, 30); + } + } + } + }); + peerExchangeHandshakeMap.put(nodeAddress, peerExchangeHandshake); + peerExchangeHandshake.requestConnectedPeers(nodeAddress); + } else { + //TODO check when that happens + log.warn("We have started already a peerExchangeHandshake. " + + "We ignore that call. " + + "nodeAddress=" + nodeAddress); + } + } + + // we check if we have at least one seed node connected + private void maintainConnections() { + Log.traceCall(); + + stopMaintainConnectionsTimer(); + + // we want at least 1 seed node connected + Set confirmedConnections = networkNode.getConfirmedConnections(); + long numberOfConnectedSeedNodes = confirmedConnections.stream() + .filter(peerManager::isSeedNode) + .count(); + if (numberOfConnectedSeedNodes == 0) { + ArrayList nodeAddresses = new ArrayList<>(seedNodeAddresses); + Collections.shuffle(nodeAddresses); + requestReportedPeersFromRandomPeer(nodeAddresses); + } + + + // We try to get sufficient connections by connecting to reported and persisted peers + if (numberOfConnectedSeedNodes == 0) { + // If we requested a seed node we delay a bit to not have too many requests simultaneously + if (connectToMorePeersTimer == null) + connectToMorePeersTimer = UserThread.runAfter(this::connectToMorePeers, 10); + } else { + connectToMorePeers(); + } + + // Use all outbound connections older than 10 min. for updating reported peers and make sure we keep the connection alive + // Inbound connections should be maintained be the requesting peer + confirmedConnections.stream() + .filter(c -> c.getPeersNodeAddressOptional().isPresent() && + c instanceof OutboundConnection && + new Date().getTime() - c.getLastActivityDate().getTime() > TimeUnit.MINUTES.toMillis(10)) + .forEach(c -> { + log.trace("Call requestReportedPeers on a confirmedConnection by the maintainConnections call"); + requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>()); + }); + } + + private void connectToMorePeers() { + Log.traceCall(); + + stopConnectToMorePeersTimer(); + + if (!peerManager.hasSufficientConnections()) { + // We create a new list of not connected candidates + // 1. reported sorted by most recent lastActivityDate + // 2. persisted sorted by most recent lastActivityDate + // 3. seenNodes + List list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>())); + list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list)); + ArrayList seedNodeAddresses = new ArrayList<>(this.seedNodeAddresses); + Collections.shuffle(seedNodeAddresses); + list.addAll(seedNodeAddresses.stream() + .filter(e -> !list.contains(e) && + !peerManager.isSelf(e) && + !peerManager.isConfirmed(e)) + .collect(Collectors.toSet())); + log.info("Sorted and filtered list: list.size()=" + list.size()); + log.trace("Sorted and filtered list: list=" + list); + if (!list.isEmpty()) { + NodeAddress nextCandidate = list.get(0); + list.remove(nextCandidate); + requestReportedPeers(nextCandidate, list); + } else { + log.info("No more peers are available for requestReportedPeers."); + } + } else { + log.info("We have already sufficient connections."); + } + } + + // sorted by most recent lastActivityDate + private List getFilteredAndSortedList(Set set, List list) { + return set.stream() + .filter(e -> !list.contains(e.nodeAddress) && + !peerManager.isSeedNode(e) && + !peerManager.isSelf(e) && + !peerManager.isConfirmed(e)) + .collect(Collectors.toList()) + .stream() + .filter(e -> e.lastActivityDate != null) + .sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate)) + .map(e -> e.nodeAddress) + .collect(Collectors.toList()); + } + + private void requestReportedPeersFromRandomPeer(List remainingNodeAddresses) { + NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); + remainingNodeAddresses.remove(nextCandidate); + requestReportedPeers(nextCandidate, remainingNodeAddresses); + } + + private void stopConnectToMorePeersTimer() { + if (connectToMorePeersTimer != null) { + connectToMorePeersTimer.cancel(); + connectToMorePeersTimer = null; + } + } + + private void stopMaintainConnectionsTimer() { + if (maintainConnectionsTimer != null) { + maintainConnectionsTimer.cancel(); + maintainConnectionsTimer = null; + } + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeHandshake.java index d4988ad774..56188573cb 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeHandshake.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeHandshake.java @@ -64,7 +64,7 @@ public class PeerExchangeHandshake implements MessageListener { networkNode.addMessageListener(this); } - public void shutDown() { + public void closeHandshake() { networkNode.removeMessageListener(this); stopTimeoutTimer(); } @@ -74,11 +74,11 @@ public class PeerExchangeHandshake implements MessageListener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void requestReportedPeers(NodeAddress nodeAddress) { + public void requestConnectedPeers(NodeAddress nodeAddress) { Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this); checkNotNull(networkNode.getNodeAddress(), "PeerExchangeHandshake.requestReportedPeers: My node address must " + "not be null at requestReportedPeers"); - GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, getReportedPeers(nodeAddress)); + GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, getConnectedPeers(nodeAddress)); SettableFuture future = networkNode.sendMessage(nodeAddress, getPeersRequest); Futures.addCallback(future, new FutureCallback() { @Override @@ -94,7 +94,7 @@ public class PeerExchangeHandshake implements MessageListener { log.info(errorMessage); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); - shutDown(); + closeHandshake(); listener.onFault(errorMessage, null); } }); @@ -107,7 +107,7 @@ public class PeerExchangeHandshake implements MessageListener { log.info("timeoutTimer called on " + this); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); - shutDown(); + closeHandshake(); listener.onFault(errorMessage, null); }, 20, TimeUnit.SECONDS); @@ -126,14 +126,14 @@ public class PeerExchangeHandshake implements MessageListener { checkArgument(connection.getPeersNodeAddressOptional().isPresent(), "The peers address must have been already set at the moment"); GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.nonce, - getReportedPeers(connection.getPeersNodeAddressOptional().get())); + getConnectedPeers(connection.getPeersNodeAddressOptional().get())); SettableFuture future = networkNode.sendMessage(connection, getPeersResponse); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { log.trace("GetPeersResponse sent successfully"); - shutDown(); + closeHandshake(); listener.onComplete(); } @@ -145,7 +145,7 @@ public class PeerExchangeHandshake implements MessageListener { log.info(errorMessage); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); - shutDown(); + closeHandshake(); listener.onFault(errorMessage, connection); } }); @@ -158,7 +158,7 @@ public class PeerExchangeHandshake implements MessageListener { log.info("timeoutTimer called. this=" + this); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT); - shutDown(); + closeHandshake(); listener.onFault(errorMessage, connection); }, 20, TimeUnit.SECONDS); @@ -186,7 +186,7 @@ public class PeerExchangeHandshake implements MessageListener { log.trace(result.toString()); peerManager.addToReportedPeers(reportedPeers, connection); - shutDown(); + closeHandshake(); listener.onComplete(); } else { log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled handshake " + @@ -202,10 +202,9 @@ public class PeerExchangeHandshake implements MessageListener { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private HashSet getReportedPeers(NodeAddress receiverNodeAddress) { - return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream() + private HashSet getConnectedPeers(NodeAddress receiverNodeAddress) { + return new HashSet<>(peerManager.getConnectedPeers().stream() .filter(e -> !peerManager.isSeedNode(e) && - !peerManager.isSelf(e) && !e.nodeAddress.equals(receiverNodeAddress) ) .collect(Collectors.toSet())); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java index b1ea0c80dc..ae6c24ce76 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java @@ -1,9 +1,7 @@ package io.bitsquare.p2p.peers; -import com.google.common.util.concurrent.MoreExecutors; import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; -import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.network.*; @@ -13,8 +11,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -26,9 +22,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener private final NetworkNode networkNode; private final PeerManager peerManager; private final Set seedNodeAddresses; - private final ScheduledThreadPoolExecutor executor; private final Map peerExchangeHandshakeMap = new HashMap<>(); - private Timer connectToMorePeersTimer, maintainConnectionsTimer; + private Timer connectToMorePeersTimer; private boolean shutDownInProgress; @@ -42,7 +37,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty"); this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); - executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5); networkNode.addMessageListener(this); } @@ -52,9 +46,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener networkNode.removeMessageListener(this); stopConnectToMorePeersTimer(); - stopMaintainConnectionsTimer(); - peerExchangeHandshakeMap.values().stream().forEach(PeerExchangeHandshake::shutDown); - MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS); + peerExchangeHandshakeMap.values().stream().forEach(PeerExchangeHandshake::closeHandshake); } @@ -68,10 +60,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener remainingNodeAddresses.remove(nodeAddress); Collections.shuffle(remainingNodeAddresses); requestReportedPeers(nodeAddress, remainingNodeAddresses); - - int delay = new Random().nextInt(60) + 60 * 3; // 3-4 min - executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections), - delay, delay, TimeUnit.SECONDS); } @@ -85,13 +73,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - // We use a timer to throttle if we get a series of disconnects - // The more connections we have the more relaxed we are with a checkConnections - stopMaintainConnectionsTimer(); - int size = networkNode.getAllConnections().size(); - int delay = 10 + 2 * size * size; // 12 sec - 210 sec (3.5 min) - maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections, - delay, TimeUnit.SECONDS); } @Override @@ -168,7 +149,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener } }); peerExchangeHandshakeMap.put(nodeAddress, peerExchangeHandshake); - peerExchangeHandshake.requestReportedPeers(nodeAddress); + peerExchangeHandshake.requestConnectedPeers(nodeAddress); } else { //TODO check when that happens log.warn("We have started already a peerExchangeHandshake. " + @@ -177,44 +158,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener } } - // we check if we have at least one seed node connected - private void maintainConnections() { - Log.traceCall(); - - stopMaintainConnectionsTimer(); - - // we want at least 1 seed node connected - Set confirmedConnections = networkNode.getConfirmedConnections(); - long numberOfConnectedSeedNodes = confirmedConnections.stream() - .filter(peerManager::isSeedNode) - .count(); - if (numberOfConnectedSeedNodes == 0) { - ArrayList nodeAddresses = new ArrayList<>(seedNodeAddresses); - Collections.shuffle(nodeAddresses); - requestReportedPeersFromRandomPeer(nodeAddresses); - } - - - // We try to get sufficient connections by connecting to reported and persisted peers - if (numberOfConnectedSeedNodes == 0) { - // If we requested a seed node we delay a bit to not have too many requests simultaneously - if (connectToMorePeersTimer == null) - connectToMorePeersTimer = UserThread.runAfter(this::connectToMorePeers, 10); - } else { - connectToMorePeers(); - } - - // Use all outbound connections older than 10 min. for updating reported peers and make sure we keep the connection alive - // Inbound connections should be maintained be the requesting peer - confirmedConnections.stream() - .filter(c -> c.getPeersNodeAddressOptional().isPresent() && - c instanceof OutboundConnection && - new Date().getTime() - c.getLastActivityDate().getTime() > TimeUnit.MINUTES.toMillis(10)) - .forEach(c -> { - log.trace("Call requestReportedPeers on a confirmedConnection by the maintainConnections call"); - requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>()); - }); - } private void connectToMorePeers() { Log.traceCall(); @@ -276,11 +219,4 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener connectToMorePeersTimer = null; } } - - private void stopMaintainConnectionsTimer() { - if (maintainConnectionsTimer != null) { - maintainConnectionsTimer.cancel(); - maintainConnectionsTimer = null; - } - } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 748debd27f..a7a854601e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -425,11 +425,11 @@ public class PeerManager implements ConnectionListener, MessageListener { } } - public Set getConnectedAndReportedPeers() { + /* public Set getConnectedAndReportedPeers() { Set result = new HashSet<>(reportedPeers); result.addAll(getConnectedPeers()); return result; - } + }*/ public boolean isSeedNode(ReportedPeer reportedPeer) { return seedNodeAddresses.contains(reportedPeer.nodeAddress); @@ -501,7 +501,7 @@ public class PeerManager implements ConnectionListener, MessageListener { return list.remove(new Random().nextInt(list.size())); } - private Set getConnectedPeers() { + public Set getConnectedPeers() { // networkNode.getConfirmedConnections includes: // filter(connection -> connection.getPeersNodeAddressOptional().isPresent()) return networkNode.getConfirmedConnections().stream() diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/MaintenanceMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/MaintenanceMessage.java new file mode 100644 index 0000000000..becc675a19 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/MaintenanceMessage.java @@ -0,0 +1,20 @@ +package io.bitsquare.p2p.peers.messages.maintenance; + +import io.bitsquare.app.Version; +import io.bitsquare.p2p.Message; + +public abstract class MaintenanceMessage implements Message { + private final int messageVersion = Version.getP2PMessageVersion(); + + @Override + public int getMessageVersion() { + return messageVersion; + } + + @Override + public String toString() { + return "MaintenanceMessage{" + + "messageVersion=" + messageVersion + + '}'; + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PingRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PingRequest.java new file mode 100644 index 0000000000..afd41055c4 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PingRequest.java @@ -0,0 +1,37 @@ +package io.bitsquare.p2p.peers.messages.maintenance; + +import io.bitsquare.app.Version; +import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage; +import io.bitsquare.p2p.peers.ReportedPeer; + +import java.util.HashSet; + +public final class PingRequest extends MaintenanceMessage implements SendersNodeAddressMessage { + // That object is sent over the wire, so we need to take care of version compatibility. + private static final long serialVersionUID = Version.P2P_NETWORK_VERSION; + + private final NodeAddress senderNodeAddress; + public long nonce; + public final HashSet reportedPeers; + + public PingRequest(NodeAddress senderNodeAddress, long nonce, HashSet reportedPeers) { + this.senderNodeAddress = senderNodeAddress; + this.nonce = nonce; + this.reportedPeers = reportedPeers; + } + + @Override + public NodeAddress getSenderNodeAddress() { + return senderNodeAddress; + } + + @Override + public String toString() { + return "GetPeersRequest{" + + "senderNodeAddress=" + senderNodeAddress + + ", nonce=" + nonce + + ", reportedPeers.size()=" + reportedPeers.size() + + "} " + super.toString(); + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PongResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PongResponse.java new file mode 100644 index 0000000000..a3c134ba00 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/maintenance/PongResponse.java @@ -0,0 +1,27 @@ +package io.bitsquare.p2p.peers.messages.maintenance; + +import io.bitsquare.app.Version; +import io.bitsquare.p2p.peers.ReportedPeer; + +import java.util.HashSet; + +public final class PongResponse extends MaintenanceMessage { + // That object is sent over the wire, so we need to take care of version compatibility. + private static final long serialVersionUID = Version.P2P_NETWORK_VERSION; + + public final long requestNonce; + public final HashSet reportedPeers; + + public PongResponse(long requestNonce, HashSet reportedPeers) { + this.requestNonce = requestNonce; + this.reportedPeers = reportedPeers; + } + + @Override + public String toString() { + return "GetPeersResponse{" + + "requestNonce=" + requestNonce + + ", reportedPeers.size()=" + reportedPeers.size() + + "} " + super.toString(); + } +}