Add maintenance task

This commit is contained in:
Manfred Karrer 2016-01-27 03:38:16 +01:00
parent 579c3797bc
commit c68c06d598
3 changed files with 31 additions and 22 deletions

View file

@ -31,7 +31,7 @@ public final class Arbitrator implements PubKeyProtectedExpirablePayload {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public static final long TTL = 6 * 24 * 60 * 60 * 1000; // 6 days public static final long TTL = 10 * 24 * 60 * 60 * 1000; // 10 days
// Persisted fields // Persisted fields
private final byte[] btcPubKey; private final byte[] btcPubKey;

View file

@ -9,10 +9,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities; import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest; import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse; import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse;
import io.bitsquare.p2p.peers.messages.peers.PeerExchangeMessage; import io.bitsquare.p2p.peers.messages.peers.PeerExchangeMessage;
@ -35,7 +32,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private final PeerManager peerManager; private final PeerManager peerManager;
private final Set<NodeAddress> seedNodeAddresses; private final Set<NodeAddress> seedNodeAddresses;
private final ScheduledThreadPoolExecutor executor; private final ScheduledThreadPoolExecutor executor;
private Timer continueWithMorePeersTimer, timeoutTimer, checkConnectionsTimer; private Timer continueWithMorePeersTimer, timeoutTimer, maintainConnectionsTimer;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -56,7 +53,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
stopContinueWithMorePeersTimer(); stopContinueWithMorePeersTimer();
stopCheckConnectionsTimer(); stopMaintainConnectionsTimer();
stopTimeoutTimer(); stopTimeoutTimer();
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS); MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
} }
@ -70,7 +67,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
requestReportedPeers(nodeAddress, new ArrayList<>(seedNodeAddresses)); requestReportedPeers(nodeAddress, new ArrayList<>(seedNodeAddresses));
long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min. long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min.
executor.scheduleAtFixedRate(() -> UserThread.execute(this::checkConnections), executor.scheduleAtFixedRate(() -> UserThread.execute(this::maintainConnections),
delay, delay, TimeUnit.SECONDS); delay, delay, TimeUnit.SECONDS);
} }
@ -87,8 +84,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(Reason reason, Connection connection) {
// We use a timer to throttle if we get a series of disconnects // We use a timer to throttle if we get a series of disconnects
// The more connections we have the more relaxed we are with a checkConnections // The more connections we have the more relaxed we are with a checkConnections
if (checkConnectionsTimer == null) if (maintainConnectionsTimer == null)
checkConnectionsTimer = UserThread.runAfter(this::checkConnections, maintainConnectionsTimer = UserThread.runAfter(this::maintainConnections,
networkNode.getAllConnections().size() * 10, TimeUnit.SECONDS); networkNode.getAllConnections().size() * 10, TimeUnit.SECONDS);
@ -241,20 +238,32 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
// we check if we have at least one seed node connected // we check if we have at least one seed node connected
private void checkConnections() { private void maintainConnections() {
Log.traceCall(); Log.traceCall();
stopCheckConnectionsTimer();
stopMaintainConnectionsTimer();
// we want at least 1 seed node connected
Set<Connection> allConnections = networkNode.getConfirmedConnections(); Set<Connection> allConnections = networkNode.getConfirmedConnections();
List<Connection> connectedSeedNodes = allConnections.stream() List<Connection> connectedSeedNodes = allConnections.stream()
.filter(peerManager::isSeedNode) .filter(peerManager::isSeedNode)
.collect(Collectors.toList()); .collect(Collectors.toList());
log.debug("connectedSeedNodes " + connectedSeedNodes);
if (connectedSeedNodes.size() == 0 && !seedNodeAddresses.isEmpty()) if (connectedSeedNodes.size() == 0 && !seedNodeAddresses.isEmpty())
requestReportedPeersFromList(new ArrayList<>(seedNodeAddresses)); requestReportedPeersFromList(new ArrayList<>(seedNodeAddresses));
// We try to get sufficient connections by using reported and persisted peers
if (continueWithMorePeersTimer == null) if (continueWithMorePeersTimer == null)
continueWithMorePeersTimer = UserThread.runAfterRandomDelay(this::continueWithMorePeers, 10, 20); continueWithMorePeersTimer = UserThread.runAfterRandomDelay(this::continueWithMorePeers, 10, 20);
// Use all outbound connections for updating reported peers and make sure we keep the connection alive
// Inbound connections should be maintained be the requesting peer
networkNode.getConfirmedConnections().stream()
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
c instanceof OutboundConnection).
forEach(c -> UserThread.runAfterRandomDelay(() ->
requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>())
, 3, 5));
} }
private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) { private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) {
@ -273,10 +282,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
} }
} }
private void stopCheckConnectionsTimer() { private void stopMaintainConnectionsTimer() {
if (checkConnectionsTimer != null) { if (maintainConnectionsTimer != null) {
checkConnectionsTimer.cancel(); maintainConnectionsTimer.cancel();
checkConnectionsTimer = null; maintainConnectionsTimer = null;
} }
} }

View file

@ -33,13 +33,13 @@ public class PeerManager implements ConnectionListener, MessageListener {
public static void setMaxConnections(int maxConnections) { public static void setMaxConnections(int maxConnections) {
MAX_CONNECTIONS = maxConnections; MAX_CONNECTIONS = maxConnections;
MIN_CONNECTIONS = maxConnections - 2; MIN_CONNECTIONS = maxConnections - 4;
MAX_CONNECTIONS_EXTENDED_1 = MAX_CONNECTIONS + 5; MAX_CONNECTIONS_EXTENDED_1 = MAX_CONNECTIONS + 6;
MAX_CONNECTIONS_EXTENDED_2 = MAX_CONNECTIONS_EXTENDED_1 + 5; MAX_CONNECTIONS_EXTENDED_2 = MAX_CONNECTIONS_EXTENDED_1 + 6;
} }
static { static {
setMaxConnections(10); setMaxConnections(12);
} }
private static final int MAX_REPORTED_PEERS = 1000; private static final int MAX_REPORTED_PEERS = 1000;