stop when shutdown

This commit is contained in:
Manfred Karrer 2016-02-01 20:10:40 +01:00
parent 6e1f92a05b
commit 5efd69bad4
2 changed files with 56 additions and 48 deletions

View file

@ -28,6 +28,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private final ScheduledThreadPoolExecutor executor; private final ScheduledThreadPoolExecutor executor;
private final Map<NodeAddress, PeerExchangeHandshake> peerExchangeHandshakeMap = new HashMap<>(); private final Map<NodeAddress, PeerExchangeHandshake> peerExchangeHandshakeMap = new HashMap<>();
private Timer connectToMorePeersTimer, maintainConnectionsTimer; private Timer connectToMorePeersTimer, maintainConnectionsTimer;
private boolean shutDownInProgress;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -46,6 +47,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
public void shutDown() { public void shutDown() {
Log.traceCall(); Log.traceCall();
shutDownInProgress = true;
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
stopConnectToMorePeersTimer(); stopConnectToMorePeersTimer();
@ -148,17 +150,19 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
peerExchangeHandshakeMap.remove(nodeAddress); peerExchangeHandshakeMap.remove(nodeAddress);
peerManager.penalizeUnreachablePeer(nodeAddress); peerManager.penalizeUnreachablePeer(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) { if (!shutDownInProgress) {
log.info("There are remaining nodes available for requesting peers. " + if (!remainingNodeAddresses.isEmpty()) {
"We will try getReportedPeers again."); log.info("There are remaining nodes available for requesting peers. " +
requestReportedPeersFromRandomPeer(remainingNodeAddresses); "We will try getReportedPeers again.");
} else { requestReportedPeersFromRandomPeer(remainingNodeAddresses);
log.info("There is no remaining node available for requesting peers. " + } else {
"That is expected if no other node is online.\n\t" + log.info("There is no remaining node available for requesting peers. " +
"We will try again after a random pause."); "That is expected if no other node is online.\n\t" +
if (connectToMorePeersTimer == null) "We will try again after a random pause.");
connectToMorePeersTimer = UserThread.runAfterRandomDelay( if (connectToMorePeersTimer == null)
PeerExchangeManager.this::connectToMorePeers, 20, 30); connectToMorePeersTimer = UserThread.runAfterRandomDelay(
PeerExchangeManager.this::connectToMorePeers, 20, 30);
}
} }
} }
}); });

View file

@ -52,6 +52,7 @@ public class RequestDataManager implements MessageListener {
private Optional<NodeAddress> nodeOfPreliminaryDataRequest = Optional.empty(); private Optional<NodeAddress> nodeOfPreliminaryDataRequest = Optional.empty();
private Timer requestDataTimer; private Timer requestDataTimer;
private boolean dataUpdateRequested; private boolean dataUpdateRequested;
private boolean shutDownInProgress;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -72,6 +73,7 @@ public class RequestDataManager implements MessageListener {
public void shutDown() { public void shutDown() {
Log.traceCall(); Log.traceCall();
shutDownInProgress = true;
stopRequestDataTimer(); stopRequestDataTimer();
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown); requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown);
@ -175,46 +177,48 @@ public class RequestDataManager implements MessageListener {
peerManager.penalizeUnreachablePeer(nodeAddress); peerManager.penalizeUnreachablePeer(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) { if (!shutDownInProgress) {
log.info("There are remaining nodes available for requesting data. " + if (!remainingNodeAddresses.isEmpty()) {
"We will try requestDataFromPeers again."); log.info("There are remaining nodes available for requesting data. " +
NodeAddress nextCandidate = remainingNodeAddresses.get(0); "We will try requestDataFromPeers again.");
remainingNodeAddresses.remove(nextCandidate); NodeAddress nextCandidate = remainingNodeAddresses.get(0);
requestData(nextCandidate, remainingNodeAddresses); remainingNodeAddresses.remove(nextCandidate);
} else { requestData(nextCandidate, remainingNodeAddresses);
log.info("There is no remaining node available for requesting data. " + } else {
"That is expected if no other node is online.\n\t" + log.info("There is no remaining node available for requesting data. " +
"We will try to use reported peers (if no available we use persisted peers) " + "That is expected if no other node is online.\n\t" +
"and try again to request data from our seed nodes after a random pause."); "We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request data from our seed nodes after a random pause.");
// try again after a pause // try again after a pause
stopRequestDataTimer(); stopRequestDataTimer();
requestDataTimer = UserThread.runAfterRandomDelay(() -> { requestDataTimer = UserThread.runAfterRandomDelay(() -> {
log.trace("requestDataAfterDelayTimer called"); log.trace("requestDataAfterDelayTimer called");
// We want to keep it sorted but avoid duplicates // We want to keep it sorted but avoid duplicates
// We don't filter out already established connections for seed nodes as it might be that // We don't filter out already established connections for seed nodes as it might be that
// we got from the other seed node contacted but we still have not requested the initial // we got from the other seed node contacted but we still have not requested the initial
// data set // data set
List<NodeAddress> list = new ArrayList<>(seedNodeAddresses); List<NodeAddress> list = new ArrayList<>(seedNodeAddresses);
Collections.shuffle(list); Collections.shuffle(list);
list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list)); list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list));
list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list)); list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
checkArgument(!list.isEmpty(), "seedNodeAddresses must not be empty."); checkArgument(!list.isEmpty(), "seedNodeAddresses must not be empty.");
NodeAddress nextCandidate = list.get(0); NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate); list.remove(nextCandidate);
requestData(nextCandidate, list); requestData(nextCandidate, list);
}, },
10, 15, TimeUnit.SECONDS); 10, 15, TimeUnit.SECONDS);
} }
requestDataHandshakeMap.remove(nodeAddress); requestDataHandshakeMap.remove(nodeAddress);
// Notify listeners // Notify listeners
if (!nodeOfPreliminaryDataRequest.isPresent()) { if (!nodeOfPreliminaryDataRequest.isPresent()) {
if (peerManager.isSeedNode(nodeAddress)) if (peerManager.isSeedNode(nodeAddress))
listener.onNoSeedNodeAvailable(); listener.onNoSeedNodeAvailable();
else else
listener.onNoPeersAvailable(); listener.onNoPeersAvailable();
}
} }
} }
}); });