mirror of
https://github.com/haveno-dex/haveno.git
synced 2025-06-07 06:32:47 -04:00
Add maintenance package, use connected peers for reported peers (WIP)
This commit is contained in:
parent
12b7555c66
commit
04b32a35a2
9 changed files with 609 additions and 84 deletions
|
@ -76,7 +76,7 @@ import static io.bitsquare.app.BitsquareEnvironment.APP_NAME_KEY;
|
||||||
public class BitsquareApp extends Application {
|
public class BitsquareApp extends Application {
|
||||||
private static final Logger log = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(BitsquareApp.class);
|
private static final Logger log = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(BitsquareApp.class);
|
||||||
|
|
||||||
public static final boolean DEV_MODE = false;
|
public static final boolean DEV_MODE = true;
|
||||||
public static final boolean IS_RELEASE_VERSION = !DEV_MODE && true;
|
public static final boolean IS_RELEASE_VERSION = !DEV_MODE && true;
|
||||||
|
|
||||||
private static Environment env;
|
private static Environment env;
|
||||||
|
|
|
@ -0,0 +1,220 @@
|
||||||
|
package io.bitsquare.p2p.peers;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
import io.bitsquare.app.Log;
|
||||||
|
import io.bitsquare.common.UserThread;
|
||||||
|
import io.bitsquare.p2p.Message;
|
||||||
|
import io.bitsquare.p2p.NodeAddress;
|
||||||
|
import io.bitsquare.p2p.network.CloseConnectionReason;
|
||||||
|
import io.bitsquare.p2p.network.Connection;
|
||||||
|
import io.bitsquare.p2p.network.MessageListener;
|
||||||
|
import io.bitsquare.p2p.network.NetworkNode;
|
||||||
|
import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
|
||||||
|
import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
public class MaintenanceHandshake implements MessageListener {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MaintenanceHandshake.class);
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Listener
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public interface Listener {
|
||||||
|
void onComplete();
|
||||||
|
|
||||||
|
void onFault(String errorMessage, @Nullable Connection connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Class fields
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
private final NetworkNode networkNode;
|
||||||
|
private final PeerManager peerManager;
|
||||||
|
private final Listener listener;
|
||||||
|
private final long nonce = new Random().nextLong();
|
||||||
|
private Timer timeoutTimer;
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Constructor
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public MaintenanceHandshake(NetworkNode networkNode, PeerManager peerManager, Listener listener) {
|
||||||
|
this.networkNode = networkNode;
|
||||||
|
this.peerManager = peerManager;
|
||||||
|
this.listener = listener;
|
||||||
|
|
||||||
|
networkNode.addMessageListener(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutDown() {
|
||||||
|
networkNode.removeMessageListener(this);
|
||||||
|
stopTimeoutTimer();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// API
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public void requestReportedPeers(NodeAddress nodeAddress) {
|
||||||
|
Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this);
|
||||||
|
checkNotNull(networkNode.getNodeAddress(), "PeerExchangeHandshake.requestReportedPeers: My node address must " +
|
||||||
|
"not be null at requestReportedPeers");
|
||||||
|
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, getConnectedPeers(nodeAddress));
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getPeersRequest);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Connection connection) {
|
||||||
|
log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
String errorMessage = "Sending getPeersRequest to " + nodeAddress +
|
||||||
|
" failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest +
|
||||||
|
".\n\tException=" + throwable.getMessage();
|
||||||
|
log.info(errorMessage);
|
||||||
|
|
||||||
|
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault(errorMessage, null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice.");
|
||||||
|
timeoutTimer = UserThread.runAfter(() -> {
|
||||||
|
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
|
||||||
|
log.info(errorMessage + " / PeerExchangeHandshake=" +
|
||||||
|
MaintenanceHandshake.this);
|
||||||
|
|
||||||
|
log.info("timeoutTimer called on " + this);
|
||||||
|
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault(errorMessage, null);
|
||||||
|
},
|
||||||
|
20, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onGetPeersRequest(GetPeersRequest getPeersRequest, final Connection connection) {
|
||||||
|
Log.traceCall("getPeersRequest=" + getPeersRequest + "\n\tconnection=" + connection + "\n\tthis=" + this);
|
||||||
|
|
||||||
|
HashSet<ReportedPeer> reportedPeers = getPeersRequest.reportedPeers;
|
||||||
|
|
||||||
|
/* StringBuilder result = new StringBuilder("Received peers:");
|
||||||
|
reportedPeers.stream().forEach(e -> result.append("\n\t").append(e));
|
||||||
|
log.trace(result.toString());*/
|
||||||
|
log.trace("reportedPeers.size=" + reportedPeers.size());
|
||||||
|
|
||||||
|
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
|
||||||
|
"The peers address must have been already set at the moment");
|
||||||
|
GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.nonce,
|
||||||
|
getConnectedPeers(connection.getPeersNodeAddressOptional().get()));
|
||||||
|
SettableFuture<Connection> future = networkNode.sendMessage(connection,
|
||||||
|
getPeersResponse);
|
||||||
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Connection connection) {
|
||||||
|
log.trace("GetPeersResponse sent successfully");
|
||||||
|
shutDown();
|
||||||
|
listener.onComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NotNull Throwable throwable) {
|
||||||
|
String errorMessage = "Sending getPeersRequest to " + connection +
|
||||||
|
" failed. That is expected if the peer is offline. getPeersRequest=" + getPeersRequest + "." +
|
||||||
|
"Exception: " + throwable.getMessage();
|
||||||
|
log.info(errorMessage);
|
||||||
|
|
||||||
|
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault(errorMessage, connection);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
checkArgument(timeoutTimer == null, "onGetPeersRequest must not be called twice.");
|
||||||
|
timeoutTimer = UserThread.runAfter(() -> {
|
||||||
|
String errorMessage = "A timeout occurred at sending getPeersResponse:" + getPeersResponse + " on connection:" + connection;
|
||||||
|
log.info(errorMessage + " / PeerExchangeHandshake=" +
|
||||||
|
MaintenanceHandshake.this);
|
||||||
|
|
||||||
|
log.info("timeoutTimer called. this=" + this);
|
||||||
|
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT);
|
||||||
|
shutDown();
|
||||||
|
listener.onFault(errorMessage, connection);
|
||||||
|
},
|
||||||
|
20, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
peerManager.addToReportedPeers(reportedPeers, connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// MessageListener implementation
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message, Connection connection) {
|
||||||
|
if (message instanceof GetPeersResponse) {
|
||||||
|
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
|
||||||
|
Log.traceCall("this=" + this);
|
||||||
|
GetPeersResponse getPeersResponse = (GetPeersResponse) message;
|
||||||
|
if (getPeersResponse.requestNonce == nonce) {
|
||||||
|
stopTimeoutTimer();
|
||||||
|
|
||||||
|
HashSet<ReportedPeer> reportedPeers = getPeersResponse.reportedPeers;
|
||||||
|
StringBuilder result = new StringBuilder("Received peers:");
|
||||||
|
reportedPeers.stream().forEach(e -> result.append("\n\t").append(e));
|
||||||
|
log.trace(result.toString());
|
||||||
|
peerManager.addToReportedPeers(reportedPeers, connection);
|
||||||
|
|
||||||
|
shutDown();
|
||||||
|
listener.onComplete();
|
||||||
|
} else {
|
||||||
|
log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled handshake " +
|
||||||
|
"(timeout causes connection close but peer might have sent a msg before connection " +
|
||||||
|
"was closed).\n\tWe drop that message. nonce={} / requestNonce={}",
|
||||||
|
nonce, getPeersResponse.requestNonce);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Private
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
private HashSet<ReportedPeer> getConnectedPeers(NodeAddress receiverNodeAddress) {
|
||||||
|
return new HashSet<>(peerManager.getConnectedPeers().stream()
|
||||||
|
.filter(e -> !peerManager.isSeedNode(e) &&
|
||||||
|
!peerManager.isSelf(e) &&
|
||||||
|
!e.nodeAddress.equals(receiverNodeAddress)
|
||||||
|
)
|
||||||
|
.collect(Collectors.toSet()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopTimeoutTimer() {
|
||||||
|
if (timeoutTimer != null) {
|
||||||
|
timeoutTimer.cancel();
|
||||||
|
timeoutTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,286 @@
|
||||||
|
package io.bitsquare.p2p.peers;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import io.bitsquare.app.Log;
|
||||||
|
import io.bitsquare.common.UserThread;
|
||||||
|
import io.bitsquare.common.util.Utilities;
|
||||||
|
import io.bitsquare.p2p.Message;
|
||||||
|
import io.bitsquare.p2p.NodeAddress;
|
||||||
|
import io.bitsquare.p2p.network.*;
|
||||||
|
import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
public class MaintenanceManager implements MessageListener, ConnectionListener {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MaintenanceManager.class);
|
||||||
|
|
||||||
|
private final NetworkNode networkNode;
|
||||||
|
private final PeerManager peerManager;
|
||||||
|
private final Set<NodeAddress> seedNodeAddresses;
|
||||||
|
private final ScheduledThreadPoolExecutor executor;
|
||||||
|
private final Map<NodeAddress, PeerExchangeHandshake> peerExchangeHandshakeMap = new HashMap<>();
|
||||||
|
private Timer connectToMorePeersTimer, maintainConnectionsTimer;
|
||||||
|
private boolean shutDownInProgress;
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Constructor
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public MaintenanceManager(NetworkNode networkNode, PeerManager peerManager, Set<NodeAddress> seedNodeAddresses) {
|
||||||
|
this.networkNode = networkNode;
|
||||||
|
this.peerManager = peerManager;
|
||||||
|
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty");
|
||||||
|
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
|
||||||
|
|
||||||
|
executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5);
|
||||||
|
networkNode.addMessageListener(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutDown() {
|
||||||
|
Log.traceCall();
|
||||||
|
shutDownInProgress = true;
|
||||||
|
|
||||||
|
networkNode.removeMessageListener(this);
|
||||||
|
stopConnectToMorePeersTimer();
|
||||||
|
stopMaintainConnectionsTimer();
|
||||||
|
peerExchangeHandshakeMap.values().stream().forEach(PeerExchangeHandshake::closeHandshake);
|
||||||
|
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// API
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
public void requestReportedPeersFromSeedNodes(NodeAddress nodeAddress) {
|
||||||
|
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
|
||||||
|
ArrayList<NodeAddress> remainingNodeAddresses = new ArrayList<>(seedNodeAddresses);
|
||||||
|
remainingNodeAddresses.remove(nodeAddress);
|
||||||
|
Collections.shuffle(remainingNodeAddresses);
|
||||||
|
requestReportedPeers(nodeAddress, remainingNodeAddresses);
|
||||||
|
|
||||||
|
int delay = new Random().nextInt(60) + 60 * 3; // 3-4 min
|
||||||
|
executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections),
|
||||||
|
delay, delay, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// ConnectionListener implementation
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConnection(Connection connection) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
|
||||||
|
// We use a timer to throttle if we get a series of disconnects
|
||||||
|
// The more connections we have the more relaxed we are with a checkConnections
|
||||||
|
stopMaintainConnectionsTimer();
|
||||||
|
int size = networkNode.getAllConnections().size();
|
||||||
|
int delay = 10 + 2 * size * size; // 12 sec - 210 sec (3.5 min)
|
||||||
|
maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections,
|
||||||
|
delay, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable throwable) {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// MessageListener implementation
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message, Connection connection) {
|
||||||
|
if (message instanceof GetPeersRequest) {
|
||||||
|
Log.traceCall(message.toString() + "\n\tconnection=" + connection);
|
||||||
|
PeerExchangeHandshake peerExchangeHandshake = new PeerExchangeHandshake(networkNode,
|
||||||
|
peerManager,
|
||||||
|
new PeerExchangeHandshake.Listener() {
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
log.trace("PeerExchangeHandshake of inbound connection complete.\n\tConnection={}", connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFault(String errorMessage, @Nullable Connection connection) {
|
||||||
|
log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" +
|
||||||
|
"connection={}", errorMessage, connection);
|
||||||
|
peerManager.handleConnectionFault(connection);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
peerExchangeHandshake.onGetPeersRequest((GetPeersRequest) message, connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Private
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
private void requestReportedPeers(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
|
||||||
|
Log.traceCall("nodeAddress=" + nodeAddress);
|
||||||
|
if (!peerExchangeHandshakeMap.containsKey(nodeAddress)) {
|
||||||
|
PeerExchangeHandshake peerExchangeHandshake = new PeerExchangeHandshake(networkNode,
|
||||||
|
peerManager,
|
||||||
|
new PeerExchangeHandshake.Listener() {
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
log.trace("PeerExchangeHandshake of outbound connection complete. nodeAddress={}", nodeAddress);
|
||||||
|
peerExchangeHandshakeMap.remove(nodeAddress);
|
||||||
|
connectToMorePeers();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFault(String errorMessage, @Nullable Connection connection) {
|
||||||
|
log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" +
|
||||||
|
"nodeAddress={}", errorMessage, nodeAddress);
|
||||||
|
|
||||||
|
peerExchangeHandshakeMap.remove(nodeAddress);
|
||||||
|
peerManager.handleConnectionFault(nodeAddress, connection);
|
||||||
|
if (!shutDownInProgress) {
|
||||||
|
if (!remainingNodeAddresses.isEmpty()) {
|
||||||
|
log.info("There are remaining nodes available for requesting peers. " +
|
||||||
|
"We will try getReportedPeers again.");
|
||||||
|
requestReportedPeersFromRandomPeer(remainingNodeAddresses);
|
||||||
|
} else {
|
||||||
|
log.info("There is no remaining node available for requesting peers. " +
|
||||||
|
"That is expected if no other node is online.\n\t" +
|
||||||
|
"We will try again after a random pause.");
|
||||||
|
if (connectToMorePeersTimer == null)
|
||||||
|
connectToMorePeersTimer = UserThread.runAfterRandomDelay(
|
||||||
|
MaintenanceManager.this::connectToMorePeers, 20, 30);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
peerExchangeHandshakeMap.put(nodeAddress, peerExchangeHandshake);
|
||||||
|
peerExchangeHandshake.requestConnectedPeers(nodeAddress);
|
||||||
|
} else {
|
||||||
|
//TODO check when that happens
|
||||||
|
log.warn("We have started already a peerExchangeHandshake. " +
|
||||||
|
"We ignore that call. " +
|
||||||
|
"nodeAddress=" + nodeAddress);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we check if we have at least one seed node connected
|
||||||
|
private void maintainConnections() {
|
||||||
|
Log.traceCall();
|
||||||
|
|
||||||
|
stopMaintainConnectionsTimer();
|
||||||
|
|
||||||
|
// we want at least 1 seed node connected
|
||||||
|
Set<Connection> confirmedConnections = networkNode.getConfirmedConnections();
|
||||||
|
long numberOfConnectedSeedNodes = confirmedConnections.stream()
|
||||||
|
.filter(peerManager::isSeedNode)
|
||||||
|
.count();
|
||||||
|
if (numberOfConnectedSeedNodes == 0) {
|
||||||
|
ArrayList<NodeAddress> nodeAddresses = new ArrayList<>(seedNodeAddresses);
|
||||||
|
Collections.shuffle(nodeAddresses);
|
||||||
|
requestReportedPeersFromRandomPeer(nodeAddresses);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// We try to get sufficient connections by connecting to reported and persisted peers
|
||||||
|
if (numberOfConnectedSeedNodes == 0) {
|
||||||
|
// If we requested a seed node we delay a bit to not have too many requests simultaneously
|
||||||
|
if (connectToMorePeersTimer == null)
|
||||||
|
connectToMorePeersTimer = UserThread.runAfter(this::connectToMorePeers, 10);
|
||||||
|
} else {
|
||||||
|
connectToMorePeers();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use all outbound connections older than 10 min. for updating reported peers and make sure we keep the connection alive
|
||||||
|
// Inbound connections should be maintained be the requesting peer
|
||||||
|
confirmedConnections.stream()
|
||||||
|
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
|
||||||
|
c instanceof OutboundConnection &&
|
||||||
|
new Date().getTime() - c.getLastActivityDate().getTime() > TimeUnit.MINUTES.toMillis(10))
|
||||||
|
.forEach(c -> {
|
||||||
|
log.trace("Call requestReportedPeers on a confirmedConnection by the maintainConnections call");
|
||||||
|
requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void connectToMorePeers() {
|
||||||
|
Log.traceCall();
|
||||||
|
|
||||||
|
stopConnectToMorePeersTimer();
|
||||||
|
|
||||||
|
if (!peerManager.hasSufficientConnections()) {
|
||||||
|
// We create a new list of not connected candidates
|
||||||
|
// 1. reported sorted by most recent lastActivityDate
|
||||||
|
// 2. persisted sorted by most recent lastActivityDate
|
||||||
|
// 3. seenNodes
|
||||||
|
List<NodeAddress> list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>()));
|
||||||
|
list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
|
||||||
|
ArrayList<NodeAddress> seedNodeAddresses = new ArrayList<>(this.seedNodeAddresses);
|
||||||
|
Collections.shuffle(seedNodeAddresses);
|
||||||
|
list.addAll(seedNodeAddresses.stream()
|
||||||
|
.filter(e -> !list.contains(e) &&
|
||||||
|
!peerManager.isSelf(e) &&
|
||||||
|
!peerManager.isConfirmed(e))
|
||||||
|
.collect(Collectors.toSet()));
|
||||||
|
log.info("Sorted and filtered list: list.size()=" + list.size());
|
||||||
|
log.trace("Sorted and filtered list: list=" + list);
|
||||||
|
if (!list.isEmpty()) {
|
||||||
|
NodeAddress nextCandidate = list.get(0);
|
||||||
|
list.remove(nextCandidate);
|
||||||
|
requestReportedPeers(nextCandidate, list);
|
||||||
|
} else {
|
||||||
|
log.info("No more peers are available for requestReportedPeers.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.info("We have already sufficient connections.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sorted by most recent lastActivityDate
|
||||||
|
private List<NodeAddress> getFilteredAndSortedList(Set<ReportedPeer> set, List<NodeAddress> list) {
|
||||||
|
return set.stream()
|
||||||
|
.filter(e -> !list.contains(e.nodeAddress) &&
|
||||||
|
!peerManager.isSeedNode(e) &&
|
||||||
|
!peerManager.isSelf(e) &&
|
||||||
|
!peerManager.isConfirmed(e))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.stream()
|
||||||
|
.filter(e -> e.lastActivityDate != null)
|
||||||
|
.sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate))
|
||||||
|
.map(e -> e.nodeAddress)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void requestReportedPeersFromRandomPeer(List<NodeAddress> remainingNodeAddresses) {
|
||||||
|
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
|
||||||
|
remainingNodeAddresses.remove(nextCandidate);
|
||||||
|
requestReportedPeers(nextCandidate, remainingNodeAddresses);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopConnectToMorePeersTimer() {
|
||||||
|
if (connectToMorePeersTimer != null) {
|
||||||
|
connectToMorePeersTimer.cancel();
|
||||||
|
connectToMorePeersTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopMaintainConnectionsTimer() {
|
||||||
|
if (maintainConnectionsTimer != null) {
|
||||||
|
maintainConnectionsTimer.cancel();
|
||||||
|
maintainConnectionsTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -64,7 +64,7 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
networkNode.addMessageListener(this);
|
networkNode.addMessageListener(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutDown() {
|
public void closeHandshake() {
|
||||||
networkNode.removeMessageListener(this);
|
networkNode.removeMessageListener(this);
|
||||||
stopTimeoutTimer();
|
stopTimeoutTimer();
|
||||||
}
|
}
|
||||||
|
@ -74,11 +74,11 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
// API
|
// API
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
public void requestReportedPeers(NodeAddress nodeAddress) {
|
public void requestConnectedPeers(NodeAddress nodeAddress) {
|
||||||
Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this);
|
Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this);
|
||||||
checkNotNull(networkNode.getNodeAddress(), "PeerExchangeHandshake.requestReportedPeers: My node address must " +
|
checkNotNull(networkNode.getNodeAddress(), "PeerExchangeHandshake.requestReportedPeers: My node address must " +
|
||||||
"not be null at requestReportedPeers");
|
"not be null at requestReportedPeers");
|
||||||
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, getReportedPeers(nodeAddress));
|
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, getConnectedPeers(nodeAddress));
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getPeersRequest);
|
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getPeersRequest);
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -94,7 +94,7 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
log.info(errorMessage);
|
log.info(errorMessage);
|
||||||
|
|
||||||
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
|
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
|
||||||
shutDown();
|
closeHandshake();
|
||||||
listener.onFault(errorMessage, null);
|
listener.onFault(errorMessage, null);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -107,7 +107,7 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
|
|
||||||
log.info("timeoutTimer called on " + this);
|
log.info("timeoutTimer called on " + this);
|
||||||
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
|
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
|
||||||
shutDown();
|
closeHandshake();
|
||||||
listener.onFault(errorMessage, null);
|
listener.onFault(errorMessage, null);
|
||||||
},
|
},
|
||||||
20, TimeUnit.SECONDS);
|
20, TimeUnit.SECONDS);
|
||||||
|
@ -126,14 +126,14 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
|
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
|
||||||
"The peers address must have been already set at the moment");
|
"The peers address must have been already set at the moment");
|
||||||
GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.nonce,
|
GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.nonce,
|
||||||
getReportedPeers(connection.getPeersNodeAddressOptional().get()));
|
getConnectedPeers(connection.getPeersNodeAddressOptional().get()));
|
||||||
SettableFuture<Connection> future = networkNode.sendMessage(connection,
|
SettableFuture<Connection> future = networkNode.sendMessage(connection,
|
||||||
getPeersResponse);
|
getPeersResponse);
|
||||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Connection connection) {
|
public void onSuccess(Connection connection) {
|
||||||
log.trace("GetPeersResponse sent successfully");
|
log.trace("GetPeersResponse sent successfully");
|
||||||
shutDown();
|
closeHandshake();
|
||||||
listener.onComplete();
|
listener.onComplete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,7 +145,7 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
log.info(errorMessage);
|
log.info(errorMessage);
|
||||||
|
|
||||||
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
|
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
|
||||||
shutDown();
|
closeHandshake();
|
||||||
listener.onFault(errorMessage, connection);
|
listener.onFault(errorMessage, connection);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -158,7 +158,7 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
|
|
||||||
log.info("timeoutTimer called. this=" + this);
|
log.info("timeoutTimer called. this=" + this);
|
||||||
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT);
|
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT);
|
||||||
shutDown();
|
closeHandshake();
|
||||||
listener.onFault(errorMessage, connection);
|
listener.onFault(errorMessage, connection);
|
||||||
},
|
},
|
||||||
20, TimeUnit.SECONDS);
|
20, TimeUnit.SECONDS);
|
||||||
|
@ -186,7 +186,7 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
log.trace(result.toString());
|
log.trace(result.toString());
|
||||||
peerManager.addToReportedPeers(reportedPeers, connection);
|
peerManager.addToReportedPeers(reportedPeers, connection);
|
||||||
|
|
||||||
shutDown();
|
closeHandshake();
|
||||||
listener.onComplete();
|
listener.onComplete();
|
||||||
} else {
|
} else {
|
||||||
log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled handshake " +
|
log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled handshake " +
|
||||||
|
@ -202,10 +202,9 @@ public class PeerExchangeHandshake implements MessageListener {
|
||||||
// Private
|
// Private
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
private HashSet<ReportedPeer> getReportedPeers(NodeAddress receiverNodeAddress) {
|
private HashSet<ReportedPeer> getConnectedPeers(NodeAddress receiverNodeAddress) {
|
||||||
return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream()
|
return new HashSet<>(peerManager.getConnectedPeers().stream()
|
||||||
.filter(e -> !peerManager.isSeedNode(e) &&
|
.filter(e -> !peerManager.isSeedNode(e) &&
|
||||||
!peerManager.isSelf(e) &&
|
|
||||||
!e.nodeAddress.equals(receiverNodeAddress)
|
!e.nodeAddress.equals(receiverNodeAddress)
|
||||||
)
|
)
|
||||||
.collect(Collectors.toSet()));
|
.collect(Collectors.toSet()));
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
package io.bitsquare.p2p.peers;
|
package io.bitsquare.p2p.peers;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
import io.bitsquare.app.Log;
|
import io.bitsquare.app.Log;
|
||||||
import io.bitsquare.common.UserThread;
|
import io.bitsquare.common.UserThread;
|
||||||
import io.bitsquare.common.util.Utilities;
|
|
||||||
import io.bitsquare.p2p.Message;
|
import io.bitsquare.p2p.Message;
|
||||||
import io.bitsquare.p2p.NodeAddress;
|
import io.bitsquare.p2p.NodeAddress;
|
||||||
import io.bitsquare.p2p.network.*;
|
import io.bitsquare.p2p.network.*;
|
||||||
|
@ -13,8 +11,6 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
@ -26,9 +22,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
private final NetworkNode networkNode;
|
private final NetworkNode networkNode;
|
||||||
private final PeerManager peerManager;
|
private final PeerManager peerManager;
|
||||||
private final Set<NodeAddress> seedNodeAddresses;
|
private final Set<NodeAddress> seedNodeAddresses;
|
||||||
private final ScheduledThreadPoolExecutor executor;
|
|
||||||
private final Map<NodeAddress, PeerExchangeHandshake> peerExchangeHandshakeMap = new HashMap<>();
|
private final Map<NodeAddress, PeerExchangeHandshake> peerExchangeHandshakeMap = new HashMap<>();
|
||||||
private Timer connectToMorePeersTimer, maintainConnectionsTimer;
|
private Timer connectToMorePeersTimer;
|
||||||
private boolean shutDownInProgress;
|
private boolean shutDownInProgress;
|
||||||
|
|
||||||
|
|
||||||
|
@ -42,7 +37,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty");
|
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty");
|
||||||
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
|
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
|
||||||
|
|
||||||
executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5);
|
|
||||||
networkNode.addMessageListener(this);
|
networkNode.addMessageListener(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,9 +46,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
|
|
||||||
networkNode.removeMessageListener(this);
|
networkNode.removeMessageListener(this);
|
||||||
stopConnectToMorePeersTimer();
|
stopConnectToMorePeersTimer();
|
||||||
stopMaintainConnectionsTimer();
|
peerExchangeHandshakeMap.values().stream().forEach(PeerExchangeHandshake::closeHandshake);
|
||||||
peerExchangeHandshakeMap.values().stream().forEach(PeerExchangeHandshake::shutDown);
|
|
||||||
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -68,10 +60,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
remainingNodeAddresses.remove(nodeAddress);
|
remainingNodeAddresses.remove(nodeAddress);
|
||||||
Collections.shuffle(remainingNodeAddresses);
|
Collections.shuffle(remainingNodeAddresses);
|
||||||
requestReportedPeers(nodeAddress, remainingNodeAddresses);
|
requestReportedPeers(nodeAddress, remainingNodeAddresses);
|
||||||
|
|
||||||
int delay = new Random().nextInt(60) + 60 * 3; // 3-4 min
|
|
||||||
executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections),
|
|
||||||
delay, delay, TimeUnit.SECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -85,13 +73,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
|
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
|
||||||
// We use a timer to throttle if we get a series of disconnects
|
|
||||||
// The more connections we have the more relaxed we are with a checkConnections
|
|
||||||
stopMaintainConnectionsTimer();
|
|
||||||
int size = networkNode.getAllConnections().size();
|
|
||||||
int delay = 10 + 2 * size * size; // 12 sec - 210 sec (3.5 min)
|
|
||||||
maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections,
|
|
||||||
delay, TimeUnit.SECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -168,7 +149,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
peerExchangeHandshakeMap.put(nodeAddress, peerExchangeHandshake);
|
peerExchangeHandshakeMap.put(nodeAddress, peerExchangeHandshake);
|
||||||
peerExchangeHandshake.requestReportedPeers(nodeAddress);
|
peerExchangeHandshake.requestConnectedPeers(nodeAddress);
|
||||||
} else {
|
} else {
|
||||||
//TODO check when that happens
|
//TODO check when that happens
|
||||||
log.warn("We have started already a peerExchangeHandshake. " +
|
log.warn("We have started already a peerExchangeHandshake. " +
|
||||||
|
@ -177,44 +158,6 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// we check if we have at least one seed node connected
|
|
||||||
private void maintainConnections() {
|
|
||||||
Log.traceCall();
|
|
||||||
|
|
||||||
stopMaintainConnectionsTimer();
|
|
||||||
|
|
||||||
// we want at least 1 seed node connected
|
|
||||||
Set<Connection> confirmedConnections = networkNode.getConfirmedConnections();
|
|
||||||
long numberOfConnectedSeedNodes = confirmedConnections.stream()
|
|
||||||
.filter(peerManager::isSeedNode)
|
|
||||||
.count();
|
|
||||||
if (numberOfConnectedSeedNodes == 0) {
|
|
||||||
ArrayList<NodeAddress> nodeAddresses = new ArrayList<>(seedNodeAddresses);
|
|
||||||
Collections.shuffle(nodeAddresses);
|
|
||||||
requestReportedPeersFromRandomPeer(nodeAddresses);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// We try to get sufficient connections by connecting to reported and persisted peers
|
|
||||||
if (numberOfConnectedSeedNodes == 0) {
|
|
||||||
// If we requested a seed node we delay a bit to not have too many requests simultaneously
|
|
||||||
if (connectToMorePeersTimer == null)
|
|
||||||
connectToMorePeersTimer = UserThread.runAfter(this::connectToMorePeers, 10);
|
|
||||||
} else {
|
|
||||||
connectToMorePeers();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use all outbound connections older than 10 min. for updating reported peers and make sure we keep the connection alive
|
|
||||||
// Inbound connections should be maintained be the requesting peer
|
|
||||||
confirmedConnections.stream()
|
|
||||||
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
|
|
||||||
c instanceof OutboundConnection &&
|
|
||||||
new Date().getTime() - c.getLastActivityDate().getTime() > TimeUnit.MINUTES.toMillis(10))
|
|
||||||
.forEach(c -> {
|
|
||||||
log.trace("Call requestReportedPeers on a confirmedConnection by the maintainConnections call");
|
|
||||||
requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void connectToMorePeers() {
|
private void connectToMorePeers() {
|
||||||
Log.traceCall();
|
Log.traceCall();
|
||||||
|
@ -276,11 +219,4 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
|
||||||
connectToMorePeersTimer = null;
|
connectToMorePeersTimer = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void stopMaintainConnectionsTimer() {
|
|
||||||
if (maintainConnectionsTimer != null) {
|
|
||||||
maintainConnectionsTimer.cancel();
|
|
||||||
maintainConnectionsTimer = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -425,11 +425,11 @@ public class PeerManager implements ConnectionListener, MessageListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<ReportedPeer> getConnectedAndReportedPeers() {
|
/* public Set<ReportedPeer> getConnectedAndReportedPeers() {
|
||||||
Set<ReportedPeer> result = new HashSet<>(reportedPeers);
|
Set<ReportedPeer> result = new HashSet<>(reportedPeers);
|
||||||
result.addAll(getConnectedPeers());
|
result.addAll(getConnectedPeers());
|
||||||
return result;
|
return result;
|
||||||
}
|
}*/
|
||||||
|
|
||||||
public boolean isSeedNode(ReportedPeer reportedPeer) {
|
public boolean isSeedNode(ReportedPeer reportedPeer) {
|
||||||
return seedNodeAddresses.contains(reportedPeer.nodeAddress);
|
return seedNodeAddresses.contains(reportedPeer.nodeAddress);
|
||||||
|
@ -501,7 +501,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
|
||||||
return list.remove(new Random().nextInt(list.size()));
|
return list.remove(new Random().nextInt(list.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<ReportedPeer> getConnectedPeers() {
|
public Set<ReportedPeer> getConnectedPeers() {
|
||||||
// networkNode.getConfirmedConnections includes:
|
// networkNode.getConfirmedConnections includes:
|
||||||
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
|
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
|
||||||
return networkNode.getConfirmedConnections().stream()
|
return networkNode.getConfirmedConnections().stream()
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
package io.bitsquare.p2p.peers.messages.maintenance;
|
||||||
|
|
||||||
|
import io.bitsquare.app.Version;
|
||||||
|
import io.bitsquare.p2p.Message;
|
||||||
|
|
||||||
|
public abstract class MaintenanceMessage implements Message {
|
||||||
|
private final int messageVersion = Version.getP2PMessageVersion();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMessageVersion() {
|
||||||
|
return messageVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "MaintenanceMessage{" +
|
||||||
|
"messageVersion=" + messageVersion +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package io.bitsquare.p2p.peers.messages.maintenance;
|
||||||
|
|
||||||
|
import io.bitsquare.app.Version;
|
||||||
|
import io.bitsquare.p2p.NodeAddress;
|
||||||
|
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
|
||||||
|
import io.bitsquare.p2p.peers.ReportedPeer;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
|
||||||
|
public final class PingRequest extends MaintenanceMessage implements SendersNodeAddressMessage {
|
||||||
|
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||||
|
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
|
||||||
|
|
||||||
|
private final NodeAddress senderNodeAddress;
|
||||||
|
public long nonce;
|
||||||
|
public final HashSet<ReportedPeer> reportedPeers;
|
||||||
|
|
||||||
|
public PingRequest(NodeAddress senderNodeAddress, long nonce, HashSet<ReportedPeer> reportedPeers) {
|
||||||
|
this.senderNodeAddress = senderNodeAddress;
|
||||||
|
this.nonce = nonce;
|
||||||
|
this.reportedPeers = reportedPeers;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeAddress getSenderNodeAddress() {
|
||||||
|
return senderNodeAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "GetPeersRequest{" +
|
||||||
|
"senderNodeAddress=" + senderNodeAddress +
|
||||||
|
", nonce=" + nonce +
|
||||||
|
", reportedPeers.size()=" + reportedPeers.size() +
|
||||||
|
"} " + super.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package io.bitsquare.p2p.peers.messages.maintenance;
|
||||||
|
|
||||||
|
import io.bitsquare.app.Version;
|
||||||
|
import io.bitsquare.p2p.peers.ReportedPeer;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
|
||||||
|
public final class PongResponse extends MaintenanceMessage {
|
||||||
|
// That object is sent over the wire, so we need to take care of version compatibility.
|
||||||
|
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
|
||||||
|
|
||||||
|
public final long requestNonce;
|
||||||
|
public final HashSet<ReportedPeer> reportedPeers;
|
||||||
|
|
||||||
|
public PongResponse(long requestNonce, HashSet<ReportedPeer> reportedPeers) {
|
||||||
|
this.requestNonce = requestNonce;
|
||||||
|
this.reportedPeers = reportedPeers;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "GetPeersResponse{" +
|
||||||
|
"requestNonce=" + requestNonce +
|
||||||
|
", reportedPeers.size()=" + reportedPeers.size() +
|
||||||
|
"} " + super.toString();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue