Merge pull request #434 from ivilata/PeerManagerInstanceMaxConn

Per-instance max connections in PeerManager
This commit is contained in:
Manfred Karrer 2016-05-14 12:10:59 +02:00
commit c745bb37a7
6 changed files with 78 additions and 54 deletions

View file

@ -55,9 +55,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class P2PService implements SetupListener, MessageListener, ConnectionListener, RequestDataManager.Listener, public class P2PService implements SetupListener, MessageListener, ConnectionListener, RequestDataManager.Listener,
HashMapChangedListener { HashMapChangedListener {
private static final Logger log = LoggerFactory.getLogger(P2PService.class); private static final Logger log = LoggerFactory.getLogger(P2PService.class);
public static final int MAX_CONNECTIONS_DEFAULT = 12;
private final SeedNodesRepository seedNodesRepository; private final SeedNodesRepository seedNodesRepository;
private final int port; private final int port;
private final int maxConnections;
private final File torDir; private final File torDir;
private Clock clock; private Clock clock;
private final Optional<EncryptionService> optionalEncryptionService; private final Optional<EncryptionService> optionalEncryptionService;
@ -104,8 +106,32 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Clock clock, Clock clock,
@Nullable EncryptionService encryptionService, @Nullable EncryptionService encryptionService,
@Nullable KeyRing keyRing) { @Nullable KeyRing keyRing) {
this(
seedNodesRepository,
port, MAX_CONNECTIONS_DEFAULT,
torDir,
useLocalhost,
networkId,
storageDir,
clock,
encryptionService,
keyRing
);
}
@VisibleForTesting
public P2PService(SeedNodesRepository seedNodesRepository,
int port, int maxConnections,
File torDir,
boolean useLocalhost,
int networkId,
File storageDir,
Clock clock,
@Nullable EncryptionService encryptionService,
@Nullable KeyRing keyRing) {
this.seedNodesRepository = seedNodesRepository; this.seedNodesRepository = seedNodesRepository;
this.port = port; this.port = port;
this.maxConnections = maxConnections;
this.torDir = torDir; this.torDir = torDir;
this.clock = clock; this.clock = clock;
@ -124,7 +150,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
networkNode.addMessageListener(this); networkNode.addMessageListener(this);
Set<NodeAddress> seedNodeAddresses = seedNodesRepository.getSeedNodeAddresses(useLocalhost, networkId); Set<NodeAddress> seedNodeAddresses = seedNodesRepository.getSeedNodeAddresses(useLocalhost, networkId);
peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir, clock); peerManager = new PeerManager(networkNode, maxConnections, seedNodeAddresses, storageDir, clock);
broadcaster = new Broadcaster(networkNode, peerManager); broadcaster = new Broadcaster(networkNode, peerManager);

View file

@ -28,32 +28,28 @@ public class PeerManager implements ConnectionListener {
// Use a long delay as the bootstrapping peer might need a while until it knows its onion address // Use a long delay as the bootstrapping peer might need a while until it knows its onion address
private static final long REMOVE_ANONYMOUS_PEER_SEC = Timer.STRESS_TEST ? 10 : 120; private static final long REMOVE_ANONYMOUS_PEER_SEC = Timer.STRESS_TEST ? 10 : 120;
private static int MAX_CONNECTIONS;
private static int MIN_CONNECTIONS;
private static int MAX_CONNECTIONS_PEER;
private static int MAX_CONNECTIONS_NON_DIRECT;
private static int MAX_CONNECTIONS_ABSOLUTE;
private final boolean printReportedPeersDetails = true;
private boolean lostAllConnections;
public static void setMaxConnections(int maxConnections) {
MAX_CONNECTIONS = maxConnections;
MIN_CONNECTIONS = Math.max(1, maxConnections - 4);
MAX_CONNECTIONS_PEER = MAX_CONNECTIONS + 4;
MAX_CONNECTIONS_NON_DIRECT = MAX_CONNECTIONS + 8;
MAX_CONNECTIONS_ABSOLUTE = MAX_CONNECTIONS + 18;
}
static {
setMaxConnections(12);
}
private static final int MAX_REPORTED_PEERS = 1000; private static final int MAX_REPORTED_PEERS = 1000;
private static final int MAX_PERSISTED_PEERS = 500; private static final int MAX_PERSISTED_PEERS = 500;
private static final long MAX_AGE = TimeUnit.DAYS.toMillis(14); // max age for reported peers is 14 days private static final long MAX_AGE = TimeUnit.DAYS.toMillis(14); // max age for reported peers is 14 days
private final boolean printReportedPeersDetails = true;
private boolean lostAllConnections;
private int maxConnections;
private int minConnections;
private int maxConnectionsPeer;
private int maxConnectionsNonDirect;
private int maxConnectionsAbsolute;
// Modify this to change the relationships between connection limits.
private void setConnectionLimits(int maxConnections) {
this.maxConnections = maxConnections;
minConnections = Math.max(1, maxConnections - 4);
maxConnectionsPeer = maxConnections + 4;
maxConnectionsNonDirect = maxConnections + 8;
maxConnectionsAbsolute = maxConnections + 18;
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Listener // Listener
@ -90,7 +86,9 @@ public class PeerManager implements ConnectionListener {
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public PeerManager(NetworkNode networkNode, Set<NodeAddress> seedNodeAddresses, File storageDir, Clock clock) { public PeerManager(NetworkNode networkNode, int maxConnections, Set<NodeAddress> seedNodeAddresses,
File storageDir, Clock clock) {
setConnectionLimits(maxConnections);
this.networkNode = networkNode; this.networkNode = networkNode;
this.clock = clock; this.clock = clock;
// seedNodeAddresses can be empty (in case there is only 1 seed node, the seed node starting up has no other seed nodes) // seedNodeAddresses can be empty (in case there is only 1 seed node, the seed node starting up has no other seed nodes)
@ -139,7 +137,7 @@ public class PeerManager implements ConnectionListener {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public int getMaxConnections() { public int getMaxConnections() {
return MAX_CONNECTIONS_ABSOLUTE; return maxConnectionsAbsolute;
} }
public void addListener(Listener listener) { public void addListener(Listener listener) {
@ -199,7 +197,7 @@ public class PeerManager implements ConnectionListener {
removeSuperfluousSeedNodes(); removeSuperfluousSeedNodes();
removeTooOldReportedPeers(); removeTooOldReportedPeers();
removeTooOldPersistedPeers(); removeTooOldPersistedPeers();
checkMaxConnections(MAX_CONNECTIONS); checkMaxConnections(maxConnections);
} else { } else {
log.warn("We have stopped already. We ignore that checkMaxConnectionsTimer.run call."); log.warn("We have stopped already. We ignore that checkMaxConnectionsTimer.run call.");
} }
@ -223,8 +221,8 @@ public class PeerManager implements ConnectionListener {
if (candidates.size() == 0) { if (candidates.size() == 0) {
log.info("No candidates found. We check if we exceed our " + log.info("No candidates found. We check if we exceed our " +
"MAX_CONNECTIONS_PEER limit of {}", MAX_CONNECTIONS_PEER); "maxConnectionsPeer limit of {}", maxConnectionsPeer);
if (size > MAX_CONNECTIONS_PEER) { if (size > maxConnectionsPeer) {
log.info("Lets try to remove ANY connection of type PEER."); log.info("Lets try to remove ANY connection of type PEER.");
candidates = allConnections.stream() candidates = allConnections.stream()
.filter(e -> e.getPeerType() == Connection.PeerType.PEER) .filter(e -> e.getPeerType() == Connection.PeerType.PEER)
@ -232,8 +230,8 @@ public class PeerManager implements ConnectionListener {
if (candidates.size() == 0) { if (candidates.size() == 0) {
log.info("No candidates found. We check if we exceed our " + log.info("No candidates found. We check if we exceed our " +
"MAX_CONNECTIONS_NON_DIRECT limit of {}", MAX_CONNECTIONS_NON_DIRECT); "maxConnectionsNonDirect limit of {}", maxConnectionsNonDirect);
if (size > MAX_CONNECTIONS_NON_DIRECT) { if (size > maxConnectionsNonDirect) {
log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER."); log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER.");
candidates = allConnections.stream() candidates = allConnections.stream()
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER) .filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
@ -241,8 +239,8 @@ public class PeerManager implements ConnectionListener {
if (candidates.size() == 0) { if (candidates.size() == 0) {
log.info("No candidates found. We check if we exceed our " + log.info("No candidates found. We check if we exceed our " +
"MAX_CONNECTIONS_ABSOLUTE limit of {}", MAX_CONNECTIONS_ABSOLUTE); "maxConnectionsAbsolute limit of {}", maxConnectionsAbsolute);
if (size > MAX_CONNECTIONS_ABSOLUTE) { if (size > maxConnectionsAbsolute) {
log.info("Lets try to remove any connection."); log.info("Lets try to remove any connection.");
candidates = allConnections.stream().collect(Collectors.toList()); candidates = allConnections.stream().collect(Collectors.toList());
} }
@ -288,7 +286,7 @@ public class PeerManager implements ConnectionListener {
private void removeSuperfluousSeedNodes() { private void removeSuperfluousSeedNodes() {
Log.traceCall(); Log.traceCall();
if (networkNode.getConfirmedConnections().size() > MAX_CONNECTIONS) { if (networkNode.getConfirmedConnections().size() > maxConnections) {
Set<Connection> connections = networkNode.getConfirmedConnections(); Set<Connection> connections = networkNode.getConfirmedConnections();
if (hasSufficientConnections()) { if (hasSufficientConnections()) {
List<Connection> candidates = connections.stream() List<Connection> candidates = connections.stream()
@ -346,7 +344,7 @@ public class PeerManager implements ConnectionListener {
printNewReportedPeers(reportedPeersToAdd); printNewReportedPeers(reportedPeersToAdd);
// We check if the reported msg is not violating our rules // We check if the reported msg is not violating our rules
if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + PeerManager.MAX_CONNECTIONS_ABSOLUTE + 10)) { if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + maxConnectionsAbsolute + 10)) {
reportedPeers.addAll(reportedPeersToAdd); reportedPeers.addAll(reportedPeersToAdd);
purgeReportedPeersIfExceeds(); purgeReportedPeersIfExceeds();
@ -367,7 +365,7 @@ public class PeerManager implements ConnectionListener {
private void purgeReportedPeersIfExceeds() { private void purgeReportedPeersIfExceeds() {
Log.traceCall(); Log.traceCall();
int size = reportedPeers.size(); int size = reportedPeers.size();
int limit = MAX_REPORTED_PEERS - MAX_CONNECTIONS_ABSOLUTE; int limit = MAX_REPORTED_PEERS - maxConnectionsAbsolute;
if (size > limit) { if (size > limit) {
log.trace("We have already {} reported peers which exceeds our limit of {}." + log.trace("We have already {} reported peers which exceeds our limit of {}." +
"We remove random peers from the reported peers list.", size, limit); "We remove random peers from the reported peers list.", size, limit);
@ -470,7 +468,7 @@ public class PeerManager implements ConnectionListener {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public boolean hasSufficientConnections() { public boolean hasSufficientConnections() {
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS; return networkNode.getNodeAddressesOfConfirmedConnections().size() >= minConnections;
} }
public boolean isSeedNode(Peer reportedPeer) { public boolean isSeedNode(Peer reportedPeer) {

View file

@ -30,6 +30,7 @@ public class SeedNode {
public static final int MAX_CONNECTIONS_DEFAULT = 50; public static final int MAX_CONNECTIONS_DEFAULT = 50;
private NodeAddress mySeedNodeAddress = new NodeAddress("localhost:8001"); private NodeAddress mySeedNodeAddress = new NodeAddress("localhost:8001");
private int maxConnections = MAX_CONNECTIONS_DEFAULT; // we keep default a higher connection size for seed nodes
private boolean useLocalhost = false; private boolean useLocalhost = false;
private Set<NodeAddress> progArgSeedNodes; private Set<NodeAddress> progArgSeedNodes;
private P2PService seedNodeP2PService; private P2PService seedNodeP2PService;
@ -67,13 +68,9 @@ public class SeedNode {
Version.setBtcNetworkId(networkId); Version.setBtcNetworkId(networkId);
if (args.length > 2) { if (args.length > 2) {
String arg2 = args[2]; String arg2 = args[2];
int maxConnections = Integer.parseInt(arg2); maxConnections = Integer.parseInt(arg2);
log.info("From processArgs: maxConnections=" + maxConnections); log.info("From processArgs: maxConnections=" + maxConnections);
checkArgument(maxConnections < MAX_CONNECTIONS_LIMIT, "maxConnections seems to be a bit too high..."); checkArgument(maxConnections < MAX_CONNECTIONS_LIMIT, "maxConnections seems to be a bit too high...");
PeerManager.setMaxConnections(maxConnections);
} else {
// we keep default a higher connection size for seed nodes
PeerManager.setMaxConnections(MAX_CONNECTIONS_DEFAULT);
} }
if (args.length > 3) { if (args.length > 3) {
String arg3 = args[3]; String arg3 = args[3];
@ -107,11 +104,13 @@ public class SeedNode {
} }
public void createAndStartP2PService(boolean useDetailedLogging) { public void createAndStartP2PService(boolean useDetailedLogging) {
createAndStartP2PService(mySeedNodeAddress, useLocalhost, Version.getBtcNetworkId(), useDetailedLogging, progArgSeedNodes, null); createAndStartP2PService(mySeedNodeAddress, maxConnections, useLocalhost,
Version.getBtcNetworkId(), useDetailedLogging, progArgSeedNodes, null);
} }
@VisibleForTesting @VisibleForTesting
public void createAndStartP2PService(NodeAddress mySeedNodeAddress, public void createAndStartP2PService(NodeAddress mySeedNodeAddress,
int maxConnections,
boolean useLocalhost, boolean useLocalhost,
int networkId, int networkId,
boolean useDetailedLogging, boolean useDetailedLogging,
@ -144,7 +143,8 @@ public class SeedNode {
log.info("Created torDir at " + torDir.getAbsolutePath()); log.info("Created torDir at " + torDir.getAbsolutePath());
seedNodesRepository.setNodeAddressToExclude(mySeedNodeAddress); seedNodesRepository.setNodeAddressToExclude(mySeedNodeAddress);
seedNodeP2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, torDir, useLocalhost, networkId, storageDir, new Clock(), null, null); seedNodeP2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, maxConnections,
torDir, useLocalhost, networkId, storageDir, new Clock(), null, null);
seedNodeP2PService.start(listener); seedNodeP2PService.start(listener);
} }

View file

@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
// Run it once then lookup for onion address at: tor/hiddenservice/hostname and use that for the NodeAddress param. // Run it once then lookup for onion address at: tor/hiddenservice/hostname and use that for the NodeAddress param.
public class PeerServiceTest { public class PeerServiceTest {
private static final Logger log = LoggerFactory.getLogger(PeerServiceTest.class); private static final Logger log = LoggerFactory.getLogger(PeerServiceTest.class);
private static final int MAX_CONNECTIONS = 100;
final boolean useLocalhost = true; final boolean useLocalhost = true;
private CountDownLatch latch; private CountDownLatch latch;
@ -37,7 +38,6 @@ public class PeerServiceTest {
public void setup() throws InterruptedException { public void setup() throws InterruptedException {
LocalhostNetworkNode.setSimulateTorDelayTorNode(50); LocalhostNetworkNode.setSimulateTorDelayTorNode(50);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(8); LocalhostNetworkNode.setSimulateTorDelayHiddenService(8);
PeerManager.setMaxConnections(100);
if (useLocalhost) { if (useLocalhost) {
seedNodeAddresses.add(new NodeAddress("localhost:8001")); seedNodeAddresses.add(new NodeAddress("localhost:8001"));
@ -127,7 +127,7 @@ public class PeerServiceTest {
/* latch = new CountDownLatch(2); /* latch = new CountDownLatch(2);
seedNode.createAndStartP2PService(nodeAddress, useLocalhost, 2, true, seedNode.createAndStartP2PService(nodeAddress, MAX_CONNECTIONS, useLocalhost, 2, true,
seedNodeAddresses, new P2PServiceListener() { seedNodeAddresses, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {
@ -181,7 +181,7 @@ public class PeerServiceTest {
latch = new CountDownLatch(6); latch = new CountDownLatch(6);
seedNode1 = new SeedNode("test_dummy_dir"); seedNode1 = new SeedNode("test_dummy_dir");
seedNode1.createAndStartP2PService(nodeAddress1, useLocalhost, 2, true, seedNodeAddresses, new P2PServiceListener() { seedNode1.createAndStartP2PService(nodeAddress1, MAX_CONNECTIONS, useLocalhost, 2, true, seedNodeAddresses, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {
latch.countDown(); latch.countDown();
@ -219,7 +219,7 @@ public class PeerServiceTest {
Thread.sleep(500); Thread.sleep(500);
seedNode2 = new SeedNode("test_dummy_dir"); seedNode2 = new SeedNode("test_dummy_dir");
seedNode2.createAndStartP2PService(nodeAddress2, useLocalhost, 2, true, seedNodeAddresses, new P2PServiceListener() { seedNode2.createAndStartP2PService(nodeAddress2, MAX_CONNECTIONS, useLocalhost, 2, true, seedNodeAddresses, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {
latch.countDown(); latch.countDown();
@ -456,7 +456,7 @@ public class PeerServiceTest {
SeedNode seedNode = new SeedNode("test_dummy_dir"); SeedNode seedNode = new SeedNode("test_dummy_dir");
latch = new CountDownLatch(1); latch = new CountDownLatch(1);
seedNode.createAndStartP2PService(new NodeAddress("localhost", port), useLocalhost, 2, true, seedNodeAddresses, new P2PServiceListener() { seedNode.createAndStartP2PService(new NodeAddress("localhost", port), MAX_CONNECTIONS, useLocalhost, 2, true, seedNodeAddresses, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {
latch.countDown(); latch.countDown();

View file

@ -82,7 +82,7 @@ public class TestUtils {
} }
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
seedNode.createAndStartP2PService(new NodeAddress("localhost", port), useLocalhost, 2, true, seedNode.createAndStartP2PService(new NodeAddress("localhost", port), SeedNode.MAX_CONNECTIONS_DEFAULT, useLocalhost, 2, true,
seedNodes, new P2PServiceListener() { seedNodes, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {

View file

@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
@Ignore @Ignore
public class PeerManagerTest { public class PeerManagerTest {
private static final Logger log = LoggerFactory.getLogger(PeerManagerTest.class); private static final Logger log = LoggerFactory.getLogger(PeerManagerTest.class);
private static final int MAX_CONNECTIONS = 100;
final boolean useLocalhost = true; final boolean useLocalhost = true;
private CountDownLatch latch; private CountDownLatch latch;
@ -37,7 +38,6 @@ public class PeerManagerTest {
public void setup() throws InterruptedException { public void setup() throws InterruptedException {
LocalhostNetworkNode.setSimulateTorDelayTorNode(50); LocalhostNetworkNode.setSimulateTorDelayTorNode(50);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(8); LocalhostNetworkNode.setSimulateTorDelayHiddenService(8);
PeerManager.setMaxConnections(100);
seedNodes = new HashSet<>(); seedNodes = new HashSet<>();
if (useLocalhost) { if (useLocalhost) {
@ -84,7 +84,7 @@ public class PeerManagerTest {
seedNodes.add(nodeAddress); seedNodes.add(nodeAddress);
seedNode1 = new SeedNode("test_dummy_dir"); seedNode1 = new SeedNode("test_dummy_dir");
latch = new CountDownLatch(2); latch = new CountDownLatch(2);
seedNode1.createAndStartP2PService(nodeAddress, useLocalhost, 2, true, seedNode1.createAndStartP2PService(nodeAddress, MAX_CONNECTIONS, useLocalhost, 2, true,
seedNodes, new P2PServiceListener() { seedNodes, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {
@ -136,7 +136,7 @@ public class PeerManagerTest {
latch = new CountDownLatch(6); latch = new CountDownLatch(6);
seedNode1 = new SeedNode("test_dummy_dir"); seedNode1 = new SeedNode("test_dummy_dir");
seedNode1.createAndStartP2PService(nodeAddress1, useLocalhost, 2, true, seedNodes, new P2PServiceListener() { seedNode1.createAndStartP2PService(nodeAddress1, MAX_CONNECTIONS, useLocalhost, 2, true, seedNodes, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {
latch.countDown(); latch.countDown();
@ -174,7 +174,7 @@ public class PeerManagerTest {
Thread.sleep(500); Thread.sleep(500);
seedNode2 = new SeedNode("test_dummy_dir"); seedNode2 = new SeedNode("test_dummy_dir");
seedNode2.createAndStartP2PService(nodeAddress2, useLocalhost, 2, true, seedNodes, new P2PServiceListener() { seedNode2.createAndStartP2PService(nodeAddress2, MAX_CONNECTIONS, useLocalhost, 2, true, seedNodes, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {
latch.countDown(); latch.countDown();
@ -411,7 +411,7 @@ public class PeerManagerTest {
SeedNode seedNode = new SeedNode("test_dummy_dir"); SeedNode seedNode = new SeedNode("test_dummy_dir");
latch = new CountDownLatch(1); latch = new CountDownLatch(1);
seedNode.createAndStartP2PService(new NodeAddress("localhost", port), useLocalhost, 2, true, seedNodes, new P2PServiceListener() { seedNode.createAndStartP2PService(new NodeAddress("localhost", port), MAX_CONNECTIONS, useLocalhost, 2, true, seedNodes, new P2PServiceListener() {
@Override @Override
public void onRequestingDataCompleted() { public void onRequestingDataCompleted() {
latch.countDown(); latch.countDown();