Refactor WIP

This commit is contained in:
Manfred Karrer 2015-12-23 18:19:01 +01:00
parent 6d68cf8470
commit e3cbaeef7e
11 changed files with 125 additions and 139 deletions

View file

@ -71,6 +71,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private Address connectedSeedNode; private Address connectedSeedNode;
private volatile boolean shutDownInProgress; private volatile boolean shutDownInProgress;
private boolean shutDownComplete; private boolean shutDownComplete;
@SuppressWarnings("FieldCanBeLocal")
private MonadicBinding<Boolean> readyForAuthentication; private MonadicBinding<Boolean> readyForAuthentication;
private final Storage<Address> dbStorage; private final Storage<Address> dbStorage;
private Address myOnionAddress; private Address myOnionAddress;

View file

@ -35,7 +35,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
protected final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>(); protected final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>();
protected ListeningExecutorService executorService; protected ListeningExecutorService executorService;
private Server server; private Server server;
private ConnectionListener startServerConnectionListener;
private volatile boolean shutDownInProgress; private volatile boolean shutDownInProgress;
// accessed from different threads // accessed from different threads
@ -173,15 +172,11 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
final SettableFuture<Connection> resultFuture = SettableFuture.create(); final SettableFuture<Connection> resultFuture = SettableFuture.create();
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
UserThread.execute(() -> { UserThread.execute(() -> resultFuture.set(connection));
resultFuture.set(connection);
});
} }
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> { UserThread.execute(() -> resultFuture.setException(throwable));
resultFuture.setException(throwable);
});
} }
}); });
return resultFuture; return resultFuture;
@ -305,7 +300,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
protected void startServer(ServerSocket serverSocket) { protected void startServer(ServerSocket serverSocket) {
Log.traceCall(); Log.traceCall();
startServerConnectionListener = new ConnectionListener() { ConnectionListener startServerConnectionListener = new ConnectionListener() {
@Override @Override
public void onConnection(Connection connection) { public void onConnection(Connection connection) {
Log.traceCall("startServerConnectionListener connection=" + connection); Log.traceCall("startServerConnectionListener connection=" + connection);

View file

@ -225,9 +225,7 @@ public class TorNetworkNode extends NetworkNode {
}); });
Futures.addCallback(future, new FutureCallback<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>>() { Futures.addCallback(future, new FutureCallback<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>>() {
public void onSuccess(TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode) { public void onSuccess(TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode) {
UserThread.execute(() -> { UserThread.execute(() -> resultHandler.accept(torNode));
resultHandler.accept(torNode);
});
} }
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {

View file

@ -16,13 +16,11 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
// authentication protocol: // authentication protocol:
// node2 -> node1 AuthenticationRequest // node2 -> node1 AuthenticationRequest
// node1: close connection // node1: close connection
@ -40,9 +38,9 @@ public class AuthenticationHandshake implements MessageListener {
private final Address myAddress; private final Address myAddress;
private final Address peerAddress; private final Address peerAddress;
private SettableFuture<Connection> resultFuture; private final long startAuthTs;
private long startAuthTs; private Optional<SettableFuture<Connection>> resultFutureOptional = Optional.empty();
private long nonce = 0; private long nonce;
private boolean stopped; private boolean stopped;
@ -57,22 +55,29 @@ public class AuthenticationHandshake implements MessageListener {
this.myAddress = myAddress; this.myAddress = myAddress;
this.peerAddress = peerAddress; this.peerAddress = peerAddress;
networkNode.addMessageListener(this);
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis(); startAuthTs = System.currentTimeMillis();
stopped = false;
nonce = 0;
networkNode.addMessageListener(this);
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation // MessageListener implementation
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void onMessage(Message message, Connection connection) { public void onMessage(Message message, Connection connection) {
if (stopped) {
log.warn("AuthenticationHandshake already shut down but still got onMessage called. That must not happen.");
return;
}
if (message instanceof AuthenticationMessage) { if (message instanceof AuthenticationMessage) {
// We are listening on all connections, so we need to filter out only our peer address // We are listening on all connections, so we need to filter out only our peer
if (((AuthenticationMessage) message).address.equals(peerAddress)) { if (((AuthenticationMessage) message).address.equals(peerAddress)) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
checkArgument(!stopped);
if (message instanceof AuthenticationResponse) { if (message instanceof AuthenticationResponse) {
// Requesting peer // Requesting peer
// We use the active connectionType if we started the authentication request to another peer // We use the active connectionType if we started the authentication request to another peer
@ -173,6 +178,11 @@ public class AuthenticationHandshake implements MessageListener {
Log.traceCall("peerAddress " + peerAddress); Log.traceCall("peerAddress " + peerAddress);
// Requesting peer // Requesting peer
if (stopped) {
log.warn("AuthenticationHandshake already shut down but still got requestAuthentication called. That must not happen.");
}
resultFutureOptional = Optional.of(SettableFuture.create());
AuthenticationRequest authenticationRequest = new AuthenticationRequest(myAddress, getAndSetNonce()); AuthenticationRequest authenticationRequest = new AuthenticationRequest(myAddress, getAndSetNonce());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationRequest); SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationRequest);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@ -193,7 +203,7 @@ public class AuthenticationHandshake implements MessageListener {
} }
}); });
return resultFuture; return resultFutureOptional.get();
} }
@ -206,6 +216,12 @@ public class AuthenticationHandshake implements MessageListener {
Log.traceCall("peerAddress " + peerAddress); Log.traceCall("peerAddress " + peerAddress);
// Responding peer // Responding peer
if (stopped) {
log.warn("AuthenticationHandshake already shut down but still got respondToAuthenticationRequest called. That must not happen.");
}
resultFutureOptional = Optional.of(SettableFuture.create());
log.trace("AuthenticationRequest from " + peerAddress + " at " + myAddress); log.trace("AuthenticationRequest from " + peerAddress + " at " + myAddress);
log.info("We shut down inbound connection from peer {} to establish a new " + log.info("We shut down inbound connection from peer {} to establish a new " +
"connection with his reported address.", peerAddress); "connection with his reported address.", peerAddress);
@ -223,7 +239,7 @@ public class AuthenticationHandshake implements MessageListener {
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationResponse); SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationResponse);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(@Nullable Connection connection) { public void onSuccess(Connection connection) {
log.trace("onSuccess sending AuthenticationResponse"); log.trace("onSuccess sending AuthenticationResponse");
connection.setPeerAddress(peerAddress); connection.setPeerAddress(peerAddress);
@ -241,7 +257,7 @@ public class AuthenticationHandshake implements MessageListener {
} }
}, 200, TimeUnit.MILLISECONDS); }, 200, TimeUnit.MILLISECONDS);
}); });
return resultFuture; return resultFutureOptional.get();
} }
@ -262,7 +278,7 @@ public class AuthenticationHandshake implements MessageListener {
Log.traceCall(); Log.traceCall();
nonce = new Random().nextLong(); nonce = new Random().nextLong();
while (nonce == 0) while (nonce == 0)
nonce = getAndSetNonce(); nonce = new Random().nextLong();
return nonce; return nonce;
} }
@ -270,19 +286,25 @@ public class AuthenticationHandshake implements MessageListener {
private void failed(@NotNull Throwable throwable) { private void failed(@NotNull Throwable throwable) {
Log.traceCall(); Log.traceCall();
shutDown(); shutDown();
resultFuture.setException(throwable); if (resultFutureOptional.isPresent())
resultFutureOptional.get().setException(throwable);
else
log.warn("failed called but resultFuture = null. That must never happen.");
} }
private void completed(Connection connection) { private void completed(Connection connection) {
Log.traceCall(); Log.traceCall();
shutDown(); shutDown();
resultFuture.set(connection); if (resultFutureOptional.isPresent())
resultFutureOptional.get().set(connection);
else
log.warn("completed called but resultFuture = null. That must never happen.");
} }
private void shutDown() { private void shutDown() {
Log.traceCall(); Log.traceCall();
networkNode.removeMessageListener(this);
stopped = true; stopped = true;
networkNode.removeMessageListener(this);
} }
@Override @Override

View file

@ -26,8 +26,8 @@ public class MaintenanceManager implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(MaintenanceManager.class); private static final Logger log = LoggerFactory.getLogger(MaintenanceManager.class);
private Timer sendPingTimer; private Timer sendPingTimer;
private PeerGroup peerGroup; private final PeerGroup peerGroup;
private NetworkNode networkNode; private final NetworkNode networkNode;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -12,7 +12,7 @@ public class Peer {
public final Connection connection; public final Connection connection;
public final Address address; public final Address address;
private long pingNonce; private final long pingNonce;
public Peer(Connection connection) { public Peer(Connection connection) {
this.connection = connection; this.connection = connection;

View file

@ -27,18 +27,24 @@ import static com.google.common.base.Preconditions.checkArgument;
public class PeerGroup implements MessageListener, ConnectionListener { public class PeerGroup implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
static int simulateAuthTorNode = 0; private static int simulateAuthTorNode = 0;
public static void setSimulateAuthTorNode(int simulateAuthTorNode) { public static void setSimulateAuthTorNode(int simulateAuthTorNode) {
PeerGroup.simulateAuthTorNode = simulateAuthTorNode; PeerGroup.simulateAuthTorNode = simulateAuthTorNode;
} }
static int MAX_CONNECTIONS_LOW_PRIO = 8; private static int MAX_CONNECTIONS_LOW_PRIORITY;
static int MAX_CONNECTIONS_NORMAL_PRIO = MAX_CONNECTIONS_LOW_PRIO + 4; private static int MAX_CONNECTIONS_NORMAL_PRIORITY;
static int MAX_CONNECTIONS_HIGH_PRIO = MAX_CONNECTIONS_NORMAL_PRIO + 4; private static int MAX_CONNECTIONS_HIGH_PRIORITY;
public static void setMaxConnectionsLowPrio(int maxConnectionsLowPrio) { public static void setMaxConnectionsLowPriority(int maxConnectionsLowPriority) {
MAX_CONNECTIONS_LOW_PRIO = maxConnectionsLowPrio; MAX_CONNECTIONS_LOW_PRIORITY = maxConnectionsLowPriority;
MAX_CONNECTIONS_NORMAL_PRIORITY = MAX_CONNECTIONS_LOW_PRIORITY + 4;
MAX_CONNECTIONS_HIGH_PRIORITY = MAX_CONNECTIONS_NORMAL_PRIORITY + 4;
}
static {
setMaxConnectionsLowPriority(8);
} }
static final int INACTIVITY_PERIOD_BEFORE_PING = 30 * 1000; static final int INACTIVITY_PERIOD_BEFORE_PING = 30 * 1000;
@ -106,7 +112,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (message instanceof AuthenticationRequest) if (message instanceof AuthenticationRequest)
processAuthenticationRequest((AuthenticationRequest) message, connection); processAuthenticationRequest((AuthenticationRequest) message, connection);
else if (message instanceof AuthenticationRejection) else if (message instanceof AuthenticationRejection)
processAuthenticationRejection((AuthenticationRejection) message, connection); processAuthenticationRejection((AuthenticationRejection) message);
} }
@ -120,8 +126,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info("Broadcast message to {} peers. Message: {}", authenticatedPeers.values().size(), message); log.info("Broadcast message to {} peers. Message: {}", authenticatedPeers.values().size(), message);
authenticatedPeers.values().stream() authenticatedPeers.values().stream()
.filter(e -> !e.address.equals(sender)) .filter(e -> !e.address.equals(sender))
.forEach(peer -> { .forEach(peer -> UserThread.runAfterRandomDelay(() -> {
UserThread.runAfterRandomDelay(() -> {
final Address address = peer.address; final Address address = peer.address;
log.trace("Broadcast message from " + getMyAddress() + " to " + address + "."); log.trace("Broadcast message from " + getMyAddress() + " to " + address + ".");
SettableFuture<Connection> future = networkNode.sendMessage(address, message); SettableFuture<Connection> future = networkNode.sendMessage(address, message);
@ -138,8 +143,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} }
}); });
}, },
10, 200, TimeUnit.MILLISECONDS); 10, 200, TimeUnit.MILLISECONDS));
});
} else { } else {
log.info("Message not broadcasted because we have no authenticated peers yet. " + log.info("Message not broadcasted because we have no authenticated peers yet. " +
"message = {}", message); "message = {}", message);
@ -158,7 +162,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Process incoming authentication request // Process incoming authentication messages
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void processAuthenticationRequest(AuthenticationRequest message, final Connection connection) { private void processAuthenticationRequest(AuthenticationRequest message, final Connection connection) {
@ -201,7 +205,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} }
} }
private void processAuthenticationRejection(AuthenticationRejection message, final Connection connection) { private void processAuthenticationRejection(AuthenticationRejection message) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
Address peerAddress = message.address; Address peerAddress = message.address;
cancelOwnAuthenticationRequest(peerAddress, authenticationHandshakes.get(peerAddress)); cancelOwnAuthenticationRequest(peerAddress, authenticationHandshakes.get(peerAddress));
@ -212,15 +216,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
SettableFuture<Connection> future = authenticationHandshake.respondToAuthenticationRequest(message, connection); SettableFuture<Connection> future = authenticationHandshake.respondToAuthenticationRequest(message, connection);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(@Nullable Connection connection) { public void onSuccess(Connection connection) {
if (connection != null) {
checkArgument(peerAddress.equals(connection.getPeerAddress()), "peerAddress does not match connection.getPeerAddress()"); checkArgument(peerAddress.equals(connection.getPeerAddress()), "peerAddress does not match connection.getPeerAddress()");
log.info("We got the peer who did an authentication request authenticated."); log.info("We got the peer who did an authentication request authenticated.");
addAuthenticatedPeer(connection, peerAddress); addAuthenticatedPeer(connection, peerAddress);
} else {
log.error("Connection is null. That must not happen.");
removePeer(peerAddress);
}
} }
@Override @Override
@ -229,7 +228,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"That can happen if the peer went offline. " + throwable.getMessage()); "That can happen if the peer went offline. " + throwable.getMessage());
removePeer(peerAddress); removePeer(peerAddress);
} }
}); }
);
} }
private void cancelOwnAuthenticationRequest(Address peerAddress, AuthenticationHandshake authenticationHandshake) { private void cancelOwnAuthenticationRequest(Address peerAddress, AuthenticationHandshake authenticationHandshake) {
@ -264,24 +265,16 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info("We try to authenticate to seed node {}.", peerAddress); log.info("We try to authenticate to seed node {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticate(peerAddress, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(@Nullable Connection connection) { public void onSuccess(Connection connection) {
if (connection != null) {
log.info("We got our first seed node authenticated. " + log.info("We got our first seed node authenticated. " +
"We try if there are reported peers available to authenticate."); "We try if there are reported peers available to authenticate.");
addAuthenticatedPeer(connection, peerAddress); addAuthenticatedPeer(connection, peerAddress);
authenticateToRemainingReportedPeer(); authenticateToRemainingReportedPeer();
} else {
log.warn("Connection is null. That should never happen. " + peerAddress);
removePeer(peerAddress);
log.info("We try another random seed node for first authentication attempt.");
authenticateToFirstSeedNode(getAndRemoveRandomAddress(remainingSeedNodes));
}
} }
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed." + log.info("Authentication to " + peerAddress + " failed." +
"\nThat is expected if seed nodes are offline." + "\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.getMessage()); "\nException:" + throwable.getMessage());
@ -310,24 +303,16 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info("We try to authenticate to seed node {}.", peerAddress); log.info("We try to authenticate to seed node {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticate(peerAddress, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(@Nullable Connection connection) { public void onSuccess(Connection connection) {
if (connection != null) {
log.info("We got a seed node authenticated. " + log.info("We got a seed node authenticated. " +
"We try if there are more seed nodes available to authenticate."); "We try if there are more seed nodes available to authenticate.");
addAuthenticatedPeer(connection, peerAddress); addAuthenticatedPeer(connection, peerAddress);
authenticateToRemainingSeedNode(); authenticateToRemainingSeedNode();
} else {
log.warn("Connection is null. That should never happen. " + peerAddress);
removePeer(peerAddress);
log.info("We try another random seed node for authentication.");
authenticateToRemainingSeedNode();
}
} }
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed." + log.info("Authentication to " + peerAddress + " failed." +
"\nThat is expected if the seed node is offline." + "\nThat is expected if the seed node is offline." +
"\nException:" + throwable.getMessage()); "\nException:" + throwable.getMessage());
@ -367,24 +352,16 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info("We try to authenticate to peer {}.", peerAddress); log.info("We try to authenticate to peer {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticate(peerAddress, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(@Nullable Connection connection) { public void onSuccess(Connection connection) {
if (connection != null) {
log.info("We got a peer authenticated. " + log.info("We got a peer authenticated. " +
"We try if there are more reported peers available to authenticate."); "We try if there are more reported peers available to authenticate.");
addAuthenticatedPeer(connection, peerAddress); addAuthenticatedPeer(connection, peerAddress);
authenticateToRemainingReportedPeer(); authenticateToRemainingReportedPeer();
} else {
log.warn("Connection is null. That should never happen. " + peerAddress);
removePeer(peerAddress);
log.info("We try another random seed node for authentication.");
authenticateToRemainingReportedPeer();
}
} }
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed." + log.info("Authentication to " + peerAddress + " failed." +
"\nThat is expected if the peer is offline." + "\nThat is expected if the peer is offline." +
"\nException:" + throwable.getMessage()); "\nException:" + throwable.getMessage());
@ -425,23 +402,16 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info("We try to authenticate to peer {} for sending a private message.", peerAddress); log.info("We try to authenticate to peer {} for sending a private message.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticate(peerAddress, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(@Nullable Connection connection) { public void onSuccess(Connection connection) {
if (connection != null) {
log.info("We got a new peer for sending a private message authenticated."); log.info("We got a new peer for sending a private message authenticated.");
addAuthenticatedPeer(connection, peerAddress); addAuthenticatedPeer(connection, peerAddress);
if (completeHandler != null) if (completeHandler != null)
completeHandler.run(); completeHandler.run();
} else {
log.error("Connection is null. That should never happen. " + peerAddress);
removePeer(peerAddress);
if (faultHandler != null)
faultHandler.run();
}
} }
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.error("Authentication to " + peerAddress + " for sending a private message failed." + log.error("Authentication to " + peerAddress + " for sending a private message failed." +
"\nSeems that the peer is offline." + "\nSeems that the peer is offline." +
"\nException:" + throwable.getMessage()); "\nException:" + throwable.getMessage());
@ -512,7 +482,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} }
private boolean maxConnectionsForAuthReached() { private boolean maxConnectionsForAuthReached() {
return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIO; return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY;
} }
private boolean remainingSeedNodesAvailable() { private boolean remainingSeedNodesAvailable() {
@ -526,7 +496,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private boolean checkIfConnectedPeersExceeds() { private boolean checkIfConnectedPeersExceeds() {
Log.traceCall(); Log.traceCall();
int size = authenticatedPeers.size(); int size = authenticatedPeers.size();
if (size > PeerGroup.MAX_CONNECTIONS_LOW_PRIO) { if (size > PeerGroup.MAX_CONNECTIONS_LOW_PRIORITY) {
Set<Connection> allConnections = networkNode.getAllConnections(); Set<Connection> allConnections = networkNode.getAllConnections();
int allConnectionsSize = allConnections.size(); int allConnectionsSize = allConnections.size();
log.info("We have {} connections open. Lets remove the passive connections" + log.info("We have {} connections open. Lets remove the passive connections" +
@ -541,8 +511,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (authenticatedConnections.size() == 0) { if (authenticatedConnections.size() == 0) {
log.debug("There are no passive connections for closing. We check if we are exceeding " + log.debug("There are no passive connections for closing. We check if we are exceeding " +
"MAX_CONNECTIONS_NORMAL ({}) ", PeerGroup.MAX_CONNECTIONS_NORMAL_PRIO); "MAX_CONNECTIONS_NORMAL ({}) ", PeerGroup.MAX_CONNECTIONS_NORMAL_PRIORITY);
if (size > PeerGroup.MAX_CONNECTIONS_NORMAL_PRIO) { if (size > PeerGroup.MAX_CONNECTIONS_NORMAL_PRIORITY) {
authenticatedConnections = allConnections.stream() authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated()) .filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE || e.getConnectionPriority() == ConnectionPriority.ACTIVE) .filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE || e.getConnectionPriority() == ConnectionPriority.ACTIVE)
@ -550,8 +520,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (authenticatedConnections.size() == 0) { if (authenticatedConnections.size() == 0) {
log.debug("There are no passive or active connections for closing. We check if we are exceeding " + log.debug("There are no passive or active connections for closing. We check if we are exceeding " +
"MAX_CONNECTIONS_HIGH ({}) ", PeerGroup.MAX_CONNECTIONS_HIGH_PRIO); "MAX_CONNECTIONS_HIGH ({}) ", PeerGroup.MAX_CONNECTIONS_HIGH_PRIORITY);
if (size > PeerGroup.MAX_CONNECTIONS_HIGH_PRIO) { if (size > PeerGroup.MAX_CONNECTIONS_HIGH_PRIORITY) {
authenticatedConnections = allConnections.stream() authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated()) .filter(e -> e.isAuthenticated())
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -604,7 +574,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// we disconnect misbehaving nodes trying to send too many peers // we disconnect misbehaving nodes trying to send too many peers
// reported peers include the authenticated peers which is normally max. 8 but we give some headroom // reported peers include the authenticated peers which is normally max. 8 but we give some headroom
// for safety // for safety
if (reportedPeersToAdd.size() > (MAX_REPORTED_PEERS + MAX_CONNECTIONS_LOW_PRIO * 3)) { if (reportedPeersToAdd.size() > (MAX_REPORTED_PEERS + MAX_CONNECTIONS_LOW_PRIORITY * 3)) {
connection.shutDown(); connection.shutDown();
} else { } else {
// In case we have one of the peers already we adjust the lastActivityDate by adjusting the date to the mid // In case we have one of the peers already we adjust the lastActivityDate by adjusting the date to the mid
@ -681,7 +651,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
printReportedPeers(); printReportedPeers();
} }
public void printAuthenticatedPeers() { private void printAuthenticatedPeers() {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Authenticated peers for node " + getMyAddress() + ":"); "Authenticated peers for node " + getMyAddress() + ":");
authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address)); authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address));
@ -689,7 +659,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info(result.toString()); log.info(result.toString());
} }
public void printReportedPeers() { private void printReportedPeers() {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Reported peers for node " + getMyAddress() + ":"); "Reported peers for node " + getMyAddress() + ":");
reportedPeers.stream().forEach(e -> result.append("\n").append(e)); reportedPeers.stream().forEach(e -> result.append("\n").append(e));

View file

@ -63,10 +63,10 @@ public class SeedNode {
String arg2 = args[2]; String arg2 = args[2];
int maxConnections = Integer.parseInt(arg2); int maxConnections = Integer.parseInt(arg2);
checkArgument(maxConnections < 1000, "maxConnections seems to be a bit too high..."); checkArgument(maxConnections < 1000, "maxConnections seems to be a bit too high...");
PeerGroup.setMaxConnectionsLowPrio(maxConnections); PeerGroup.setMaxConnectionsLowPriority(maxConnections);
} else { } else {
// we keep default a higher connection size for seed nodes // we keep default a higher connection size for seed nodes
PeerGroup.setMaxConnectionsLowPrio(50); PeerGroup.setMaxConnectionsLowPriority(50);
} }
if (args.length > 3) { if (args.length > 3) {
String arg3 = args[3]; String arg3 = args[3];

View file

@ -59,7 +59,7 @@ public class P2PServiceTest {
LocalhostNetworkNode.setSimulateTorDelayTorNode(10); LocalhostNetworkNode.setSimulateTorDelayTorNode(10);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(100); LocalhostNetworkNode.setSimulateTorDelayHiddenService(100);
PeerGroup.setMaxConnectionsLowPrio(8); PeerGroup.setMaxConnectionsLowPriority(8);
keyRing1 = new KeyRing(new KeyStorage(dir1)); keyRing1 = new KeyRing(new KeyStorage(dir1));
keyRing2 = new KeyRing(new KeyStorage(dir2)); keyRing2 = new KeyRing(new KeyStorage(dir2));

View file

@ -33,7 +33,7 @@ public class PeerGroupTest {
public void setup() throws InterruptedException { public void setup() throws InterruptedException {
LocalhostNetworkNode.setSimulateTorDelayTorNode(50); LocalhostNetworkNode.setSimulateTorDelayTorNode(50);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(8); LocalhostNetworkNode.setSimulateTorDelayHiddenService(8);
PeerGroup.setMaxConnectionsLowPrio(100); PeerGroup.setMaxConnectionsLowPriority(100);
seedNodes = new HashSet<>(); seedNodes = new HashSet<>();
if (useLocalhost) { if (useLocalhost) {