From cd5ecbb168226b141a785e63d845012d895cebae Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Wed, 27 Jan 2016 15:08:13 +0100 Subject: [PATCH] Refactor handshake out for data request --- .../io/bitsquare/p2p/network/Connection.java | 7 +- .../p2p/peers/PeerExchangeManager.java | 161 ++++++++------ .../io/bitsquare/p2p/peers/PeerManager.java | 22 +- .../p2p/peers/RequestDataHandshake.java | 183 ++++++++++++++++ .../p2p/peers/RequestDataManager.java | 207 ++++++++---------- .../p2p/peers/messages/data/DataRequest.java | 35 +-- .../p2p/peers/messages/data/DataResponse.java | 5 +- .../messages/data/PreliminaryDataRequest.java | 12 +- .../messages/data/UpdateDataRequest.java | 44 ++++ 9 files changed, 453 insertions(+), 223 deletions(-) create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/RequestDataHandshake.java create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/messages/data/UpdateDataRequest.java diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 687e288c84..2f6773e651 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -286,7 +286,7 @@ public class Connection implements MessageListener { shutDown(true, null); } - private void shutDown(boolean sendCloseConnectionMessage) { + public void shutDown(boolean sendCloseConnectionMessage) { shutDown(sendCloseConnectionMessage, null); } @@ -557,6 +557,11 @@ public class Connection implements MessageListener { public void stop() { Log.traceCall(); stopped = true; + try { + objectInputStream.close(); + } catch (IOException e) { + e.printStackTrace(); + } } @Override diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java index 8a3e2b396b..d890fb00dc 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java @@ -32,7 +32,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener private final PeerManager peerManager; private final Set seedNodeAddresses; private final ScheduledThreadPoolExecutor executor; - private Timer continueWithMorePeersTimer, timeoutTimer, maintainConnectionsTimer; + private Timer connectToMorePeersTimer, timeoutTimer, maintainConnectionsTimer; /////////////////////////////////////////////////////////////////////////////////////////// @@ -42,6 +42,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener public PeerExchangeManager(NetworkNode networkNode, PeerManager peerManager, Set seedNodeAddresses) { this.networkNode = networkNode; this.peerManager = peerManager; + checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty"); this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5); @@ -52,7 +53,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener Log.traceCall(); networkNode.removeMessageListener(this); - stopContinueWithMorePeersTimer(); + stopConnectToMorePeersTimer(); stopMaintainConnectionsTimer(); stopTimeoutTimer(); MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS); @@ -64,7 +65,9 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// public void requestReportedPeers(NodeAddress nodeAddress) { - requestReportedPeers(nodeAddress, new ArrayList<>(seedNodeAddresses)); + ArrayList remainingNodeAddresses = new ArrayList<>(seedNodeAddresses); + remainingNodeAddresses.remove(nodeAddress); + requestReportedPeers(nodeAddress, remainingNodeAddresses); long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min. executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections), @@ -106,7 +109,11 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener Log.traceCall(message.toString()); if (message instanceof GetPeersRequest) { HashSet reportedPeers = ((GetPeersRequest) message).reportedPeers; - log.trace("Received reported peers: " + reportedPeers); + + StringBuilder result = new StringBuilder("Received peers:"); + reportedPeers.stream().forEach(e -> result.append("\n").append(e)); + log.trace(result.toString()); + checkArgument(connection.getPeersNodeAddressOptional().isPresent(), "The peers address must have been already set at the moment"); SettableFuture future = networkNode.sendMessage(connection, @@ -126,11 +133,16 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener peerManager.addToReportedPeers(reportedPeers, connection); } else if (message instanceof GetPeersResponse) { stopTimeoutTimer(); + HashSet reportedPeers = ((GetPeersResponse) message).reportedPeers; - log.trace("Received reported peers: " + reportedPeers); + + StringBuilder result = new StringBuilder("Received peers:"); + reportedPeers.stream().forEach(e -> result.append("\n").append(e)); + log.trace(result.toString()); + peerManager.addToReportedPeers(reportedPeers, connection); - continueWithMorePeers(); + connectToMorePeers(); } } } @@ -144,14 +156,14 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers"); - stopContinueWithMorePeersTimer(); + stopConnectToMorePeersTimer(); stopTimeoutTimer(); timeoutTimer = UserThread.runAfter(() -> { log.info("timeoutTimer called"); handleError(nodeAddress, remainingNodeAddresses); }, - 10, TimeUnit.SECONDS); + 20, TimeUnit.SECONDS); SettableFuture future = networkNode.sendMessage(nodeAddress, new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress))); @@ -171,35 +183,11 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener }); } - private void handleError(NodeAddress nodeAddress, List remainingNodeAddresses) { - Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); - - stopTimeoutTimer(); - - if (!remainingNodeAddresses.isEmpty()) { - log.info("There are remaining nodes available for requesting peers. " + - "We will try getReportedPeers again."); - requestReportedPeersFromList(remainingNodeAddresses); - } else { - log.info("There is no remaining node available for requesting peers. " + - "That is expected if no other node is online.\n" + - "We will try to use reported peers (if no available we use persisted peers) " + - "and try again to request peers from our seed nodes after a random pause."); - if (continueWithMorePeersTimer == null) - continueWithMorePeersTimer = UserThread.runAfter(this::continueWithMorePeers, - 30, TimeUnit.SECONDS); - } - } - - private void requestReportedPeersFromList(List remainingNodeAddresses) { - NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); - remainingNodeAddresses.remove(nextCandidate); - requestReportedPeers(nextCandidate, remainingNodeAddresses); - } - - private void continueWithMorePeers() { + private void connectToMorePeers() { Log.traceCall(); - stopContinueWithMorePeersTimer(); + + stopConnectToMorePeersTimer(); + if (!peerManager.hasSufficientConnections()) { // We want to keep it sorted but avoid duplicates List list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>())); @@ -222,6 +210,67 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener } } + private void handleError(NodeAddress peersNodeAddress, List remainingNodeAddresses) { + Log.traceCall("peersNodeAddress=" + peersNodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); + + stopTimeoutTimer(); + + // In case a shutdown was not triggered already by the error we close that connection + // if it is not a DIRECT_MSG_PEER + peerManager.shutDownConnection(peersNodeAddress); + + if (!remainingNodeAddresses.isEmpty()) { + log.info("There are remaining nodes available for requesting peers. " + + "We will try getReportedPeers again."); + requestReportedPeersFromRandomPeer(remainingNodeAddresses); + } else { + log.info("There is no remaining node available for requesting peers. " + + "That is expected if no other node is online.\n" + + "We will try to use reported peers (if no available we use persisted peers) " + + "and try again to request peers from our seed nodes after a random pause."); + if (connectToMorePeersTimer == null) + connectToMorePeersTimer = UserThread.runAfterRandomDelay(this::connectToMorePeers, 20, 30); + } + } + + // we check if we have at least one seed node connected + private void maintainConnections() { + Log.traceCall(); + + stopMaintainConnectionsTimer(); + + // we want at least 1 seed node connected + Set confirmedConnections = networkNode.getConfirmedConnections(); + long numberOfConnectedSeedNodes = confirmedConnections.stream() + .filter(peerManager::isSeedNode) + .count(); + if (numberOfConnectedSeedNodes == 0) + requestReportedPeersFromRandomPeer(new ArrayList<>(seedNodeAddresses)); + + // We try to get sufficient connections by connecting to reported and persisted peers + if (numberOfConnectedSeedNodes == 0) { + // If we requested a seed node we delay a bit to not have too many requests simultaneously + if (connectToMorePeersTimer == null) + connectToMorePeersTimer = UserThread.runAfter(this::connectToMorePeers, 10); + } else { + connectToMorePeers(); + } + + // Use all outbound connections for updating reported peers and make sure we keep the connection alive + // Inbound connections should be maintained be the requesting peer + confirmedConnections.stream() + .filter(c -> c.getPeersNodeAddressOptional().isPresent() && + c instanceof OutboundConnection). + forEach(c -> UserThread.runAfterRandomDelay(() -> + requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>()) + , 3, 5)); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Utils + /////////////////////////////////////////////////////////////////////////////////////////// + // sorted by most recent lastActivityDate private List getFilteredAndSortedList(Set set, List list) { return set.stream() @@ -236,34 +285,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener .collect(Collectors.toList()); } - - // we check if we have at least one seed node connected - private void maintainConnections() { - Log.traceCall(); - - stopMaintainConnectionsTimer(); - - // we want at least 1 seed node connected - Set allConnections = networkNode.getConfirmedConnections(); - List connectedSeedNodes = allConnections.stream() - .filter(peerManager::isSeedNode) - .collect(Collectors.toList()); - if (connectedSeedNodes.size() == 0 && !seedNodeAddresses.isEmpty()) - requestReportedPeersFromList(new ArrayList<>(seedNodeAddresses)); - - // We try to get sufficient connections by using reported and persisted peers - if (continueWithMorePeersTimer == null) - continueWithMorePeersTimer = UserThread.runAfterRandomDelay(this::continueWithMorePeers, 10, 20); - - - // Use all outbound connections for updating reported peers and make sure we keep the connection alive - // Inbound connections should be maintained be the requesting peer - networkNode.getConfirmedConnections().stream() - .filter(c -> c.getPeersNodeAddressOptional().isPresent() && - c instanceof OutboundConnection). - forEach(c -> UserThread.runAfterRandomDelay(() -> - requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>()) - , 3, 5)); + private void requestReportedPeersFromRandomPeer(List remainingNodeAddresses) { + NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); + remainingNodeAddresses.remove(nextCandidate); + requestReportedPeers(nextCandidate, remainingNodeAddresses); } private HashSet getReportedPeersHashSet(NodeAddress receiverNodeAddress) { @@ -275,10 +300,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener .collect(Collectors.toSet())); } - private void stopContinueWithMorePeersTimer() { - if (continueWithMorePeersTimer != null) { - continueWithMorePeersTimer.cancel(); - continueWithMorePeersTimer = null; + private void stopConnectToMorePeersTimer() { + if (connectToMorePeersTimer != null) { + connectToMorePeersTimer.cancel(); + connectToMorePeersTimer = null; } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index ab3a99011f..9a1e76aa7e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -5,7 +5,7 @@ import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.network.*; -import io.bitsquare.p2p.peers.messages.data.DataRequest; +import io.bitsquare.p2p.peers.messages.data.UpdateDataRequest; import io.bitsquare.storage.Storage; import javafx.beans.value.ChangeListener; import org.jetbrains.annotations.Nullable; @@ -135,7 +135,7 @@ public class PeerManager implements ConnectionListener, MessageListener { public void onMessage(Message message, Connection connection) { // In case a seed node connects to another seed node we get his address at the DataRequest triggered from // RequestDataManager.updateDataFromConnectedSeedNode - if (message instanceof DataRequest) { + if (message instanceof UpdateDataRequest) { Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); if (peersNodeAddressOptional.isPresent() && seedNodeAddresses.contains(peersNodeAddressOptional.get())) @@ -251,8 +251,9 @@ public class PeerManager implements ConnectionListener, MessageListener { .collect(Collectors.toMap(e -> e, Function.identity())); HashSet adjustedReportedPeers = new HashSet<>(); reportedPeersToAdd.stream() - .filter(e -> !e.nodeAddress.equals(networkNode.getNodeAddress())) - .filter(e -> !getConnectedPeers().contains(e)) + .filter(e -> e.nodeAddress != null && + !e.nodeAddress.equals(networkNode.getNodeAddress()) && + !getConnectedPeers().contains(e)) .forEach(e -> { if (reportedPeersMap.containsKey(e)) { if (e.lastActivityDate != null && reportedPeersMap.get(e).lastActivityDate != null) { @@ -377,6 +378,19 @@ public class PeerManager implements ConnectionListener, MessageListener { return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress); } + public void shutDownConnection(Connection connection) { + if (connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER) + connection.shutDown(); + } + + public void shutDownConnection(NodeAddress peersNodeAddress) { + networkNode.getAllConnections().stream() + .filter(connection -> connection.getPeersNodeAddressOptional().isPresent() && + connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress) && + connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER) + .findFirst() + .ifPresent(connection -> connection.shutDown(true)); + } /////////////////////////////////////////////////////////////////////////////////////////// // Private diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataHandshake.java new file mode 100644 index 0000000000..b12f1154eb --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataHandshake.java @@ -0,0 +1,183 @@ +package io.bitsquare.p2p.peers; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import io.bitsquare.app.Log; +import io.bitsquare.common.UserThread; +import io.bitsquare.p2p.Message; +import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.MessageListener; +import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.peers.messages.data.DataRequest; +import io.bitsquare.p2p.peers.messages.data.DataResponse; +import io.bitsquare.p2p.peers.messages.data.PreliminaryDataRequest; +import io.bitsquare.p2p.peers.messages.data.UpdateDataRequest; +import io.bitsquare.p2p.storage.P2PDataStorage; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Random; +import java.util.Timer; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkArgument; + +public class RequestDataHandshake implements MessageListener { + private static final Logger log = LoggerFactory.getLogger(RequestDataHandshake.class); + + /////////////////////////////////////////////////////////////////////////////////////////// + // Listener + /////////////////////////////////////////////////////////////////////////////////////////// + + public interface Listener { + void onComplete(); + + void onFault(String errorMessage); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Class fields + /////////////////////////////////////////////////////////////////////////////////////////// + + private final NetworkNode networkNode; + private final P2PDataStorage dataStorage; + private final PeerManager peerManager; + private final Listener listener; + private Timer timeoutTimer; + private long requestNonce; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + public RequestDataHandshake(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, + Listener listener) { + this.networkNode = networkNode; + this.dataStorage = dataStorage; + this.peerManager = peerManager; + this.listener = listener; + + networkNode.addMessageListener(this); + } + + public void shutDown() { + Log.traceCall(); + + networkNode.removeMessageListener(this); + + stopTimeoutTimer(); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + + public void requestData(NodeAddress nodeAddress) { + Log.traceCall("nodeAddress=" + nodeAddress); + + stopTimeoutTimer(); + + checkArgument(timeoutTimer == null, "requestData must not be called twice."); + timeoutTimer = UserThread.runAfter(() -> { + log.info("timeoutTimer called"); + peerManager.shutDownConnection(nodeAddress); + shutDown(); + listener.onFault("A timeout occurred"); + }, + 10, TimeUnit.SECONDS); + + Message dataRequest; + requestNonce = new Random().nextLong(); + if (networkNode.getNodeAddress() == null) + dataRequest = new PreliminaryDataRequest(requestNonce); + else + dataRequest = new UpdateDataRequest(networkNode.getNodeAddress(), requestNonce); + + log.info("We send a {} to peer {}. ", dataRequest.getClass().getSimpleName(), nodeAddress); + + SettableFuture future = networkNode.sendMessage(nodeAddress, dataRequest); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.trace("Send DataRequest to " + nodeAddress + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Sending " + dataRequest.getClass().getSimpleName() + " to " + nodeAddress + + " failed. That is expected if the peer is offline. " + + "Exception:" + throwable.getMessage(); + log.info(errorMessage); + + peerManager.shutDownConnection(nodeAddress); + shutDown(); + listener.onFault(errorMessage); + } + }); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { + if (message instanceof DataRequest) { + Log.traceCall(message.toString()); + DataRequest dataRequest = (DataRequest) message; + DataResponse dataResponse = new DataResponse(new HashSet<>(dataStorage.getMap().values()), dataRequest.getNonce()); + SettableFuture future = networkNode.sendMessage(connection, dataResponse); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Send DataResponse to {} succeeded. dataResponse={}", + connection.getPeersNodeAddressOptional(), dataResponse); + shutDown(); + listener.onComplete(); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Send DataResponse to " + connection.getPeersNodeAddressOptional() + " failed. " + + "That is expected if the peer went offline. " + + "Exception:" + throwable.getMessage(); + log.info(errorMessage); + + peerManager.shutDownConnection(connection); + shutDown(); + listener.onFault(errorMessage); + } + }); + } else if (message instanceof DataResponse) { + DataResponse dataResponse = (DataResponse) message; + if (dataResponse.requestNonce == requestNonce) { + Log.traceCall(message.toString()); + + stopTimeoutTimer(); + + // connection.getPeersNodeAddressOptional() is not present at the first call + log.debug("connection.getPeersNodeAddressOptional() " + connection.getPeersNodeAddressOptional()); + connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> { + ((DataResponse) message).dataSet.stream() + .forEach(e -> dataStorage.add(e, peersNodeAddress)); + }); + shutDown(); + listener.onComplete(); + } + } + } + + private void stopTimeoutTimer() { + if (timeoutTimer != null) { + timeoutTimer.cancel(); + timeoutTimer = null; + } + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java index eb3f18b4c3..86203a2826 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java @@ -1,8 +1,5 @@ package io.bitsquare.p2p.peers; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Message; @@ -11,11 +8,7 @@ import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.peers.messages.data.DataRequest; -import io.bitsquare.p2p.peers.messages.data.DataResponse; -import io.bitsquare.p2p.peers.messages.data.PreliminaryDataRequest; import io.bitsquare.p2p.storage.P2PDataStorage; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,8 +48,10 @@ public class RequestDataManager implements MessageListener { private final Collection seedNodeAddresses; private final Listener listener; + private final Map requestDataHandshakeMap = new HashMap<>(); + private Optional nodeOfPreliminaryDataRequest = Optional.empty(); - private Timer requestDataAfterDelayTimer, timeoutTimer; + private Timer requestDataTimer; private boolean dataUpdateRequested; @@ -69,6 +64,7 @@ public class RequestDataManager implements MessageListener { this.networkNode = networkNode; this.dataStorage = dataStorage; this.peerManager = peerManager; + checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty."); this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); this.listener = listener; @@ -78,10 +74,11 @@ public class RequestDataManager implements MessageListener { public void shutDown() { Log.traceCall(); + stopRequestDataTimer(); + networkNode.removeMessageListener(this); - stopRequestDataTimer(); - stopTimeoutTimer(); + requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown); } @@ -91,8 +88,7 @@ public class RequestDataManager implements MessageListener { public void requestPreliminaryData() { Log.traceCall(); - checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty."); - requestDataFromList(new ArrayList<>(seedNodeAddresses)); + requestDataFromRandomPeer(new ArrayList<>(seedNodeAddresses)); } public void requestUpdatesData() { @@ -116,30 +112,24 @@ public class RequestDataManager implements MessageListener { @Override public void onMessage(Message message, Connection connection) { - if (message instanceof PreliminaryDataRequest || message instanceof DataRequest) { + if (message instanceof DataRequest) { Log.traceCall(message.toString()); - networkNode.sendMessage(connection, new DataResponse(new HashSet<>(dataStorage.getMap().values()))); - } else if (message instanceof DataResponse) { - Log.traceCall(message.toString()); - stopTimeoutTimer(); - connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> { - ((DataResponse) message).dataSet.stream() - .forEach(e -> dataStorage.add(e, peersNodeAddress)); + log.trace("Received {} at {}", message.getClass().getSimpleName(), connection); + RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager, + new RequestDataHandshake.Listener() { + @Override + public void onComplete() { + log.trace("RequestDataHandshake of inbound connection complete. Connection= {}", + connection); + } - // 1. We get a response from requestPreliminaryData - if (!nodeOfPreliminaryDataRequest.isPresent()) { - nodeOfPreliminaryDataRequest = Optional.of(peersNodeAddress); - listener.onPreliminaryDataReceived(); - } - - // 2. Later we get a response from requestUpdatesData - if (dataUpdateRequested) { - dataUpdateRequested = false; - listener.onUpdatedDataReceived(); - } - - listener.onDataReceived(); - }); + @Override + public void onFault(String errorMessage) { + log.info("RequestDataHandshake of inbound connection failed. Connection= {}", + errorMessage, connection); + } + }); + requestDataHandshake.onMessage(message, connection); } } @@ -148,7 +138,7 @@ public class RequestDataManager implements MessageListener { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void requestDataFromList(List nodeAddresses) { + private void requestDataFromRandomPeer(List nodeAddresses) { Log.traceCall("remainingNodeAddresses=" + nodeAddresses); NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size())); nodeAddresses.remove(nextCandidate); @@ -157,83 +147,79 @@ public class RequestDataManager implements MessageListener { private void requestData(NodeAddress nodeAddress, List remainingNodeAddresses) { Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); - log.info("We try to send a DataRequest request to peer. " + nodeAddress); - stopTimeoutTimer(); - stopRequestDataTimer(); + if (!requestDataHandshakeMap.containsKey(nodeAddress)) { + RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager, + new RequestDataHandshake.Listener() { + @Override + public void onComplete() { + stopRequestDataTimer(); - timeoutTimer = UserThread.runAfter(() -> { - log.info("timeoutTimer called"); - handleError(nodeAddress, remainingNodeAddresses); - }, - 10, TimeUnit.SECONDS); + // need to remove before listeners are notified as they cause the update call + requestDataHandshakeMap.remove(nodeAddress); - Message dataRequest; - if (networkNode.getNodeAddress() == null) - dataRequest = new PreliminaryDataRequest(); - else - dataRequest = new DataRequest(networkNode.getNodeAddress()); + // 1. We get a response from requestPreliminaryData + if (!nodeOfPreliminaryDataRequest.isPresent()) { + nodeOfPreliminaryDataRequest = Optional.of(nodeAddress); + listener.onPreliminaryDataReceived(); + } - SettableFuture future = networkNode.sendMessage(nodeAddress, dataRequest); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.trace("Send DataRequest to " + nodeAddress + " succeeded."); - } + // 2. Later we get a response from requestUpdatesData + if (dataUpdateRequested) { + dataUpdateRequested = false; + listener.onUpdatedDataReceived(); + } - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Send DataRequest to " + nodeAddress + " failed. " + - "That is expected if the peer is offline. " + - "Exception:" + throwable.getMessage()); + listener.onDataReceived(); + } - handleError(nodeAddress, remainingNodeAddresses); - } - }); - } - - private void handleError(NodeAddress nodeAddress, List remainingNodeAddresses) { - Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); - stopTimeoutTimer(); - - if (!remainingNodeAddresses.isEmpty()) { - log.info("There are remaining nodes available for requesting data. " + - "We will try requestDataFromPeers again."); - requestDataFromList(remainingNodeAddresses); - } else { - log.info("There is no remaining node available for requesting data. " + - "That is expected if no other node is online.\n" + - "We will try to use reported peers (if no available we use persisted peers) " + - "and try again to request data from our seed nodes after a random pause."); - - if (peerManager.isSeedNode(nodeAddress)) - listener.onNoSeedNodeAvailable(); - else - listener.onNoPeersAvailable(); - - requestDataAfterDelayTimer = UserThread.runAfterRandomDelay(() -> { - log.trace("requestDataAfterDelayTimer called"); - if (!seedNodeAddresses.isEmpty()) { - Set nodeAddressesOfConfirmedConnections = networkNode.getNodeAddressesOfConfirmedConnections(); - // We want to keep it sorted but avoid duplicates - // We don't filter out already established connections for seed nodes as it might be that - // we got from the other seed node contacted but we still have not requested the initial - // data set - List list = new ArrayList<>(seedNodeAddresses); - list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list)); - list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list)); - log.trace("Sorted and filtered list: list=" + list); - if (!list.isEmpty()) { - NodeAddress nextCandidate = list.get(0); - list.remove(nextCandidate); - requestData(nextCandidate, list); + @Override + public void onFault(String errorMessage) { + if (!remainingNodeAddresses.isEmpty()) { + log.info("There are remaining nodes available for requesting data. " + + "We will try requestDataFromPeers again."); + requestDataFromRandomPeer(remainingNodeAddresses); } else { - log.info("Neither seed nodes, reported peers nor persisted peers are available. " + - "At least seed nodes should be always available."); + log.info("There is no remaining node available for requesting data. " + + "That is expected if no other node is online.\n" + + "We will try to use reported peers (if no available we use persisted peers) " + + "and try again to request data from our seed nodes after a random pause."); + + // try again after a pause + stopRequestDataTimer(); + requestDataTimer = UserThread.runAfterRandomDelay(() -> { + log.trace("requestDataAfterDelayTimer called"); + // We want to keep it sorted but avoid duplicates + // We don't filter out already established connections for seed nodes as it might be that + // we got from the other seed node contacted but we still have not requested the initial + // data set + List list = new ArrayList<>(seedNodeAddresses); + list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list)); + list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list)); + log.trace("Sorted and filtered list: list=" + list); + checkArgument(!list.isEmpty(), "seedNodeAddresses must not be empty."); + NodeAddress nextCandidate = list.get(0); + list.remove(nextCandidate); + requestData(nextCandidate, list); + }, + 10, 15, TimeUnit.SECONDS); + } + + requestDataHandshakeMap.remove(nodeAddress); + + // Notify listeners + if (!nodeOfPreliminaryDataRequest.isPresent()) { + if (peerManager.isSeedNode(nodeAddress)) + listener.onNoSeedNodeAvailable(); + else + listener.onNoPeersAvailable(); } } - }, - 10, 15, TimeUnit.SECONDS); + }); + requestDataHandshakeMap.put(nodeAddress, requestDataHandshake); + requestDataHandshake.requestData(nodeAddress); + } else { + log.warn("We have started already a DataRequest request to peer. " + nodeAddress); } } @@ -242,7 +228,7 @@ public class RequestDataManager implements MessageListener { return set.stream() .filter(e -> !list.contains(e.nodeAddress) && !peerManager.isSeedNode(e) && - !peerManager.isSelf(e.nodeAddress)) + !peerManager.isSelf(e)) .collect(Collectors.toList()) .stream() .sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate)) @@ -251,16 +237,9 @@ public class RequestDataManager implements MessageListener { } private void stopRequestDataTimer() { - if (requestDataAfterDelayTimer != null) { - requestDataAfterDelayTimer.cancel(); - requestDataAfterDelayTimer = null; - } - } - - private void stopTimeoutTimer() { - if (timeoutTimer != null) { - timeoutTimer.cancel(); - timeoutTimer = null; + if (requestDataTimer != null) { + requestDataTimer.cancel(); + requestDataTimer = null; } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java index 10fe81af53..4c25ae3949 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java @@ -1,36 +1,5 @@ package io.bitsquare.p2p.peers.messages.data; -import io.bitsquare.app.Version; -import io.bitsquare.p2p.NodeAddress; -import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage; - -public final class DataRequest implements SendersNodeAddressMessage { - // That object is sent over the wire, so we need to take care of version compatibility. - private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - - private final int networkId = Version.getNetworkId(); - private final NodeAddress senderNodeAddress; - - public DataRequest(NodeAddress senderNodeAddress) { - this.senderNodeAddress = senderNodeAddress; - } - - @Override - public NodeAddress getSenderNodeAddress() { - return senderNodeAddress; - } - - @Override - public int networkId() { - return networkId; - } - - @Override - public String toString() { - return "DataRequest{" + - "senderNodeAddress=" + senderNodeAddress + - ", networkId=" + networkId + - '}'; - } - +public interface DataRequest { + long getNonce(); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java index 5f04213cc1..63c3877c4c 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java @@ -12,9 +12,11 @@ public final class DataResponse implements Message { private final int networkId = Version.getNetworkId(); public final HashSet dataSet; + public final long requestNonce; - public DataResponse(HashSet dataSet) { + public DataResponse(HashSet dataSet, long requestNonce) { this.dataSet = dataSet; + this.requestNonce = requestNonce; } @Override @@ -43,6 +45,7 @@ public final class DataResponse implements Message { return "DataResponse{" + "networkId=" + networkId + ", dataSet=" + dataSet + + ", requestNonce=" + requestNonce + '}'; } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/PreliminaryDataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/PreliminaryDataRequest.java index abaa5f5106..4841eb48e3 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/PreliminaryDataRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/PreliminaryDataRequest.java @@ -3,13 +3,20 @@ package io.bitsquare.p2p.peers.messages.data; import io.bitsquare.app.Version; import io.bitsquare.p2p.network.messages.AnonymousMessage; -public final class PreliminaryDataRequest implements AnonymousMessage { +public final class PreliminaryDataRequest implements AnonymousMessage, DataRequest { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private final int networkId = Version.getNetworkId(); + private final long nonce; - public PreliminaryDataRequest() { + public PreliminaryDataRequest(long nonce) { + this.nonce = nonce; + } + + @Override + public long getNonce() { + return nonce; } @Override @@ -21,6 +28,7 @@ public final class PreliminaryDataRequest implements AnonymousMessage { public String toString() { return "PreliminaryDataRequest{" + "networkId=" + networkId + + ", nonce=" + nonce + '}'; } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/UpdateDataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/UpdateDataRequest.java new file mode 100644 index 0000000000..5365ee8122 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/UpdateDataRequest.java @@ -0,0 +1,44 @@ +package io.bitsquare.p2p.peers.messages.data; + +import io.bitsquare.app.Version; +import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage; + +public final class UpdateDataRequest implements SendersNodeAddressMessage, DataRequest { + // That object is sent over the wire, so we need to take care of version compatibility. + private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; + + private final int networkId = Version.getNetworkId(); + private final NodeAddress senderNodeAddress; + private final long nonce; + + public UpdateDataRequest(NodeAddress senderNodeAddress, long nonce) { + this.senderNodeAddress = senderNodeAddress; + this.nonce = nonce; + } + + @Override + public long getNonce() { + return nonce; + } + + @Override + public NodeAddress getSenderNodeAddress() { + return senderNodeAddress; + } + + @Override + public int networkId() { + return networkId; + } + + @Override + public String toString() { + return "DataRequest{" + + "senderNodeAddress=" + senderNodeAddress + + ", networkId=" + networkId + + ", nonce=" + nonce + + '}'; + } + +}