P2P network improvements

This commit is contained in:
Manfred Karrer 2016-02-25 14:48:00 +01:00
parent 900f89ce37
commit f0d727e345
23 changed files with 218 additions and 257 deletions

View File

@ -1,69 +0,0 @@
package io.bitsquare.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Random;
import java.util.TimerTask;
public class DefaultJavaTimer implements Timer {
private final Logger log = LoggerFactory.getLogger(DefaultJavaTimer.class);
private java.util.Timer timer;
public DefaultJavaTimer() {
}
@Override
public Timer runLater(Duration delay, Runnable runnable) {
if (timer == null) {
timer = new java.util.Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000));
try {
UserThread.execute(runnable::run);
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing timerTask failed. " + t.getMessage());
}
}
}, delay.toMillis());
} else {
log.warn("runLater called on an already running timer.");
}
return this;
}
@Override
public Timer runPeriodically(java.time.Duration interval, Runnable runnable) {
if (timer == null) {
timer = new java.util.Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000));
try {
UserThread.execute(runnable::run);
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing timerTask failed. " + t.getMessage());
}
}
}, interval.toMillis(), interval.toMillis());
} else {
log.warn("runLater called on an already running timer.");
}
return this;
}
@Override
public void stop() {
if (timer != null) {
timer.cancel();
timer = null;
}
}
}

View File

@ -3,6 +3,8 @@ package io.bitsquare.common;
import java.time.Duration;
public interface Timer {
boolean STRESS_TEST = false;
Timer runLater(java.time.Duration delay, Runnable action);
Timer runPeriodically(Duration interval, Runnable runnable);

View File

@ -46,7 +46,7 @@ public class UserThread {
static {
// If not defined we use same thread as caller thread
executor = MoreExecutors.directExecutor();
timerClass = DefaultJavaTimer.class;
timerClass = FrameRateTimer.class;
}
private static Executor executor;

View File

@ -60,8 +60,8 @@ public class Utilities {
public static ListeningExecutorService getListeningExecutorService(String name,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime) {
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTime));
long keepAliveTimeInSec) {
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTimeInSec));
}
public static ThreadPoolExecutor getThreadPoolExecutor(String name,

View File

@ -60,10 +60,10 @@ import static io.bitsquare.util.Validator.nonEmptyStringOf;
public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMessageListener {
private static final Logger log = LoggerFactory.getLogger(OpenOfferManager.class);
private static final long RETRY_REPUBLISH_DELAY_SEC = 5;
private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 10;
private static final long REPUBLISH_INTERVAL_MILLIS = 10 * Offer.TTL;
private static final long REFRESH_INTERVAL_MILLIS = (long) (Offer.TTL * 0.5);
private static final long RETRY_REPUBLISH_DELAY_SEC = Timer.STRESS_TEST ? 1 : 5;
private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = Timer.STRESS_TEST ? 1 : 10;
private static final long REPUBLISH_INTERVAL_MS = Timer.STRESS_TEST ? 3000 : 10 * Offer.TTL;
private static final long REFRESH_INTERVAL_MS = Timer.STRESS_TEST ? 1000 : (long) (Offer.TTL * 0.5);
private final KeyRing keyRing;
private final User user;
@ -404,7 +404,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
log.warn("We have stopped already. We ignore that periodicRepublishOffersTimer.run call.");
}
},
REPUBLISH_INTERVAL_MILLIS,
REPUBLISH_INTERVAL_MS,
TimeUnit.MILLISECONDS);
else
log.trace("periodicRepublishOffersTimer already stated");
@ -425,7 +425,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
log.warn("We have stopped already. We ignore that periodicRefreshOffersTimer.run call.");
}
},
REFRESH_INTERVAL_MILLIS,
REFRESH_INTERVAL_MS,
TimeUnit.MILLISECONDS);
else
log.trace("periodicRefreshOffersTimer already stated");

View File

@ -30,8 +30,8 @@ import org.fxmisc.easybind.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NetworkStatisticListItem {
private static final Logger log = LoggerFactory.getLogger(NetworkStatisticListItem.class);
public class P2pNetworkListItem {
private static final Logger log = LoggerFactory.getLogger(P2pNetworkListItem.class);
private final Statistic statistic;
private final Connection connection;
@ -42,9 +42,12 @@ public class NetworkStatisticListItem {
private final StringProperty lastActivity = new SimpleStringProperty();
private final StringProperty sentBytes = new SimpleStringProperty();
private final StringProperty receivedBytes = new SimpleStringProperty();
private final StringProperty peerType = new SimpleStringProperty();
private final StringProperty connectionType = new SimpleStringProperty();
private final StringProperty onionAddress = new SimpleStringProperty();
private final Clock.Listener listener;
public NetworkStatisticListItem(Connection connection, Clock clock, BSFormatter formatter) {
public P2pNetworkListItem(Connection connection, Clock clock, BSFormatter formatter) {
this.connection = connection;
this.clock = clock;
this.formatter = formatter;
@ -59,6 +62,9 @@ public class NetworkStatisticListItem {
@Override
public void onSecondTick() {
onLastActivityChanged(statistic.getLastActivityTimestamp());
updatePeerType();
updateConnectionType();
updateOnionAddress();
}
@Override
@ -71,6 +77,9 @@ public class NetworkStatisticListItem {
};
clock.addListener(listener);
onLastActivityChanged(statistic.getLastActivityTimestamp());
updatePeerType();
updateConnectionType();
updateOnionAddress();
}
private void onLastActivityChanged(long timeStamp) {
@ -83,28 +92,50 @@ public class NetworkStatisticListItem {
clock.removeListener(listener);
}
public String getOnionAddress() {
if (connection.getPeersNodeAddressOptional().isPresent())
return connection.getPeersNodeAddressOptional().get().getFullAddress();
else
return "";
public void updateOnionAddress() {
onionAddress.set(connection.getPeersNodeAddressOptional().isPresent() ?
connection.getPeersNodeAddressOptional().get().getFullAddress() : "Not known yet");
}
public String getConnectionType() {
return connection instanceof OutboundConnection ? "outbound" : "inbound";
public void updateConnectionType() {
connectionType.set(connection instanceof OutboundConnection ? "outbound" : "inbound");
}
public void updatePeerType() {
if (connection.getPeerType() == Connection.PeerType.SEED_NODE)
peerType.set("Seed node");
else if (connection.getPeerType() == Connection.PeerType.DIRECT_MSG_PEER)
peerType.set("Peer (direct)");
else
peerType.set("Peer");
}
public String getCreationDate() {
return formatter.formatDateTime(statistic.getCreationDate());
}
public String getOnionAddress() {
return onionAddress.get();
}
public StringProperty getOnionAddressProperty() {
return onionAddress;
}
public String getConnectionType() {
return connectionType.get();
}
public StringProperty getConnectionTypeProperty() {
return connectionType;
}
public String getPeerType() {
if (connection.getPeerType() == Connection.PeerType.SEED_NODE)
return "Seed node";
else if (connection.getPeerType() == Connection.PeerType.DIRECT_MSG_PEER)
return "Peer (direct)";
else
return "Peer";
return peerType.get();
}
public StringProperty getPeerTypeProperty() {
return peerType;
}
public String getLastActivity() {

View File

@ -141,8 +141,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (newValue)
onNetworkReady();
});
numConnectedPeers.set(networkNode.getAllConnections().size());
}
@ -304,14 +302,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onConnection(Connection connection) {
numConnectedPeers.set(networkNode.getAllConnections().size());
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 1);
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 3);
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
numConnectedPeers.set(networkNode.getAllConnections().size());
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 1);
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 3);
}
@Override
@ -717,10 +715,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return networkNode.getNodeAddress();
}
public Set<NodeAddress> getNodeAddressesOfConnectedPeers() {
return networkNode.getNodeAddressesOfConfirmedConnections();
}
public ReadOnlyIntegerProperty getNumConnectedPeers() {
return numConnectedPeers;
}

View File

@ -19,6 +19,7 @@ public enum CloseConnectionReason {
// maintenance
TOO_MANY_CONNECTIONS_OPEN(true, true),
TOO_MANY_SEED_NODES_CONNECTED(true, true),
UNKNOWN_PEER_ADDRESS(true, true),
// illegal requests
RULE_VIOLATION(true, true);

View File

@ -372,11 +372,12 @@ public class Connection implements MessageListener {
setStopFlags();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
t.printStackTrace();
} finally {
setStopFlags();
UserThread.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler));
}
}).start();
@ -384,6 +385,10 @@ public class Connection implements MessageListener {
setStopFlags();
doShutDown(closeConnectionReason, shutDownCompleteHandler);
}
} else {
//TODO find out why we get called that
log.warn("stopped was already true at shutDown call");
UserThread.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler));
}
}
@ -607,14 +612,25 @@ public class Connection implements MessageListener {
Object rawInputObject = objectInputStream.readObject();
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
sharedModel.connection,
StringUtils.abbreviate(rawInputObject.toString(), 100),
size);
boolean doPrintLogs = true;
if (rawInputObject instanceof Message) {
Message message = (Message) rawInputObject;
Connection connection = sharedModel.connection;
connection.statistic.addReceivedBytes(size);
connection.statistic.addReceivedMessage(message);
// We dont want to get all KeepAliveMessage logged
doPrintLogs = !(message instanceof KeepAliveMessage);
}
if (doPrintLogs) {
log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
sharedModel.connection,
StringUtils.abbreviate(rawInputObject.toString(), 100),
size);
}
if (size > getMaxMsgSize()) {
reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED);
@ -659,26 +675,25 @@ public class Connection implements MessageListener {
}
Message message = (Message) serializable;
Connection connection = sharedModel.connection;
connection.statistic.addReceivedBytes(size);
connection.statistic.addReceivedMessage(message);
if (message.getMessageVersion() != Version.getP2PMessageVersion()) {
reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID);
return;
}
Connection connection = sharedModel.connection;
if (message instanceof CloseConnectionMessage) {
log.info("CloseConnectionMessage received. Reason={}\n\t" +
"connection={}", ((CloseConnectionMessage) message).reason, connection);
stop();
sharedModel.shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
} else if (!stopped) {
connection.statistic.addReceivedBytes(size);
connection.statistic.addReceivedMessage(message);
// We don't want to get the activity ts updated by ping/pong msg
if (!(message instanceof KeepAliveMessage))
connection.statistic.updateLastActivityTimestamp();
// First a seed node gets a message form a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not has its hidden service
// published, so does not know its address. As the IncomingConnection does not has the

View File

@ -29,7 +29,6 @@ public class LocalhostNetworkNode extends NetworkNode {
private static volatile int simulateTorDelayTorNode = 100;
private static volatile int simulateTorDelayHiddenService = 500;
private NodeAddress nodeAddress;
public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) {
LocalhostNetworkNode.simulateTorDelayTorNode = simulateTorDelayTorNode;
@ -70,20 +69,12 @@ public class LocalhostNetworkNode extends NetworkNode {
log.error("Exception at startServer: " + e.getMessage());
}
nodeAddress = new NodeAddress("localhost", servicePort);
nodeAddressProperty.set(new NodeAddress("localhost", servicePort));
setupListeners.stream().forEach(SetupListener::onHiddenServicePublished);
});
});
}
@Override
@Nullable
public NodeAddress getNodeAddress() {
return nodeAddress;
}
// Called from NetworkNode thread
@Override
protected Socket createSocket(NodeAddress peerNodeAddress) throws IOException {

View File

@ -54,6 +54,8 @@ class Server implements Runnable {
if (!stopped)
connections.add(connection);
else
connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
}
}
} catch (IOException e) {
@ -83,6 +85,8 @@ class Server implements Runnable {
} finally {
log.info("Server shutdown complete");
}
} else {
log.warn("stopped already called ast shutdown");
}
}
}

View File

@ -47,7 +47,6 @@ public class TorNetworkNode extends NetworkNode {
private int restartCounter;
private MonadicBinding<Boolean> allShutDown;
// /////////////////////////////////////////////////////////////////////////////////////////
// Constructor
// /////////////////////////////////////////////////////////////////////////////////////////
@ -83,22 +82,13 @@ public class TorNetworkNode extends NetworkNode {
hiddenServiceDescriptor -> {
Log.traceCall("hiddenService created");
TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor;
nodeAddressProperty.set(new NodeAddress(hiddenServiceDescriptor.getFullAddress()));
startServer(hiddenServiceDescriptor.getServerSocket());
setupListeners.stream().forEach(SetupListener::onHiddenServicePublished);
});
});
}
@Override
@Nullable
public NodeAddress getNodeAddress() {
if (hiddenServiceDescriptor != null)
return new NodeAddress(hiddenServiceDescriptor.getFullAddress());
else
return null;
}
@Override
protected Socket createSocket(NodeAddress peerNodeAddress) throws IOException {
checkArgument(peerNodeAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address");

View File

@ -7,9 +7,7 @@ import io.bitsquare.app.Log;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.storage.messages.BroadcastMessage;
import org.apache.commons.lang3.StringUtils;
@ -22,9 +20,10 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class BroadcastHandler implements ConnectionListener, PeerManager.Listener {
public class BroadcastHandler implements PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(BroadcastHandler.class);
private static final long TIMEOUT_SEC = 60;
private static final long TIMEOUT_PER_PEER_SEC = Timer.STRESS_TEST ? 2 : 20;
private static final long DELAY_MS = Timer.STRESS_TEST ? 10 : 300;
interface ResultHandler {
void onCompleted(BroadcastHandler broadcastHandler);
@ -63,8 +62,7 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene
public BroadcastHandler(NetworkNode networkNode, PeerManager peerManager) {
this.networkNode = networkNode;
this.peerManager = peerManager;
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
peerManager.addListener(this);
uid = UUID.randomUUID().toString();
}
@ -85,22 +83,21 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene
Log.traceCall("Sender=" + sender + "\n\t" +
"Message=" + StringUtils.abbreviate(message.toString(), 100));
timeoutTimer = UserThread.runAfter(() ->
onFault("Timeout: Broadcast did not complete after " + TIMEOUT_SEC + " sec."), TIMEOUT_SEC);
Set<Connection> receivers = networkNode.getConfirmedConnections();
if (!receivers.isEmpty()) {
timeoutTimer = UserThread.runAfter(() ->
onFault("Timeout: Broadcast did not complete after " + TIMEOUT_PER_PEER_SEC + " sec."), TIMEOUT_PER_PEER_SEC * receivers.size());
numOfPeers = receivers.size();
numOfCompletedBroadcasts = 0;
log.info("Broadcast message to {} peers.", numOfPeers);
receivers.stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.forEach(connection -> UserThread.runAfterRandomDelay(() ->
sendToPeer(connection, message), 1, 500, TimeUnit.MILLISECONDS));
sendToPeer(connection, message), 1, DELAY_MS, TimeUnit.MILLISECONDS));
} else {
String errorMessage = "Message not broadcasted because we have no available peers yet.\n\t" +
onFault("Message not broadcasted because we have no available peers yet.\n\t" +
"That should never happen as broadcast should not be called in such cases.\n" +
"message = " + StringUtils.abbreviate(message.toString(), 100);
onFault(errorMessage);
"message = " + StringUtils.abbreviate(message.toString(), 100));
}
}
@ -132,8 +129,7 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene
resultHandler.onCompleted(BroadcastHandler.this);
}
} else {
log.warn("stopped at onSuccess: " + errorMessage);
onFault(errorMessage);
onFault("stopped at onSuccess: " + errorMessage);
}
}
@ -143,57 +139,63 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene
if (!stopped) {
log.info("Broadcast to " + nodeAddress + " failed.\n\t" +
"ErrorMessage=" + throwable.getMessage());
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers)
onFault("stopped at onFailure: " + errorMessage);
} else {
log.warn("stopped at onFailure: " + errorMessage);
onFault(errorMessage);
onFault("stopped at onFailure: " + errorMessage);
}
}
});
} else {
log.warn("stopped at sendToPeer: " + errorMessage);
onFault(errorMessage);
onFault("stopped at sendToPeer: " + errorMessage);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
stopped = false;
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAllConnectionsLost() {
stopped = true;
onFault("All connections lost");
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
stopped = false;
}
@Override
public void onAwakeFromStandby() {
if (!networkNode.getAllConnections().isEmpty())
stopped = false;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void cleanup() {
stopped = true;
peerManager.removeListener(this);
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
private void onFault(String errorMessage) {
log.warn(errorMessage);
if (listener != null)
listener.onBroadcastFailed(errorMessage);
if (listener != null && (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers || stopped))
listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts);
cleanup();
resultHandler.onFault(this);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -208,26 +210,4 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene
public int hashCode() {
return uid != null ? uid.hashCode() : 0;
}
private void onFault(String errorMessage) {
log.warn(errorMessage);
if (listener != null)
listener.onBroadcastFailed(errorMessage);
if (listener != null && (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers || stopped))
listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts);
cleanup();
resultHandler.onFault(this);
}
private void cleanup() {
stopped = true;
networkNode.removeConnectionListener(this);
peerManager.removeListener(this);
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
}

View File

@ -8,7 +8,6 @@ import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.peerexchange.ReportedPeer;
import io.bitsquare.storage.Storage;
import javafx.beans.value.ChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -19,14 +18,14 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class PeerManager implements ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(PeerManager.class);
///////////////////////////////////////////////////////////////////////////////////////////
// Static
///////////////////////////////////////////////////////////////////////////////////////////
private static final Logger log = LoggerFactory.getLogger(PeerManager.class);
private static final long CHECK_MAX_CONN_DELAY_SEC = Timer.STRESS_TEST ? 1 : 5;
private static final long REMOVE_ANONYMOUS_PEER_SEC = Timer.STRESS_TEST ? 1 : 30;
private static final long CHECK_MAX_CONN_DELAY_SEC = 3;
private static int MAX_CONNECTIONS;
private static int MIN_CONNECTIONS;
private static int MAX_CONNECTIONS_PEER;
@ -46,7 +45,7 @@ public class PeerManager implements ConnectionListener {
}
static {
setMaxConnections(12);
setMaxConnections(6);
}
private static final int MAX_REPORTED_PEERS = 1000;
@ -80,7 +79,6 @@ public class PeerManager implements ConnectionListener {
private final HashSet<ReportedPeer> persistedPeers = new HashSet<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private Timer checkMaxConnectionsTimer;
private final ChangeListener<NodeAddress> connectionNodeAddressListener;
private final Clock.Listener listener;
private final List<Listener> listeners = new LinkedList<>();
private boolean stopped;
@ -102,21 +100,6 @@ public class PeerManager implements ConnectionListener {
this.persistedPeers.addAll(persistedPeers);
}
connectionNodeAddressListener = (observable, oldValue, newValue) -> {
// Every time we get a new peer connected with a known address we check if we need to remove peers
printConnectedPeers();
if (checkMaxConnectionsTimer == null && newValue != null)
checkMaxConnectionsTimer = UserThread.runAfter(() -> {
if (!stopped) {
removeTooOldReportedPeers();
removeTooOldPersistedPeers();
checkMaxConnections(MAX_CONNECTIONS);
} else {
log.warn("We have stopped already. We ignore that checkMaxConnectionsTimer.run call.");
}
}, CHECK_MAX_CONN_DELAY_SEC);
};
// we check if app was idle for more then 5 sec.
listener = new Clock.Listener() {
@Override
@ -171,11 +154,11 @@ public class PeerManager implements ConnectionListener {
@Override
public void onConnection(Connection connection) {
connection.peersNodeAddressProperty().addListener(connectionNodeAddressListener);
if (isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
doHouseKeeping();
if (lostAllConnections) {
lostAllConnections = false;
stopped = false;
@ -185,7 +168,6 @@ public class PeerManager implements ConnectionListener {
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
connection.peersNodeAddressProperty().removeListener(connectionNodeAddressListener);
handleConnectionFault(connection);
lostAllConnections = networkNode.getAllConnections().isEmpty();
@ -201,13 +183,32 @@ public class PeerManager implements ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
// Check max connections
// Housekeeping
///////////////////////////////////////////////////////////////////////////////////////////
private void doHouseKeeping() {
log.trace("Peers before doHouseKeeping");
printConnectedPeers();
if (checkMaxConnectionsTimer == null)
checkMaxConnectionsTimer = UserThread.runAfter(() -> {
stopCheckMaxConnectionsTimer();
if (!stopped) {
removeAnonymousPeers();
removeSuperfluousSeedNodes();
removeTooOldReportedPeers();
removeTooOldPersistedPeers();
checkMaxConnections(MAX_CONNECTIONS);
} else {
log.warn("We have stopped already. We ignore that checkMaxConnectionsTimer.run call.");
}
}, CHECK_MAX_CONN_DELAY_SEC);
log.trace("Peers after doHouseKeeping");
printConnectedPeers();
}
private boolean checkMaxConnections(int limit) {
Log.traceCall("limit=" + limit);
stopCheckMaxConnectionsTimer();
removeSuperfluousSeedNodes();
Set<Connection> allConnections = networkNode.getAllConnections();
int size = allConnections.size();
log.info("We have {} connections open. Our limit is {}", size, limit);
@ -269,20 +270,36 @@ public class PeerManager implements ConnectionListener {
}
}
private void removeAnonymousPeers() {
Log.traceCall();
networkNode.getAllConnections().stream()
.filter(connection -> !connection.hasPeersNodeAddress())
.forEach(connection -> UserThread.runAfter(() -> {
// We give 30 seconds delay and check again if still no address is set
if (!connection.hasPeersNodeAddress()) {
log.info("We close the connection as the peer address is still unknown.\n\t" +
"connection=" + connection);
connection.shutDown(CloseConnectionReason.UNKNOWN_PEER_ADDRESS);
}
}, REMOVE_ANONYMOUS_PEER_SEC));
}
private void removeSuperfluousSeedNodes() {
Log.traceCall();
Set<Connection> connections = networkNode.getConfirmedConnections();
if (hasSufficientConnections()) {
List<Connection> candidates = connections.stream()
.filter(this::isSeedNode)
.collect(Collectors.toList());
if (networkNode.getConfirmedConnections().size() > MAX_CONNECTIONS) {
Set<Connection> connections = networkNode.getConfirmedConnections();
if (hasSufficientConnections()) {
List<Connection> candidates = connections.stream()
.filter(this::isSeedNode)
.collect(Collectors.toList());
if (candidates.size() > 1) {
candidates.sort((o1, o2) -> ((Long) o1.getStatistic().getLastActivityTimestamp()).compareTo(((Long) o2.getStatistic().getLastActivityTimestamp())));
log.info("Number of connections exceeding MAX_CONNECTIONS_EXTENDED_1. Current size=" + candidates.size());
Connection connection = candidates.remove(0);
log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString());
connection.shutDown(CloseConnectionReason.TOO_MANY_SEED_NODES_CONNECTED, this::removeSuperfluousSeedNodes);
if (candidates.size() > 1) {
candidates.sort((o1, o2) -> ((Long) o1.getStatistic().getLastActivityTimestamp()).compareTo(((Long) o2.getStatistic().getLastActivityTimestamp())));
log.info("Number of connections exceeding MAX_CONNECTIONS_EXTENDED_1. Current size=" + candidates.size());
Connection connection = candidates.remove(0);
log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString());
connection.shutDown(CloseConnectionReason.TOO_MANY_SEED_NODES_CONNECTED, this::removeSuperfluousSeedNodes);
}
}
}
}

View File

@ -25,8 +25,7 @@ import static com.google.common.base.Preconditions.checkArgument;
public class GetDataRequestHandler {
private static final Logger log = LoggerFactory.getLogger(GetDataRequestHandler.class);
private static final long TIME_OUT_SEC = 20;
private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20;
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -30,6 +30,9 @@ import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataHandler.class);
private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
@ -129,7 +132,7 @@ public class RequestDataHandler implements MessageListener {
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
10);
TIME_OUT_SEC);
} else {
log.warn("We have stopped already. We ignore that requestData call.");
}

View File

@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataManager implements MessageListener, ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class);
private static final long RETRY_DELAY_SEC = 10;
private static final long RETRY_DELAY_SEC = Timer.STRESS_TEST ? 3 : 10;
///////////////////////////////////////////////////////////////////////////////////////////
@ -200,7 +200,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -289,7 +289,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
retryTimer = UserThread.runAfter(() -> {
log.trace("retryTimer called");
stopped = false;
stopRetryTimer();
// We create a new list of candidates

View File

@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit;
class KeepAliveHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class);
private Timer delayTimer;
private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 5000;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
@ -48,7 +49,8 @@ class KeepAliveHandler implements MessageListener {
@Nullable
private Connection connection;
private boolean stopped;
private Timer delayTimer;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -70,7 +72,7 @@ class KeepAliveHandler implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
public void sendPingAfterRandomDelay(Connection connection) {
delayTimer = UserThread.runAfterRandomDelay(() -> sendPing(connection), 1, 5000, TimeUnit.MILLISECONDS);
delayTimer = UserThread.runAfterRandomDelay(() -> sendPing(connection), 1, DELAY_MS, TimeUnit.MILLISECONDS);
}
private void sendPing(Connection connection) {

View File

@ -22,8 +22,8 @@ import java.util.Random;
public class KeepAliveManager implements MessageListener, ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class);
private static final int INTERVAL_SEC = new Random().nextInt(5) + 20;
private static final long LAST_ACTIVITY_AGE_MILLIS = INTERVAL_SEC / 2;
private static final int INTERVAL_SEC = Timer.STRESS_TEST ? 2 : new Random().nextInt(5) + 20;
private static final long LAST_ACTIVITY_AGE_MS = INTERVAL_SEC / 2;
private final NetworkNode networkNode;
private final PeerManager peerManager;
@ -170,7 +170,7 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
Log.traceCall();
networkNode.getConfirmedConnections().stream()
.filter(connection -> connection instanceof OutboundConnection &&
connection.getStatistic().getLastActivityAge() > LAST_ACTIVITY_AGE_MILLIS)
connection.getStatistic().getLastActivityAge() > LAST_ACTIVITY_AGE_MS)
.forEach(connection -> {
final String uid = connection.getUid();
if (!handlerMap.containsKey(uid)) {

View File

@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
class GetPeersRequestHandler {
private static final Logger log = LoggerFactory.getLogger(GetPeersRequestHandler.class);
private static final long TIME_OUT_SEC = 20;
private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20;
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -28,7 +28,8 @@ import static com.google.common.base.Preconditions.checkArgument;
class PeerExchangeHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(PeerExchangeHandler.class);
private static final long TIME_OUT_SEC = 20;
private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20;
private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 3000;
///////////////////////////////////////////////////////////////////////////////////////////
@ -77,7 +78,7 @@ class PeerExchangeHandler implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
public void sendGetPeersRequestAfterRandomDelay(NodeAddress nodeAddress) {
delayTimer = UserThread.runAfterRandomDelay(() -> sendGetPeersRequest(nodeAddress), 1, 3000, TimeUnit.MILLISECONDS);
delayTimer = UserThread.runAfterRandomDelay(() -> sendGetPeersRequest(nodeAddress), 1, DELAY_MS, TimeUnit.MILLISECONDS);
}
private void sendGetPeersRequest(NodeAddress nodeAddress) {

View File

@ -22,9 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class PeerExchangeManager implements MessageListener, ConnectionListener, PeerManager.Listener {
private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class);
private static final long RETRY_DELAY_SEC = 10;
private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = 3;
private static final long REQUEST_PERIODICALLY_INTERVAL_MINUTES = 10;
private static final long RETRY_DELAY_SEC = Timer.STRESS_TEST ? 2 : 10;
private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = Timer.STRESS_TEST ? 1 : 3;
private static final long REQUEST_PERIODICALLY_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 10 * 60;
private final NetworkNode networkNode;
private final PeerManager peerManager;
@ -294,7 +294,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
stopped = false;
if (periodicTimer == null)
periodicTimer = UserThread.runPeriodically(this::requestWithAvailablePeers,
REQUEST_PERIODICALLY_INTERVAL_MINUTES, TimeUnit.MINUTES);
REQUEST_PERIODICALLY_INTERVAL_SEC, TimeUnit.SECONDS);
else
log.warn("periodicTimer already started");
}

View File

@ -42,7 +42,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
@VisibleForTesting
public static int CHECK_TTL_INTERVAL_SEC = 30;
public static int CHECK_TTL_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 30;
private final Broadcaster broadcaster;
private final Map<ByteArray, ProtectedStorageEntry> map = new ConcurrentHashMap<>();