mirror of
https://github.com/haveno-dex/haveno.git
synced 2025-06-25 07:10:48 -04:00
Refactor handshake out for peers request
This commit is contained in:
parent
cd5ecbb168
commit
a856fa36d3
10 changed files with 405 additions and 236 deletions
|
@ -134,13 +134,15 @@ public class OfferAvailabilityProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startTimeout() {
|
private void startTimeout() {
|
||||||
stopTimeout();
|
if (timeoutTimer == null) {
|
||||||
|
timeoutTimer = UserThread.runAfter(() -> {
|
||||||
timeoutTimer = UserThread.runAfter(() -> {
|
log.warn("Timeout reached at " + this);
|
||||||
log.warn("Timeout reached");
|
model.offer.setState(Offer.State.OFFERER_OFFLINE);
|
||||||
model.offer.setState(Offer.State.OFFERER_OFFLINE);
|
errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded.");
|
||||||
errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded.");
|
}, TIMEOUT_SEC);
|
||||||
}, TIMEOUT_SEC);
|
} else {
|
||||||
|
log.warn("timeoutTimer already created. That must not happen.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void stopTimeout() {
|
private void stopTimeout() {
|
||||||
|
|
|
@ -38,8 +38,7 @@
|
||||||
|
|
||||||
<Label fx:id="bitcoinPeersLabel" text="Connected peers:" GridPane.rowIndex="1"/>
|
<Label fx:id="bitcoinPeersLabel" text="Connected peers:" GridPane.rowIndex="1"/>
|
||||||
<TextArea fx:id="bitcoinPeersTextArea" GridPane.rowIndex="1" GridPane.columnIndex="1" GridPane.hgrow="ALWAYS"
|
<TextArea fx:id="bitcoinPeersTextArea" GridPane.rowIndex="1" GridPane.columnIndex="1" GridPane.hgrow="ALWAYS"
|
||||||
editable="false" focusTraversable="false"/>
|
GridPane.vgrow="ALWAYS" editable="false" focusTraversable="false"/>
|
||||||
|
|
||||||
|
|
||||||
<TitledGroupBg text="P2P network" GridPane.rowIndex="3" GridPane.rowSpan="5">
|
<TitledGroupBg text="P2P network" GridPane.rowIndex="3" GridPane.rowSpan="5">
|
||||||
<padding>
|
<padding>
|
||||||
|
@ -64,7 +63,7 @@
|
||||||
|
|
||||||
<Label fx:id="p2PPeersLabel" text="Connected peers:" GridPane.rowIndex="4"/>
|
<Label fx:id="p2PPeersLabel" text="Connected peers:" GridPane.rowIndex="4"/>
|
||||||
<TextArea fx:id="p2PPeersTextArea" GridPane.rowIndex="4" GridPane.columnIndex="1" GridPane.hgrow="ALWAYS"
|
<TextArea fx:id="p2PPeersTextArea" GridPane.rowIndex="4" GridPane.columnIndex="1" GridPane.hgrow="ALWAYS"
|
||||||
editable="false" focusTraversable="false"/>
|
GridPane.vgrow="ALWAYS" editable="false" focusTraversable="false"/>
|
||||||
|
|
||||||
<columnConstraints>
|
<columnConstraints>
|
||||||
<ColumnConstraints hgrow="SOMETIMES" halignment="RIGHT" minWidth="200.0"/>
|
<ColumnConstraints hgrow="SOMETIMES" halignment="RIGHT" minWidth="200.0"/>
|
||||||
|
|
|
@ -110,12 +110,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
||||||
Log.traceCall();
|
Log.traceCall();
|
||||||
|
|
||||||
connectionNodeAddressListener = (observable, oldValue, newValue) -> {
|
connectionNodeAddressListener = (observable, oldValue, newValue) -> {
|
||||||
Set<NodeAddress> nodeAddressesOfConfirmedConnections = networkNode.getNodeAddressesOfConfirmedConnections();
|
UserThread.execute(() -> numConnectedPeers.set(networkNode.getNodeAddressesOfConfirmedConnections().size()));
|
||||||
Set<Connection> allConfirmedConnections = networkNode.getConfirmedConnections();
|
|
||||||
log.info("nodeAddressesOfConfirmedConnections=" + nodeAddressesOfConfirmedConnections);
|
|
||||||
log.info("allConfirmedConnections=" + allConfirmedConnections);
|
|
||||||
log.info("Nr of connections: {} / {} (nodeAddressesOfConfirmedConnections / allConfirmedConnections)", nodeAddressesOfConfirmedConnections.size(), allConfirmedConnections.size());
|
|
||||||
UserThread.execute(() -> numConnectedPeers.set(nodeAddressesOfConfirmedConnections.size()));
|
|
||||||
};
|
};
|
||||||
|
|
||||||
networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir);
|
networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir);
|
||||||
|
@ -265,7 +260,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
||||||
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest();
|
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest();
|
||||||
checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(),
|
checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(),
|
||||||
"seedNodeOfPreliminaryDataRequest must be present");
|
"seedNodeOfPreliminaryDataRequest must be present");
|
||||||
peerExchangeManager.requestReportedPeers(seedNodeOfPreliminaryDataRequest.get());
|
peerExchangeManager.requestReportedPeersFromSeedNodes(seedNodeOfPreliminaryDataRequest.get());
|
||||||
|
|
||||||
isBootstrapped = true;
|
isBootstrapped = true;
|
||||||
p2pServiceListeners.stream().forEach(P2PServiceListener::onBootstrapComplete);
|
p2pServiceListeners.stream().forEach(P2PServiceListener::onBootstrapComplete);
|
||||||
|
|
|
@ -0,0 +1,208 @@
|
||||||
|
package io.bitsquare.p2p.peers;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
import io.bitsquare.app.Log;
|
||||||
|
import io.bitsquare.common.UserThread;
|
||||||
|
import io.bitsquare.p2p.Message;
|
||||||
|
import io.bitsquare.p2p.NodeAddress;
|
||||||
|
import io.bitsquare.p2p.network.Connection;
|
||||||
|
import io.bitsquare.p2p.network.MessageListener;
|
||||||
|
import io.bitsquare.p2p.network.NetworkNode;
|
||||||
|
import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
|
||||||
|
import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
public class PeerExchangeHandshake implements MessageListener {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PeerExchangeHandshake.class);
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Listener
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public interface Listener {
|
||||||
|
void onComplete();
|
||||||
|
|
||||||
|
void onFault(String errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Class fields
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
private final NetworkNode networkNode;
|
||||||
|
private final PeerManager peerManager;
|
||||||
|
private final Listener listener;
|
||||||
|
private final long nonce = new Random().nextLong();
|
||||||
|
private Timer timeoutTimer;
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Constructor
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public PeerExchangeHandshake(NetworkNode networkNode, PeerManager peerManager, Listener listener) {
|
||||||
|
this.networkNode = networkNode;
|
||||||
|
this.peerManager = peerManager;
|
||||||
|
this.listener = listener;
|
||||||
|
|
||||||
|
networkNode.addMessageListener(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutDown() {
|
||||||
|
Log.traceCall();
|
||||||
|
networkNode.removeMessageListener(this);
|
||||||
|
stopTimeoutTimer();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// API
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public void requestReportedPeers(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
|
||||||
|
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
|
||||||
|
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
|
||||||
|
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
|
||||||
|
|
||||||
|
timeoutTimer = UserThread.runAfter(() -> {
|
||||||
|
log.info("timeoutTimer called");
|
||||||
|
peerManager.shutDownConnection(nodeAddress);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault("A timeout occurred");
|
||||||
|
},
|
||||||
|
20, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce,
|
||||||
|
getReportedPeers(nodeAddress));
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress,
|
||||||
|
getPeersRequest);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Connection connection) {
|
||||||
|
log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
String errorMessage = "Sending getPeersRequest to " + nodeAddress +
|
||||||
|
" failed. That is expected if the peer is offline. getPeersRequest=" + getPeersRequest + "." +
|
||||||
|
"Exception: " + throwable.getMessage();
|
||||||
|
log.info(errorMessage);
|
||||||
|
|
||||||
|
peerManager.shutDownConnection(nodeAddress);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault(errorMessage);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onGetPeersRequest(GetPeersRequest message, final Connection connection) {
|
||||||
|
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
|
||||||
|
|
||||||
|
timeoutTimer = UserThread.runAfter(() -> {
|
||||||
|
log.info("timeoutTimer called");
|
||||||
|
peerManager.shutDownConnection(connection);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault("A timeout occurred");
|
||||||
|
},
|
||||||
|
20, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
GetPeersRequest getPeersRequest = message;
|
||||||
|
HashSet<ReportedPeer> reportedPeers = getPeersRequest.reportedPeers;
|
||||||
|
StringBuilder result = new StringBuilder("Received peers:");
|
||||||
|
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
|
||||||
|
log.trace(result.toString());
|
||||||
|
|
||||||
|
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
|
||||||
|
"The peers address must have been already set at the moment");
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(connection,
|
||||||
|
new GetPeersResponse(getPeersRequest.nonce,
|
||||||
|
getReportedPeers(connection.getPeersNodeAddressOptional().get())));
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Connection connection) {
|
||||||
|
log.trace("GetPeersResponse sent successfully");
|
||||||
|
shutDown();
|
||||||
|
listener.onComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
String errorMessage = "Sending getPeersRequest to " + connection +
|
||||||
|
" failed. That is expected if the peer is offline. getPeersRequest=" + getPeersRequest + "." +
|
||||||
|
"Exception: " + throwable.getMessage();
|
||||||
|
log.info(errorMessage);
|
||||||
|
|
||||||
|
peerManager.shutDownConnection(connection);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault(errorMessage);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
peerManager.addToReportedPeers(reportedPeers, connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// MessageListener implementation
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message, Connection connection) {
|
||||||
|
if (message instanceof GetPeersResponse) {
|
||||||
|
Log.traceCall(message.toString() + " / connection=" + connection);
|
||||||
|
GetPeersResponse getPeersResponse = (GetPeersResponse) message;
|
||||||
|
if (getPeersResponse.requestNonce == nonce) {
|
||||||
|
stopTimeoutTimer();
|
||||||
|
|
||||||
|
HashSet<ReportedPeer> reportedPeers = getPeersResponse.reportedPeers;
|
||||||
|
StringBuilder result = new StringBuilder("Received peers:");
|
||||||
|
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
|
||||||
|
log.trace(result.toString());
|
||||||
|
peerManager.addToReportedPeers(reportedPeers, connection);
|
||||||
|
|
||||||
|
shutDown();
|
||||||
|
listener.onComplete();
|
||||||
|
} else {
|
||||||
|
log.debug("Nonce not matching. That happens if we get a response after a canceled handshake " +
|
||||||
|
"(timeout). We drop that message. nonce={} / requestNonce={}",
|
||||||
|
nonce, getPeersResponse.requestNonce);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Private
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
private HashSet<ReportedPeer> getReportedPeers(NodeAddress receiverNodeAddress) {
|
||||||
|
return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream()
|
||||||
|
.filter(e -> !peerManager.isSeedNode(e) &&
|
||||||
|
!peerManager.isSelf(e) &&
|
||||||
|
!e.nodeAddress.equals(receiverNodeAddress)
|
||||||
|
)
|
||||||
|
.collect(Collectors.toSet()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopTimeoutTimer() {
|
||||||
|
if (timeoutTimer != null) {
|
||||||
|
timeoutTimer.cancel();
|
||||||
|
timeoutTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +1,6 @@
|
||||||
package io.bitsquare.p2p.peers;
|
package io.bitsquare.p2p.peers;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
|
||||||
import io.bitsquare.app.Log;
|
import io.bitsquare.app.Log;
|
||||||
import io.bitsquare.common.UserThread;
|
import io.bitsquare.common.UserThread;
|
||||||
import io.bitsquare.common.util.Utilities;
|
import io.bitsquare.common.util.Utilities;
|
||||||
|
@ -11,9 +8,6 @@ import io.bitsquare.p2p.Message;
|
||||||
import io.bitsquare.p2p.NodeAddress;
|
import io.bitsquare.p2p.NodeAddress;
|
||||||
import io.bitsquare.p2p.network.*;
|
import io.bitsquare.p2p.network.*;
|
||||||
import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
|
import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
|
||||||
import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse;
|
|
||||||
import io.bitsquare.p2p.peers.messages.peers.PeerExchangeMessage;
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -32,7 +26,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
private final PeerManager peerManager;
|
private final PeerManager peerManager;
|
||||||
private final Set<NodeAddress> seedNodeAddresses;
|
private final Set<NodeAddress> seedNodeAddresses;
|
||||||
private final ScheduledThreadPoolExecutor executor;
|
private final ScheduledThreadPoolExecutor executor;
|
||||||
private Timer connectToMorePeersTimer, timeoutTimer, maintainConnectionsTimer;
|
private final Map<NodeAddress, PeerExchangeHandshake> peerExchangeHandshakeMap = new HashMap<>();
|
||||||
|
private Timer connectToMorePeersTimer, maintainConnectionsTimer;
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -55,7 +50,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
networkNode.removeMessageListener(this);
|
networkNode.removeMessageListener(this);
|
||||||
stopConnectToMorePeersTimer();
|
stopConnectToMorePeersTimer();
|
||||||
stopMaintainConnectionsTimer();
|
stopMaintainConnectionsTimer();
|
||||||
stopTimeoutTimer();
|
peerExchangeHandshakeMap.values().stream().forEach(PeerExchangeHandshake::shutDown);
|
||||||
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
|
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,12 +59,13 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
// API
|
// API
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
public void requestReportedPeers(NodeAddress nodeAddress) {
|
public void requestReportedPeersFromSeedNodes(NodeAddress nodeAddress) {
|
||||||
|
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
|
||||||
ArrayList<NodeAddress> remainingNodeAddresses = new ArrayList<>(seedNodeAddresses);
|
ArrayList<NodeAddress> remainingNodeAddresses = new ArrayList<>(seedNodeAddresses);
|
||||||
remainingNodeAddresses.remove(nodeAddress);
|
remainingNodeAddresses.remove(nodeAddress);
|
||||||
requestReportedPeers(nodeAddress, remainingNodeAddresses);
|
requestReportedPeers(nodeAddress, remainingNodeAddresses);
|
||||||
|
|
||||||
long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min.
|
int delay = new Random().nextInt(60) + 60 * 4; // 4-5 min
|
||||||
executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections),
|
executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections),
|
||||||
delay, delay, TimeUnit.SECONDS);
|
delay, delay, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
@ -87,11 +83,11 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
public void onDisconnect(Reason reason, Connection connection) {
|
public void onDisconnect(Reason reason, Connection connection) {
|
||||||
// We use a timer to throttle if we get a series of disconnects
|
// We use a timer to throttle if we get a series of disconnects
|
||||||
// The more connections we have the more relaxed we are with a checkConnections
|
// The more connections we have the more relaxed we are with a checkConnections
|
||||||
if (maintainConnectionsTimer == null)
|
stopMaintainConnectionsTimer();
|
||||||
maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections,
|
int size = networkNode.getAllConnections().size();
|
||||||
networkNode.getAllConnections().size() * 10, TimeUnit.SECONDS);
|
int delay = 10 + 2 * size * size; // 12 sec - 210 sec (3.5 min)
|
||||||
|
maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections,
|
||||||
|
delay, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -105,45 +101,23 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Message message, Connection connection) {
|
public void onMessage(Message message, Connection connection) {
|
||||||
if (message instanceof PeerExchangeMessage) {
|
if (message instanceof GetPeersRequest) {
|
||||||
Log.traceCall(message.toString());
|
PeerExchangeHandshake peerExchangeHandshake = new PeerExchangeHandshake(networkNode,
|
||||||
if (message instanceof GetPeersRequest) {
|
peerManager,
|
||||||
HashSet<ReportedPeer> reportedPeers = ((GetPeersRequest) message).reportedPeers;
|
new PeerExchangeHandshake.Listener() {
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
log.trace("PeerExchangeHandshake of inbound connection complete. Connection= {}",
|
||||||
|
connection);
|
||||||
|
}
|
||||||
|
|
||||||
StringBuilder result = new StringBuilder("Received peers:");
|
@Override
|
||||||
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
|
public void onFault(String errorMessage) {
|
||||||
log.trace(result.toString());
|
log.trace("PeerExchangeHandshake of outbound connection failed. {} connection= {}",
|
||||||
|
errorMessage, connection);
|
||||||
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
|
}
|
||||||
"The peers address must have been already set at the moment");
|
});
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(connection,
|
peerExchangeHandshake.onGetPeersRequest((GetPeersRequest) message, connection);
|
||||||
new GetPeersResponse(getReportedPeersHashSet(connection.getPeersNodeAddressOptional().get())));
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Connection connection) {
|
|
||||||
log.trace("GetPeersResponse sent successfully");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
|
||||||
log.info("GetPeersResponse sending failed " + throwable.getMessage() +
|
|
||||||
" Maybe the peer went offline.");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
peerManager.addToReportedPeers(reportedPeers, connection);
|
|
||||||
} else if (message instanceof GetPeersResponse) {
|
|
||||||
stopTimeoutTimer();
|
|
||||||
|
|
||||||
HashSet<ReportedPeer> reportedPeers = ((GetPeersResponse) message).reportedPeers;
|
|
||||||
|
|
||||||
StringBuilder result = new StringBuilder("Received peers:");
|
|
||||||
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
|
|
||||||
log.trace(result.toString());
|
|
||||||
|
|
||||||
peerManager.addToReportedPeers(reportedPeers, connection);
|
|
||||||
|
|
||||||
connectToMorePeers();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,82 +128,44 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
|
|
||||||
private void requestReportedPeers(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
|
private void requestReportedPeers(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
|
||||||
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
|
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
|
||||||
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
|
if (!peerExchangeHandshakeMap.containsKey(nodeAddress)) {
|
||||||
|
PeerExchangeHandshake peerExchangeHandshake = new PeerExchangeHandshake(networkNode,
|
||||||
|
peerManager,
|
||||||
|
new PeerExchangeHandshake.Listener() {
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
log.trace("PeerExchangeHandshake of outbound connection complete. nodeAddress= {}",
|
||||||
|
nodeAddress);
|
||||||
|
peerExchangeHandshakeMap.remove(nodeAddress);
|
||||||
|
connectToMorePeers();
|
||||||
|
}
|
||||||
|
|
||||||
stopConnectToMorePeersTimer();
|
@Override
|
||||||
stopTimeoutTimer();
|
public void onFault(String errorMessage) {
|
||||||
|
log.trace("PeerExchangeHandshake of outbound connection failed. {} nodeAddress= {}",
|
||||||
|
errorMessage, nodeAddress);
|
||||||
|
|
||||||
timeoutTimer = UserThread.runAfter(() -> {
|
peerExchangeHandshakeMap.remove(nodeAddress);
|
||||||
log.info("timeoutTimer called");
|
if (!remainingNodeAddresses.isEmpty()) {
|
||||||
handleError(nodeAddress, remainingNodeAddresses);
|
log.info("There are remaining nodes available for requesting peers. " +
|
||||||
},
|
"We will try getReportedPeers again.");
|
||||||
20, TimeUnit.SECONDS);
|
requestReportedPeersFromRandomPeer(remainingNodeAddresses);
|
||||||
|
} else {
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress,
|
log.info("There is no remaining node available for requesting peers. " +
|
||||||
new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress)));
|
"That is expected if no other node is online.\n" +
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
"We will try again after a random pause.");
|
||||||
@Override
|
if (connectToMorePeersTimer == null)
|
||||||
public void onSuccess(Connection connection) {
|
connectToMorePeersTimer = UserThread.runAfterRandomDelay(
|
||||||
log.trace("GetPeersRequest sent successfully");
|
PeerExchangeManager.this::connectToMorePeers, 20, 30);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
@Override
|
});
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
peerExchangeHandshakeMap.put(nodeAddress, peerExchangeHandshake);
|
||||||
log.info("Sending GetPeersRequest to " + nodeAddress + " failed. " +
|
peerExchangeHandshake.requestReportedPeers(nodeAddress, remainingNodeAddresses);
|
||||||
"That is expected if the peer is offline. " +
|
|
||||||
"Exception:" + throwable.getMessage());
|
|
||||||
handleError(nodeAddress, remainingNodeAddresses);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void connectToMorePeers() {
|
|
||||||
Log.traceCall();
|
|
||||||
|
|
||||||
stopConnectToMorePeersTimer();
|
|
||||||
|
|
||||||
if (!peerManager.hasSufficientConnections()) {
|
|
||||||
// We want to keep it sorted but avoid duplicates
|
|
||||||
List<NodeAddress> list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>()));
|
|
||||||
list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
|
|
||||||
list.addAll(seedNodeAddresses.stream()
|
|
||||||
.filter(e -> !list.contains(e) &&
|
|
||||||
!peerManager.isSelf(e) &&
|
|
||||||
!peerManager.isConfirmed(e))
|
|
||||||
.collect(Collectors.toSet()));
|
|
||||||
log.trace("Sorted and filtered list: list=" + list);
|
|
||||||
if (!list.isEmpty()) {
|
|
||||||
NodeAddress nextCandidate = list.get(0);
|
|
||||||
list.remove(nextCandidate);
|
|
||||||
requestReportedPeers(nextCandidate, list);
|
|
||||||
} else {
|
|
||||||
log.info("No more peers are available for requestReportedPeers.");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
log.info("We have already sufficient connections.");
|
log.trace("We have started already a peerExchangeHandshake to peer. " +
|
||||||
}
|
"That can happen by the timers calls. We ignore that call. " +
|
||||||
}
|
"nodeAddress=" + nodeAddress);
|
||||||
|
|
||||||
private void handleError(NodeAddress peersNodeAddress, List<NodeAddress> remainingNodeAddresses) {
|
|
||||||
Log.traceCall("peersNodeAddress=" + peersNodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
|
|
||||||
|
|
||||||
stopTimeoutTimer();
|
|
||||||
|
|
||||||
// In case a shutdown was not triggered already by the error we close that connection
|
|
||||||
// if it is not a DIRECT_MSG_PEER
|
|
||||||
peerManager.shutDownConnection(peersNodeAddress);
|
|
||||||
|
|
||||||
if (!remainingNodeAddresses.isEmpty()) {
|
|
||||||
log.info("There are remaining nodes available for requesting peers. " +
|
|
||||||
"We will try getReportedPeers again.");
|
|
||||||
requestReportedPeersFromRandomPeer(remainingNodeAddresses);
|
|
||||||
} else {
|
|
||||||
log.info("There is no remaining node available for requesting peers. " +
|
|
||||||
"That is expected if no other node is online.\n" +
|
|
||||||
"We will try to use reported peers (if no available we use persisted peers) " +
|
|
||||||
"and try again to request peers from our seed nodes after a random pause.");
|
|
||||||
if (connectToMorePeersTimer == null)
|
|
||||||
connectToMorePeersTimer = UserThread.runAfterRandomDelay(this::connectToMorePeers, 20, 30);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,20 +192,48 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
connectToMorePeers();
|
connectToMorePeers();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use all outbound connections for updating reported peers and make sure we keep the connection alive
|
// Use all outbound connections older than 5 min. for updating reported peers and make sure we keep the connection alive
|
||||||
// Inbound connections should be maintained be the requesting peer
|
// Inbound connections should be maintained be the requesting peer
|
||||||
confirmedConnections.stream()
|
confirmedConnections.stream()
|
||||||
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
|
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
|
||||||
c instanceof OutboundConnection).
|
c instanceof OutboundConnection &&
|
||||||
forEach(c -> UserThread.runAfterRandomDelay(() ->
|
new Date().getTime() - c.getLastActivityDate().getTime() > 5 * 60 * 1000)
|
||||||
requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>())
|
.forEach(c -> UserThread.runAfterRandomDelay(() -> {
|
||||||
|
log.trace("Call requestReportedPeers from maintainConnections");
|
||||||
|
requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>());
|
||||||
|
}
|
||||||
, 3, 5));
|
, 3, 5));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void connectToMorePeers() {
|
||||||
|
Log.traceCall();
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
stopConnectToMorePeersTimer();
|
||||||
// Utils
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
if (!peerManager.hasSufficientConnections()) {
|
||||||
|
// We create a new list of not connected candidates
|
||||||
|
// 1. reported sorted by most recent lastActivityDate
|
||||||
|
// 2. persisted sorted by most recent lastActivityDate
|
||||||
|
// 3. seenNodes
|
||||||
|
List<NodeAddress> list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>()));
|
||||||
|
list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
|
||||||
|
list.addAll(seedNodeAddresses.stream()
|
||||||
|
.filter(e -> !list.contains(e) &&
|
||||||
|
!peerManager.isSelf(e) &&
|
||||||
|
!peerManager.isConfirmed(e))
|
||||||
|
.collect(Collectors.toSet()));
|
||||||
|
log.trace("Sorted and filtered list: list=" + list);
|
||||||
|
if (!list.isEmpty()) {
|
||||||
|
NodeAddress nextCandidate = list.get(0);
|
||||||
|
list.remove(nextCandidate);
|
||||||
|
requestReportedPeers(nextCandidate, list);
|
||||||
|
} else {
|
||||||
|
log.info("No more peers are available for requestReportedPeers.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.info("We have already sufficient connections.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// sorted by most recent lastActivityDate
|
// sorted by most recent lastActivityDate
|
||||||
private List<NodeAddress> getFilteredAndSortedList(Set<ReportedPeer> set, List<NodeAddress> list) {
|
private List<NodeAddress> getFilteredAndSortedList(Set<ReportedPeer> set, List<NodeAddress> list) {
|
||||||
|
@ -291,15 +255,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
requestReportedPeers(nextCandidate, remainingNodeAddresses);
|
requestReportedPeers(nextCandidate, remainingNodeAddresses);
|
||||||
}
|
}
|
||||||
|
|
||||||
private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) {
|
|
||||||
return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream()
|
|
||||||
.filter(e -> !peerManager.isSeedNode(e) &&
|
|
||||||
!peerManager.isSelf(e) &&
|
|
||||||
!e.nodeAddress.equals(receiverNodeAddress)
|
|
||||||
)
|
|
||||||
.collect(Collectors.toSet()));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void stopConnectToMorePeersTimer() {
|
private void stopConnectToMorePeersTimer() {
|
||||||
if (connectToMorePeersTimer != null) {
|
if (connectToMorePeersTimer != null) {
|
||||||
connectToMorePeersTimer.cancel();
|
connectToMorePeersTimer.cancel();
|
||||||
|
@ -313,11 +268,4 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
maintainConnectionsTimer = null;
|
maintainConnectionsTimer = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void stopTimeoutTimer() {
|
|
||||||
if (timeoutTimer != null) {
|
|
||||||
timeoutTimer.cancel();
|
|
||||||
timeoutTimer = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -178,9 +178,8 @@ public class PeerManager implements ConnectionListener, MessageListener {
|
||||||
"MAX_CONNECTIONS_HIGH_PRIORITY limit of {}", MAX_CONNECTIONS_EXTENDED_2);
|
"MAX_CONNECTIONS_HIGH_PRIORITY limit of {}", MAX_CONNECTIONS_EXTENDED_2);
|
||||||
if (size > MAX_CONNECTIONS_EXTENDED_2) {
|
if (size > MAX_CONNECTIONS_EXTENDED_2) {
|
||||||
log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER.");
|
log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER.");
|
||||||
// All expect DIRECT_MSG_PEER type connections
|
// All connections
|
||||||
candidates = allConnections.stream()
|
candidates = allConnections.stream()
|
||||||
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
|
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -436,10 +435,11 @@ public class PeerManager implements ConnectionListener, MessageListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void printConnectedPeers() {
|
private void printConnectedPeers() {
|
||||||
if (!networkNode.getNodeAddressesOfConfirmedConnections().isEmpty()) {
|
if (!networkNode.getConfirmedConnections().isEmpty()) {
|
||||||
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
|
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
|
||||||
"Connected peers for node " + networkNode.getNodeAddress() + ":");
|
"Connected peers for node " + networkNode.getNodeAddress() + ":");
|
||||||
networkNode.getNodeAddressesOfConfirmedConnections().stream().forEach(e -> result.append("\n").append(e));
|
networkNode.getConfirmedConnections().stream().forEach(e -> result.append("\n")
|
||||||
|
.append(e.getPeersNodeAddressOptional().get()).append(" ").append(e.getPeerType()));
|
||||||
result.append("\n------------------------------------------------------------\n");
|
result.append("\n------------------------------------------------------------\n");
|
||||||
log.info(result.toString());
|
log.info(result.toString());
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class RequestDataHandshake implements MessageListener {
|
||||||
private final PeerManager peerManager;
|
private final PeerManager peerManager;
|
||||||
private final Listener listener;
|
private final Listener listener;
|
||||||
private Timer timeoutTimer;
|
private Timer timeoutTimer;
|
||||||
private long requestNonce;
|
private final long nonce = new Random().nextLong();
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -69,22 +69,19 @@ public class RequestDataHandshake implements MessageListener {
|
||||||
|
|
||||||
public void shutDown() {
|
public void shutDown() {
|
||||||
Log.traceCall();
|
Log.traceCall();
|
||||||
|
|
||||||
networkNode.removeMessageListener(this);
|
networkNode.removeMessageListener(this);
|
||||||
|
|
||||||
stopTimeoutTimer();
|
stopTimeoutTimer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// API
|
// API
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
public void requestData(NodeAddress nodeAddress) {
|
public void requestData(NodeAddress nodeAddress) {
|
||||||
Log.traceCall("nodeAddress=" + nodeAddress);
|
Log.traceCall("nodeAddress=" + nodeAddress);
|
||||||
|
|
||||||
stopTimeoutTimer();
|
|
||||||
|
|
||||||
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
|
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
|
||||||
|
|
||||||
timeoutTimer = UserThread.runAfter(() -> {
|
timeoutTimer = UserThread.runAfter(() -> {
|
||||||
log.info("timeoutTimer called");
|
log.info("timeoutTimer called");
|
||||||
peerManager.shutDownConnection(nodeAddress);
|
peerManager.shutDownConnection(nodeAddress);
|
||||||
|
@ -94,11 +91,10 @@ public class RequestDataHandshake implements MessageListener {
|
||||||
10, TimeUnit.SECONDS);
|
10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
Message dataRequest;
|
Message dataRequest;
|
||||||
requestNonce = new Random().nextLong();
|
|
||||||
if (networkNode.getNodeAddress() == null)
|
if (networkNode.getNodeAddress() == null)
|
||||||
dataRequest = new PreliminaryDataRequest(requestNonce);
|
dataRequest = new PreliminaryDataRequest(nonce);
|
||||||
else
|
else
|
||||||
dataRequest = new UpdateDataRequest(networkNode.getNodeAddress(), requestNonce);
|
dataRequest = new UpdateDataRequest(networkNode.getNodeAddress(), nonce);
|
||||||
|
|
||||||
log.info("We send a {} to peer {}. ", dataRequest.getClass().getSimpleName(), nodeAddress);
|
log.info("We send a {} to peer {}. ", dataRequest.getClass().getSimpleName(), nodeAddress);
|
||||||
|
|
||||||
|
@ -106,14 +102,14 @@ public class RequestDataHandshake implements MessageListener {
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable Connection connection) {
|
public void onSuccess(@Nullable Connection connection) {
|
||||||
log.trace("Send DataRequest to " + nodeAddress + " succeeded.");
|
log.trace("Send " + dataRequest + " to " + nodeAddress + " succeeded.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
String errorMessage = "Sending " + dataRequest.getClass().getSimpleName() + " to " + nodeAddress +
|
String errorMessage = "Sending dataRequest to " + nodeAddress +
|
||||||
" failed. That is expected if the peer is offline. " +
|
" failed. That is expected if the peer is offline. dataRequest=" + dataRequest + "." +
|
||||||
"Exception:" + throwable.getMessage();
|
"Exception: " + throwable.getMessage();
|
||||||
log.info(errorMessage);
|
log.info(errorMessage);
|
||||||
|
|
||||||
peerManager.shutDownConnection(nodeAddress);
|
peerManager.shutDownConnection(nodeAddress);
|
||||||
|
@ -123,43 +119,54 @@ public class RequestDataHandshake implements MessageListener {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void onDataRequest(Message message, final Connection connection) {
|
||||||
|
Log.traceCall(message.toString() + " / connection=" + connection);
|
||||||
|
|
||||||
|
checkArgument(timeoutTimer == null, "requestData must not be called twice.");
|
||||||
|
timeoutTimer = UserThread.runAfter(() -> {
|
||||||
|
log.info("timeoutTimer called");
|
||||||
|
peerManager.shutDownConnection(connection);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault("A timeout occurred");
|
||||||
|
},
|
||||||
|
10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
DataRequest dataRequest = (DataRequest) message;
|
||||||
|
DataResponse dataResponse = new DataResponse(new HashSet<>(dataStorage.getMap().values()), dataRequest.getNonce());
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(connection, dataResponse);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Connection connection) {
|
||||||
|
log.trace("Send DataResponse to {} succeeded. dataResponse={}",
|
||||||
|
connection.getPeersNodeAddressOptional(), dataResponse);
|
||||||
|
shutDown();
|
||||||
|
listener.onComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
String errorMessage = "Sending dataRequest to " + connection +
|
||||||
|
" failed. That is expected if the peer is offline. dataRequest=" + dataRequest + "." +
|
||||||
|
"Exception: " + throwable.getMessage();
|
||||||
|
log.info(errorMessage);
|
||||||
|
|
||||||
|
peerManager.shutDownConnection(connection);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault(errorMessage);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// MessageListener implementation
|
// MessageListener implementation
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Message message, Connection connection) {
|
public void onMessage(Message message, Connection connection) {
|
||||||
if (message instanceof DataRequest) {
|
if (message instanceof DataResponse) {
|
||||||
Log.traceCall(message.toString());
|
Log.traceCall(message.toString() + " / connection=" + connection);
|
||||||
DataRequest dataRequest = (DataRequest) message;
|
|
||||||
DataResponse dataResponse = new DataResponse(new HashSet<>(dataStorage.getMap().values()), dataRequest.getNonce());
|
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(connection, dataResponse);
|
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Connection connection) {
|
|
||||||
log.trace("Send DataResponse to {} succeeded. dataResponse={}",
|
|
||||||
connection.getPeersNodeAddressOptional(), dataResponse);
|
|
||||||
shutDown();
|
|
||||||
listener.onComplete();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(@NotNull Throwable throwable) {
|
|
||||||
String errorMessage = "Send DataResponse to " + connection.getPeersNodeAddressOptional() + " failed. " +
|
|
||||||
"That is expected if the peer went offline. " +
|
|
||||||
"Exception:" + throwable.getMessage();
|
|
||||||
log.info(errorMessage);
|
|
||||||
|
|
||||||
peerManager.shutDownConnection(connection);
|
|
||||||
shutDown();
|
|
||||||
listener.onFault(errorMessage);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else if (message instanceof DataResponse) {
|
|
||||||
DataResponse dataResponse = (DataResponse) message;
|
DataResponse dataResponse = (DataResponse) message;
|
||||||
if (dataResponse.requestNonce == requestNonce) {
|
if (dataResponse.requestNonce == nonce) {
|
||||||
Log.traceCall(message.toString());
|
|
||||||
|
|
||||||
stopTimeoutTimer();
|
stopTimeoutTimer();
|
||||||
|
|
||||||
// connection.getPeersNodeAddressOptional() is not present at the first call
|
// connection.getPeersNodeAddressOptional() is not present at the first call
|
||||||
|
@ -168,12 +175,21 @@ public class RequestDataHandshake implements MessageListener {
|
||||||
((DataResponse) message).dataSet.stream()
|
((DataResponse) message).dataSet.stream()
|
||||||
.forEach(e -> dataStorage.add(e, peersNodeAddress));
|
.forEach(e -> dataStorage.add(e, peersNodeAddress));
|
||||||
});
|
});
|
||||||
|
|
||||||
shutDown();
|
shutDown();
|
||||||
listener.onComplete();
|
listener.onComplete();
|
||||||
|
} else {
|
||||||
|
log.debug("Nonce not matching. That happens if we get a response after a canceled handshake " +
|
||||||
|
"(timeout). We drop that message. nonce={} / requestNonce={}",
|
||||||
|
nonce, dataResponse.requestNonce);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Private
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private void stopTimeoutTimer() {
|
private void stopTimeoutTimer() {
|
||||||
if (timeoutTimer != null) {
|
if (timeoutTimer != null) {
|
||||||
timeoutTimer.cancel();
|
timeoutTimer.cancel();
|
||||||
|
|
|
@ -49,7 +49,6 @@ public class RequestDataManager implements MessageListener {
|
||||||
private final Listener listener;
|
private final Listener listener;
|
||||||
|
|
||||||
private final Map<NodeAddress, RequestDataHandshake> requestDataHandshakeMap = new HashMap<>();
|
private final Map<NodeAddress, RequestDataHandshake> requestDataHandshakeMap = new HashMap<>();
|
||||||
|
|
||||||
private Optional<NodeAddress> nodeOfPreliminaryDataRequest = Optional.empty();
|
private Optional<NodeAddress> nodeOfPreliminaryDataRequest = Optional.empty();
|
||||||
private Timer requestDataTimer;
|
private Timer requestDataTimer;
|
||||||
private boolean dataUpdateRequested;
|
private boolean dataUpdateRequested;
|
||||||
|
@ -64,20 +63,17 @@ public class RequestDataManager implements MessageListener {
|
||||||
this.networkNode = networkNode;
|
this.networkNode = networkNode;
|
||||||
this.dataStorage = dataStorage;
|
this.dataStorage = dataStorage;
|
||||||
this.peerManager = peerManager;
|
this.peerManager = peerManager;
|
||||||
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty.");
|
|
||||||
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
|
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
|
|
||||||
|
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty.");
|
||||||
networkNode.addMessageListener(this);
|
networkNode.addMessageListener(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutDown() {
|
public void shutDown() {
|
||||||
Log.traceCall();
|
Log.traceCall();
|
||||||
|
|
||||||
stopRequestDataTimer();
|
stopRequestDataTimer();
|
||||||
|
|
||||||
networkNode.removeMessageListener(this);
|
networkNode.removeMessageListener(this);
|
||||||
|
|
||||||
requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown);
|
requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,7 +84,10 @@ public class RequestDataManager implements MessageListener {
|
||||||
|
|
||||||
public void requestPreliminaryData() {
|
public void requestPreliminaryData() {
|
||||||
Log.traceCall();
|
Log.traceCall();
|
||||||
requestDataFromRandomPeer(new ArrayList<>(seedNodeAddresses));
|
ArrayList<NodeAddress> nodeAddresses = new ArrayList<>(seedNodeAddresses);
|
||||||
|
NodeAddress nextCandidate = nodeAddresses.get(0);
|
||||||
|
nodeAddresses.remove(nextCandidate);
|
||||||
|
requestData(nextCandidate, nodeAddresses);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void requestUpdatesData() {
|
public void requestUpdatesData() {
|
||||||
|
@ -113,8 +112,6 @@ public class RequestDataManager implements MessageListener {
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Message message, Connection connection) {
|
public void onMessage(Message message, Connection connection) {
|
||||||
if (message instanceof DataRequest) {
|
if (message instanceof DataRequest) {
|
||||||
Log.traceCall(message.toString());
|
|
||||||
log.trace("Received {} at {}", message.getClass().getSimpleName(), connection);
|
|
||||||
RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
|
RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
|
||||||
new RequestDataHandshake.Listener() {
|
new RequestDataHandshake.Listener() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -125,11 +122,11 @@ public class RequestDataManager implements MessageListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFault(String errorMessage) {
|
public void onFault(String errorMessage) {
|
||||||
log.info("RequestDataHandshake of inbound connection failed. Connection= {}",
|
log.trace("RequestDataHandshake of inbound connection failed. {} Connection= {}",
|
||||||
errorMessage, connection);
|
errorMessage, connection);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
requestDataHandshake.onMessage(message, connection);
|
requestDataHandshake.onDataRequest(message, connection);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,21 +135,15 @@ public class RequestDataManager implements MessageListener {
|
||||||
// Private
|
// Private
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private void requestDataFromRandomPeer(List<NodeAddress> nodeAddresses) {
|
|
||||||
Log.traceCall("remainingNodeAddresses=" + nodeAddresses);
|
|
||||||
NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size()));
|
|
||||||
nodeAddresses.remove(nextCandidate);
|
|
||||||
requestData(nextCandidate, nodeAddresses);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void requestData(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
|
private void requestData(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
|
||||||
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
|
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
|
||||||
|
|
||||||
if (!requestDataHandshakeMap.containsKey(nodeAddress)) {
|
if (!requestDataHandshakeMap.containsKey(nodeAddress)) {
|
||||||
RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
|
RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager,
|
||||||
new RequestDataHandshake.Listener() {
|
new RequestDataHandshake.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onComplete() {
|
public void onComplete() {
|
||||||
|
log.trace("RequestDataHandshake of outbound connection complete. nodeAddress= {}",
|
||||||
|
nodeAddress);
|
||||||
stopRequestDataTimer();
|
stopRequestDataTimer();
|
||||||
|
|
||||||
// need to remove before listeners are notified as they cause the update call
|
// need to remove before listeners are notified as they cause the update call
|
||||||
|
@ -175,10 +166,14 @@ public class RequestDataManager implements MessageListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFault(String errorMessage) {
|
public void onFault(String errorMessage) {
|
||||||
|
log.trace("RequestDataHandshake of outbound connection failed. {} nodeAddress= {}",
|
||||||
|
errorMessage, nodeAddress);
|
||||||
if (!remainingNodeAddresses.isEmpty()) {
|
if (!remainingNodeAddresses.isEmpty()) {
|
||||||
log.info("There are remaining nodes available for requesting data. " +
|
log.info("There are remaining nodes available for requesting data. " +
|
||||||
"We will try requestDataFromPeers again.");
|
"We will try requestDataFromPeers again.");
|
||||||
requestDataFromRandomPeer(remainingNodeAddresses);
|
NodeAddress nextCandidate = remainingNodeAddresses.get(0);
|
||||||
|
remainingNodeAddresses.remove(nextCandidate);
|
||||||
|
requestData(nextCandidate, remainingNodeAddresses);
|
||||||
} else {
|
} else {
|
||||||
log.info("There is no remaining node available for requesting data. " +
|
log.info("There is no remaining node available for requesting data. " +
|
||||||
"That is expected if no other node is online.\n" +
|
"That is expected if no other node is online.\n" +
|
||||||
|
@ -219,7 +214,7 @@ public class RequestDataManager implements MessageListener {
|
||||||
requestDataHandshakeMap.put(nodeAddress, requestDataHandshake);
|
requestDataHandshakeMap.put(nodeAddress, requestDataHandshake);
|
||||||
requestDataHandshake.requestData(nodeAddress);
|
requestDataHandshake.requestData(nodeAddress);
|
||||||
} else {
|
} else {
|
||||||
log.warn("We have started already a DataRequest request to peer. " + nodeAddress);
|
log.warn("We have started already a requestDataHandshake to peer. " + nodeAddress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,10 +12,12 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender
|
||||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||||
|
|
||||||
private final NodeAddress senderNodeAddress;
|
private final NodeAddress senderNodeAddress;
|
||||||
|
public long nonce;
|
||||||
public final HashSet<ReportedPeer> reportedPeers;
|
public final HashSet<ReportedPeer> reportedPeers;
|
||||||
|
|
||||||
public GetPeersRequest(NodeAddress senderNodeAddress, HashSet<ReportedPeer> reportedPeers) {
|
public GetPeersRequest(NodeAddress senderNodeAddress, long nonce, HashSet<ReportedPeer> reportedPeers) {
|
||||||
this.senderNodeAddress = senderNodeAddress;
|
this.senderNodeAddress = senderNodeAddress;
|
||||||
|
this.nonce = nonce;
|
||||||
this.reportedPeers = reportedPeers;
|
this.reportedPeers = reportedPeers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,6 +30,7 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "GetPeersRequest{" +
|
return "GetPeersRequest{" +
|
||||||
"senderNodeAddress=" + senderNodeAddress +
|
"senderNodeAddress=" + senderNodeAddress +
|
||||||
|
", requestNonce=" + nonce +
|
||||||
", reportedPeers=" + reportedPeers +
|
", reportedPeers=" + reportedPeers +
|
||||||
super.toString() + "} ";
|
super.toString() + "} ";
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,16 +9,19 @@ public final class GetPeersResponse extends PeerExchangeMessage {
|
||||||
// That object is sent over the wire, so we need to take care of version compatibility.
|
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||||
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
|
||||||
|
|
||||||
|
public final long requestNonce;
|
||||||
public final HashSet<ReportedPeer> reportedPeers;
|
public final HashSet<ReportedPeer> reportedPeers;
|
||||||
|
|
||||||
public GetPeersResponse(HashSet<ReportedPeer> reportedPeers) {
|
public GetPeersResponse(long requestNonce, HashSet<ReportedPeer> reportedPeers) {
|
||||||
|
this.requestNonce = requestNonce;
|
||||||
this.reportedPeers = reportedPeers;
|
this.reportedPeers = reportedPeers;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "GetPeersResponse{" +
|
return "GetPeersResponse{" +
|
||||||
"reportedPeers=" + reportedPeers +
|
"requestNonce=" + requestNonce +
|
||||||
|
", reportedPeers=" + reportedPeers +
|
||||||
super.toString() + "} ";
|
super.toString() + "} ";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue