Fix peer exchange

This commit is contained in:
Manfred Karrer 2016-01-27 03:09:05 +01:00
parent 9339ab57cc
commit 579c3797bc

View file

@ -35,7 +35,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private final PeerManager peerManager; private final PeerManager peerManager;
private final Set<NodeAddress> seedNodeAddresses; private final Set<NodeAddress> seedNodeAddresses;
private final ScheduledThreadPoolExecutor executor; private final ScheduledThreadPoolExecutor executor;
private Timer requestReportedPeersAfterDelayTimer, timeoutTimer, checkForSeedNodesTimer; private Timer continueWithMorePeersTimer, timeoutTimer, checkConnectionsTimer;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -55,8 +55,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
Log.traceCall(); Log.traceCall();
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
stopRequestReportedPeersAfterDelayTimer(); stopContinueWithMorePeersTimer();
stopCheckForSeedNodesTimer(); stopCheckConnectionsTimer();
stopTimeoutTimer(); stopTimeoutTimer();
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS); MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
} }
@ -70,7 +70,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
requestReportedPeers(nodeAddress, new ArrayList<>(seedNodeAddresses)); requestReportedPeers(nodeAddress, new ArrayList<>(seedNodeAddresses));
long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min. long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min.
executor.scheduleAtFixedRate(() -> UserThread.execute(this::checkForSeedNode), executor.scheduleAtFixedRate(() -> UserThread.execute(this::checkConnections),
delay, delay, TimeUnit.SECONDS); delay, delay, TimeUnit.SECONDS);
} }
@ -86,10 +86,12 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(Reason reason, Connection connection) {
// We use a timer to throttle if we get a series of disconnects // We use a timer to throttle if we get a series of disconnects
// The more connections we have the more relaxed we are with a checkForSeedNode // The more connections we have the more relaxed we are with a checkConnections
if (checkForSeedNodesTimer == null) if (checkConnectionsTimer == null)
checkForSeedNodesTimer = UserThread.runAfter(this::checkForSeedNode, checkConnectionsTimer = UserThread.runAfter(this::checkConnections,
networkNode.getAllConnections().size() * 10, TimeUnit.SECONDS); networkNode.getAllConnections().size() * 10, TimeUnit.SECONDS);
} }
@Override @Override
@ -145,7 +147,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers"); checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestReportedPeers");
stopRequestReportedPeersAfterDelayTimer(); stopContinueWithMorePeersTimer();
stopTimeoutTimer(); stopTimeoutTimer();
timeoutTimer = UserThread.runAfter(() -> { timeoutTimer = UserThread.runAfter(() -> {
@ -186,8 +188,9 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
"That is expected if no other node is online.\n" + "That is expected if no other node is online.\n" +
"We will try to use reported peers (if no available we use persisted peers) " + "We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request peers from our seed nodes after a random pause."); "and try again to request peers from our seed nodes after a random pause.");
requestReportedPeersAfterDelayTimer = UserThread.runAfter(this::continueWithMorePeers, if (continueWithMorePeersTimer == null)
10, TimeUnit.SECONDS); continueWithMorePeersTimer = UserThread.runAfter(this::continueWithMorePeers,
30, TimeUnit.SECONDS);
} }
} }
@ -199,6 +202,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private void continueWithMorePeers() { private void continueWithMorePeers() {
Log.traceCall(); Log.traceCall();
stopContinueWithMorePeersTimer();
if (!peerManager.hasSufficientConnections()) { if (!peerManager.hasSufficientConnections()) {
// We want to keep it sorted but avoid duplicates // We want to keep it sorted but avoid duplicates
List<NodeAddress> list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>())); List<NodeAddress> list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>()));
@ -237,17 +241,20 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
// we check if we have at least one seed node connected // we check if we have at least one seed node connected
private void checkForSeedNode() { private void checkConnections() {
Log.traceCall(); Log.traceCall();
stopCheckForSeedNodesTimer(); stopCheckConnectionsTimer();
Set<Connection> allConnections = networkNode.getConfirmedConnections(); Set<Connection> allConnections = networkNode.getConfirmedConnections();
List<Connection> seedNodes = allConnections.stream() List<Connection> connectedSeedNodes = allConnections.stream()
.filter(peerManager::isSeedNode) .filter(peerManager::isSeedNode)
.collect(Collectors.toList()); .collect(Collectors.toList());
if (seedNodes.size() == 0 && !seedNodeAddresses.isEmpty()) { log.debug("connectedSeedNodes " + connectedSeedNodes);
if (connectedSeedNodes.size() == 0 && !seedNodeAddresses.isEmpty())
requestReportedPeersFromList(new ArrayList<>(seedNodeAddresses)); requestReportedPeersFromList(new ArrayList<>(seedNodeAddresses));
}
if (continueWithMorePeersTimer == null)
continueWithMorePeersTimer = UserThread.runAfterRandomDelay(this::continueWithMorePeers, 10, 20);
} }
private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) { private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) {
@ -259,17 +266,17 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
.collect(Collectors.toSet())); .collect(Collectors.toSet()));
} }
private void stopRequestReportedPeersAfterDelayTimer() { private void stopContinueWithMorePeersTimer() {
if (requestReportedPeersAfterDelayTimer != null) { if (continueWithMorePeersTimer != null) {
requestReportedPeersAfterDelayTimer.cancel(); continueWithMorePeersTimer.cancel();
requestReportedPeersAfterDelayTimer = null; continueWithMorePeersTimer = null;
} }
} }
private void stopCheckForSeedNodesTimer() { private void stopCheckConnectionsTimer() {
if (checkForSeedNodesTimer != null) { if (checkConnectionsTimer != null) {
checkForSeedNodesTimer.cancel(); checkConnectionsTimer.cancel();
checkForSeedNodesTimer = null; checkConnectionsTimer = null;
} }
} }