diff --git a/common/src/main/java/io/bitsquare/common/DefaultJavaTimer.java b/common/src/main/java/io/bitsquare/common/DefaultJavaTimer.java deleted file mode 100644 index 55f8ab7d24..0000000000 --- a/common/src/main/java/io/bitsquare/common/DefaultJavaTimer.java +++ /dev/null @@ -1,69 +0,0 @@ -package io.bitsquare.common; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.Random; -import java.util.TimerTask; - -public class DefaultJavaTimer implements Timer { - private final Logger log = LoggerFactory.getLogger(DefaultJavaTimer.class); - private java.util.Timer timer; - - public DefaultJavaTimer() { - - } - - @Override - public Timer runLater(Duration delay, Runnable runnable) { - if (timer == null) { - timer = new java.util.Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000)); - try { - UserThread.execute(runnable::run); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing timerTask failed. " + t.getMessage()); - } - } - }, delay.toMillis()); - } else { - log.warn("runLater called on an already running timer."); - } - return this; - } - - @Override - public Timer runPeriodically(java.time.Duration interval, Runnable runnable) { - if (timer == null) { - timer = new java.util.Timer(); - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000)); - try { - UserThread.execute(runnable::run); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing timerTask failed. " + t.getMessage()); - } - } - }, interval.toMillis(), interval.toMillis()); - } else { - log.warn("runLater called on an already running timer."); - } - return this; - } - - @Override - public void stop() { - if (timer != null) { - timer.cancel(); - timer = null; - } - } -} diff --git a/common/src/main/java/io/bitsquare/common/Timer.java b/common/src/main/java/io/bitsquare/common/Timer.java index cab1c499e3..119fa3d526 100644 --- a/common/src/main/java/io/bitsquare/common/Timer.java +++ b/common/src/main/java/io/bitsquare/common/Timer.java @@ -3,6 +3,8 @@ package io.bitsquare.common; import java.time.Duration; public interface Timer { + boolean STRESS_TEST = false; + Timer runLater(java.time.Duration delay, Runnable action); Timer runPeriodically(Duration interval, Runnable runnable); diff --git a/common/src/main/java/io/bitsquare/common/UserThread.java b/common/src/main/java/io/bitsquare/common/UserThread.java index c10a17c75d..545e0ef347 100644 --- a/common/src/main/java/io/bitsquare/common/UserThread.java +++ b/common/src/main/java/io/bitsquare/common/UserThread.java @@ -46,7 +46,7 @@ public class UserThread { static { // If not defined we use same thread as caller thread executor = MoreExecutors.directExecutor(); - timerClass = DefaultJavaTimer.class; + timerClass = FrameRateTimer.class; } private static Executor executor; diff --git a/common/src/main/java/io/bitsquare/common/util/Utilities.java b/common/src/main/java/io/bitsquare/common/util/Utilities.java index 7c1d23229e..e89f1ae7e7 100644 --- a/common/src/main/java/io/bitsquare/common/util/Utilities.java +++ b/common/src/main/java/io/bitsquare/common/util/Utilities.java @@ -60,8 +60,8 @@ public class Utilities { public static ListeningExecutorService getListeningExecutorService(String name, int corePoolSize, int maximumPoolSize, - long keepAliveTime) { - return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTime)); + long keepAliveTimeInSec) { + return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTimeInSec)); } public static ThreadPoolExecutor getThreadPoolExecutor(String name, diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 0452378a97..0fad9acbe6 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -60,10 +60,10 @@ import static io.bitsquare.util.Validator.nonEmptyStringOf; public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMessageListener { private static final Logger log = LoggerFactory.getLogger(OpenOfferManager.class); - private static final long RETRY_REPUBLISH_DELAY_SEC = 5; - private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 10; - private static final long REPUBLISH_INTERVAL_MILLIS = 10 * Offer.TTL; - private static final long REFRESH_INTERVAL_MILLIS = (long) (Offer.TTL * 0.5); + private static final long RETRY_REPUBLISH_DELAY_SEC = Timer.STRESS_TEST ? 1 : 5; + private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = Timer.STRESS_TEST ? 1 : 10; + private static final long REPUBLISH_INTERVAL_MS = Timer.STRESS_TEST ? 3000 : 10 * Offer.TTL; + private static final long REFRESH_INTERVAL_MS = Timer.STRESS_TEST ? 1000 : (long) (Offer.TTL * 0.5); private final KeyRing keyRing; private final User user; @@ -404,7 +404,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe log.warn("We have stopped already. We ignore that periodicRepublishOffersTimer.run call."); } }, - REPUBLISH_INTERVAL_MILLIS, + REPUBLISH_INTERVAL_MS, TimeUnit.MILLISECONDS); else log.trace("periodicRepublishOffersTimer already stated"); @@ -425,7 +425,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe log.warn("We have stopped already. We ignore that periodicRefreshOffersTimer.run call."); } }, - REFRESH_INTERVAL_MILLIS, + REFRESH_INTERVAL_MS, TimeUnit.MILLISECONDS); else log.trace("periodicRefreshOffersTimer already stated"); diff --git a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkStatisticListItem.java b/gui/src/main/java/io/bitsquare/gui/main/settings/network/P2pNetworkListItem.java similarity index 72% rename from gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkStatisticListItem.java rename to gui/src/main/java/io/bitsquare/gui/main/settings/network/P2pNetworkListItem.java index fca02611cd..c8a82ea93b 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkStatisticListItem.java +++ b/gui/src/main/java/io/bitsquare/gui/main/settings/network/P2pNetworkListItem.java @@ -30,8 +30,8 @@ import org.fxmisc.easybind.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NetworkStatisticListItem { - private static final Logger log = LoggerFactory.getLogger(NetworkStatisticListItem.class); +public class P2pNetworkListItem { + private static final Logger log = LoggerFactory.getLogger(P2pNetworkListItem.class); private final Statistic statistic; private final Connection connection; @@ -42,9 +42,12 @@ public class NetworkStatisticListItem { private final StringProperty lastActivity = new SimpleStringProperty(); private final StringProperty sentBytes = new SimpleStringProperty(); private final StringProperty receivedBytes = new SimpleStringProperty(); + private final StringProperty peerType = new SimpleStringProperty(); + private final StringProperty connectionType = new SimpleStringProperty(); + private final StringProperty onionAddress = new SimpleStringProperty(); private final Clock.Listener listener; - public NetworkStatisticListItem(Connection connection, Clock clock, BSFormatter formatter) { + public P2pNetworkListItem(Connection connection, Clock clock, BSFormatter formatter) { this.connection = connection; this.clock = clock; this.formatter = formatter; @@ -59,6 +62,9 @@ public class NetworkStatisticListItem { @Override public void onSecondTick() { onLastActivityChanged(statistic.getLastActivityTimestamp()); + updatePeerType(); + updateConnectionType(); + updateOnionAddress(); } @Override @@ -71,6 +77,9 @@ public class NetworkStatisticListItem { }; clock.addListener(listener); onLastActivityChanged(statistic.getLastActivityTimestamp()); + updatePeerType(); + updateConnectionType(); + updateOnionAddress(); } private void onLastActivityChanged(long timeStamp) { @@ -83,28 +92,50 @@ public class NetworkStatisticListItem { clock.removeListener(listener); } - public String getOnionAddress() { - if (connection.getPeersNodeAddressOptional().isPresent()) - return connection.getPeersNodeAddressOptional().get().getFullAddress(); - else - return ""; + public void updateOnionAddress() { + onionAddress.set(connection.getPeersNodeAddressOptional().isPresent() ? + connection.getPeersNodeAddressOptional().get().getFullAddress() : "Not known yet"); } - public String getConnectionType() { - return connection instanceof OutboundConnection ? "outbound" : "inbound"; + public void updateConnectionType() { + connectionType.set(connection instanceof OutboundConnection ? "outbound" : "inbound"); + } + + public void updatePeerType() { + if (connection.getPeerType() == Connection.PeerType.SEED_NODE) + peerType.set("Seed node"); + else if (connection.getPeerType() == Connection.PeerType.DIRECT_MSG_PEER) + peerType.set("Peer (direct)"); + else + peerType.set("Peer"); } public String getCreationDate() { return formatter.formatDateTime(statistic.getCreationDate()); } + public String getOnionAddress() { + return onionAddress.get(); + } + + public StringProperty getOnionAddressProperty() { + return onionAddress; + } + + public String getConnectionType() { + return connectionType.get(); + } + + public StringProperty getConnectionTypeProperty() { + return connectionType; + } + public String getPeerType() { - if (connection.getPeerType() == Connection.PeerType.SEED_NODE) - return "Seed node"; - else if (connection.getPeerType() == Connection.PeerType.DIRECT_MSG_PEER) - return "Peer (direct)"; - else - return "Peer"; + return peerType.get(); + } + + public StringProperty getPeerTypeProperty() { + return peerType; } public String getLastActivity() { diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index aacc201070..a9c6f164e6 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -141,8 +141,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis if (newValue) onNetworkReady(); }); - - numConnectedPeers.set(networkNode.getAllConnections().size()); } @@ -304,14 +302,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onConnection(Connection connection) { numConnectedPeers.set(networkNode.getAllConnections().size()); - UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 1); + UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 3); } @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { Log.traceCall(); numConnectedPeers.set(networkNode.getAllConnections().size()); - UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 1); + UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getAllConnections().size()), 3); } @Override @@ -717,10 +715,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis return networkNode.getNodeAddress(); } - public Set getNodeAddressesOfConnectedPeers() { - return networkNode.getNodeAddressesOfConfirmedConnections(); - } - public ReadOnlyIntegerProperty getNumConnectedPeers() { return numConnectedPeers; } diff --git a/network/src/main/java/io/bitsquare/p2p/network/CloseConnectionReason.java b/network/src/main/java/io/bitsquare/p2p/network/CloseConnectionReason.java index f00c18660a..ee9797eedb 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/CloseConnectionReason.java +++ b/network/src/main/java/io/bitsquare/p2p/network/CloseConnectionReason.java @@ -19,6 +19,7 @@ public enum CloseConnectionReason { // maintenance TOO_MANY_CONNECTIONS_OPEN(true, true), TOO_MANY_SEED_NODES_CONNECTED(true, true), + UNKNOWN_PEER_ADDRESS(true, true), // illegal requests RULE_VIOLATION(true, true); diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index fc106ea4cd..7f23611ca4 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -372,11 +372,12 @@ public class Connection implements MessageListener { setStopFlags(); - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } catch (Throwable t) { log.error(t.getMessage()); t.printStackTrace(); } finally { + setStopFlags(); UserThread.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler)); } }).start(); @@ -384,6 +385,10 @@ public class Connection implements MessageListener { setStopFlags(); doShutDown(closeConnectionReason, shutDownCompleteHandler); } + } else { + //TODO find out why we get called that + log.warn("stopped was already true at shutDown call"); + UserThread.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler)); } } @@ -607,14 +612,25 @@ public class Connection implements MessageListener { Object rawInputObject = objectInputStream.readObject(); int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; - log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" + - "New data arrived at inputHandler of connection {}.\n" + - "Received object (truncated)={} / size={}" - + "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", - sharedModel.connection, - StringUtils.abbreviate(rawInputObject.toString(), 100), - size); + boolean doPrintLogs = true; + if (rawInputObject instanceof Message) { + Message message = (Message) rawInputObject; + Connection connection = sharedModel.connection; + connection.statistic.addReceivedBytes(size); + connection.statistic.addReceivedMessage(message); + // We dont want to get all KeepAliveMessage logged + doPrintLogs = !(message instanceof KeepAliveMessage); + } + if (doPrintLogs) { + log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" + + "New data arrived at inputHandler of connection {}.\n" + + "Received object (truncated)={} / size={}" + + "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", + sharedModel.connection, + StringUtils.abbreviate(rawInputObject.toString(), 100), + size); + } if (size > getMaxMsgSize()) { reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED); @@ -659,26 +675,25 @@ public class Connection implements MessageListener { } Message message = (Message) serializable; + Connection connection = sharedModel.connection; + connection.statistic.addReceivedBytes(size); + connection.statistic.addReceivedMessage(message); + if (message.getMessageVersion() != Version.getP2PMessageVersion()) { reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID); return; } - Connection connection = sharedModel.connection; if (message instanceof CloseConnectionMessage) { log.info("CloseConnectionMessage received. Reason={}\n\t" + "connection={}", ((CloseConnectionMessage) message).reason, connection); stop(); sharedModel.shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER); } else if (!stopped) { - connection.statistic.addReceivedBytes(size); - connection.statistic.addReceivedMessage(message); - // We don't want to get the activity ts updated by ping/pong msg if (!(message instanceof KeepAliveMessage)) connection.statistic.updateLastActivityTimestamp(); - // First a seed node gets a message form a peer (PreliminaryDataRequest using // AnonymousMessage interface) which does not has its hidden service // published, so does not know its address. As the IncomingConnection does not has the diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index 2eec13fc58..c94e40e301 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -29,7 +29,6 @@ public class LocalhostNetworkNode extends NetworkNode { private static volatile int simulateTorDelayTorNode = 100; private static volatile int simulateTorDelayHiddenService = 500; - private NodeAddress nodeAddress; public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) { LocalhostNetworkNode.simulateTorDelayTorNode = simulateTorDelayTorNode; @@ -70,20 +69,12 @@ public class LocalhostNetworkNode extends NetworkNode { log.error("Exception at startServer: " + e.getMessage()); } - nodeAddress = new NodeAddress("localhost", servicePort); - + nodeAddressProperty.set(new NodeAddress("localhost", servicePort)); setupListeners.stream().forEach(SetupListener::onHiddenServicePublished); }); }); } - - @Override - @Nullable - public NodeAddress getNodeAddress() { - return nodeAddress; - } - // Called from NetworkNode thread @Override protected Socket createSocket(NodeAddress peerNodeAddress) throws IOException { diff --git a/network/src/main/java/io/bitsquare/p2p/network/Server.java b/network/src/main/java/io/bitsquare/p2p/network/Server.java index 5570ed67c7..ca4246e582 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Server.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Server.java @@ -54,6 +54,8 @@ class Server implements Runnable { if (!stopped) connections.add(connection); + else + connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN); } } } catch (IOException e) { @@ -83,6 +85,8 @@ class Server implements Runnable { } finally { log.info("Server shutdown complete"); } + } else { + log.warn("stopped already called ast shutdown"); } } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java index 9c3b2ee3df..c516f0a479 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java @@ -47,7 +47,6 @@ public class TorNetworkNode extends NetworkNode { private int restartCounter; private MonadicBinding allShutDown; - // ///////////////////////////////////////////////////////////////////////////////////////// // Constructor // ///////////////////////////////////////////////////////////////////////////////////////// @@ -83,22 +82,13 @@ public class TorNetworkNode extends NetworkNode { hiddenServiceDescriptor -> { Log.traceCall("hiddenService created"); TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor; - + nodeAddressProperty.set(new NodeAddress(hiddenServiceDescriptor.getFullAddress())); startServer(hiddenServiceDescriptor.getServerSocket()); setupListeners.stream().forEach(SetupListener::onHiddenServicePublished); }); }); } - @Override - @Nullable - public NodeAddress getNodeAddress() { - if (hiddenServiceDescriptor != null) - return new NodeAddress(hiddenServiceDescriptor.getFullAddress()); - else - return null; - } - @Override protected Socket createSocket(NodeAddress peerNodeAddress) throws IOException { checkArgument(peerNodeAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address"); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java index 628db0d047..86a0255ae2 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/BroadcastHandler.java @@ -7,9 +7,7 @@ import io.bitsquare.app.Log; import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; import io.bitsquare.p2p.NodeAddress; -import io.bitsquare.p2p.network.CloseConnectionReason; import io.bitsquare.p2p.network.Connection; -import io.bitsquare.p2p.network.ConnectionListener; import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.storage.messages.BroadcastMessage; import org.apache.commons.lang3.StringUtils; @@ -22,9 +20,10 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -public class BroadcastHandler implements ConnectionListener, PeerManager.Listener { +public class BroadcastHandler implements PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(BroadcastHandler.class); - private static final long TIMEOUT_SEC = 60; + private static final long TIMEOUT_PER_PEER_SEC = Timer.STRESS_TEST ? 2 : 20; + private static final long DELAY_MS = Timer.STRESS_TEST ? 10 : 300; interface ResultHandler { void onCompleted(BroadcastHandler broadcastHandler); @@ -63,8 +62,7 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene public BroadcastHandler(NetworkNode networkNode, PeerManager peerManager) { this.networkNode = networkNode; this.peerManager = peerManager; - networkNode.removeConnectionListener(this); - peerManager.removeListener(this); + peerManager.addListener(this); uid = UUID.randomUUID().toString(); } @@ -85,22 +83,21 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene Log.traceCall("Sender=" + sender + "\n\t" + "Message=" + StringUtils.abbreviate(message.toString(), 100)); - timeoutTimer = UserThread.runAfter(() -> - onFault("Timeout: Broadcast did not complete after " + TIMEOUT_SEC + " sec."), TIMEOUT_SEC); Set receivers = networkNode.getConfirmedConnections(); if (!receivers.isEmpty()) { + timeoutTimer = UserThread.runAfter(() -> + onFault("Timeout: Broadcast did not complete after " + TIMEOUT_PER_PEER_SEC + " sec."), TIMEOUT_PER_PEER_SEC * receivers.size()); numOfPeers = receivers.size(); numOfCompletedBroadcasts = 0; log.info("Broadcast message to {} peers.", numOfPeers); receivers.stream() .filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender)) .forEach(connection -> UserThread.runAfterRandomDelay(() -> - sendToPeer(connection, message), 1, 500, TimeUnit.MILLISECONDS)); + sendToPeer(connection, message), 1, DELAY_MS, TimeUnit.MILLISECONDS)); } else { - String errorMessage = "Message not broadcasted because we have no available peers yet.\n\t" + + onFault("Message not broadcasted because we have no available peers yet.\n\t" + "That should never happen as broadcast should not be called in such cases.\n" + - "message = " + StringUtils.abbreviate(message.toString(), 100); - onFault(errorMessage); + "message = " + StringUtils.abbreviate(message.toString(), 100)); } } @@ -132,8 +129,7 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene resultHandler.onCompleted(BroadcastHandler.this); } } else { - log.warn("stopped at onSuccess: " + errorMessage); - onFault(errorMessage); + onFault("stopped at onSuccess: " + errorMessage); } } @@ -143,57 +139,63 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene if (!stopped) { log.info("Broadcast to " + nodeAddress + " failed.\n\t" + "ErrorMessage=" + throwable.getMessage()); + if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers) + onFault("stopped at onFailure: " + errorMessage); } else { - log.warn("stopped at onFailure: " + errorMessage); - onFault(errorMessage); + onFault("stopped at onFailure: " + errorMessage); } } }); } else { - log.warn("stopped at sendToPeer: " + errorMessage); - onFault(errorMessage); + onFault("stopped at sendToPeer: " + errorMessage); } } - /////////////////////////////////////////////////////////////////////////////////////////// - // ConnectionListener implementation - /////////////////////////////////////////////////////////////////////////////////////////// - - @Override - public void onConnection(Connection connection) { - stopped = false; - } - - @Override - public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - } - - @Override - public void onError(Throwable throwable) { - } - - /////////////////////////////////////////////////////////////////////////////////////////// // PeerManager.Listener implementation /////////////////////////////////////////////////////////////////////////////////////////// @Override public void onAllConnectionsLost() { - stopped = true; + onFault("All connections lost"); } @Override public void onNewConnectionAfterAllConnectionsLost() { - stopped = false; } @Override public void onAwakeFromStandby() { - if (!networkNode.getAllConnections().isEmpty()) - stopped = false; } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private void cleanup() { + stopped = true; + peerManager.removeListener(this); + if (timeoutTimer != null) { + timeoutTimer.stop(); + timeoutTimer = null; + } + } + + private void onFault(String errorMessage) { + log.warn(errorMessage); + if (listener != null) + listener.onBroadcastFailed(errorMessage); + + if (listener != null && (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers || stopped)) + listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); + + cleanup(); + resultHandler.onFault(this); + } + + @Override public boolean equals(Object o) { if (this == o) return true; @@ -208,26 +210,4 @@ public class BroadcastHandler implements ConnectionListener, PeerManager.Listene public int hashCode() { return uid != null ? uid.hashCode() : 0; } - - private void onFault(String errorMessage) { - log.warn(errorMessage); - if (listener != null) - listener.onBroadcastFailed(errorMessage); - - if (listener != null && (numOfCompletedBroadcasts + numOfFailedBroadcasts == numOfPeers || stopped)) - listener.onBroadcastCompleted(message, numOfCompletedBroadcasts, numOfFailedBroadcasts); - - cleanup(); - resultHandler.onFault(this); - } - - private void cleanup() { - stopped = true; - networkNode.removeConnectionListener(this); - peerManager.removeListener(this); - if (timeoutTimer != null) { - timeoutTimer.stop(); - timeoutTimer = null; - } - } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 1bb810f70e..2a4309e347 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -8,7 +8,6 @@ import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.peers.peerexchange.ReportedPeer; import io.bitsquare.storage.Storage; -import javafx.beans.value.ChangeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,14 +18,14 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class PeerManager implements ConnectionListener { - private static final Logger log = LoggerFactory.getLogger(PeerManager.class); - /////////////////////////////////////////////////////////////////////////////////////////// // Static /////////////////////////////////////////////////////////////////////////////////////////// + private static final Logger log = LoggerFactory.getLogger(PeerManager.class); + + private static final long CHECK_MAX_CONN_DELAY_SEC = Timer.STRESS_TEST ? 1 : 5; + private static final long REMOVE_ANONYMOUS_PEER_SEC = Timer.STRESS_TEST ? 1 : 30; - private static final long CHECK_MAX_CONN_DELAY_SEC = 3; - private static int MAX_CONNECTIONS; private static int MIN_CONNECTIONS; private static int MAX_CONNECTIONS_PEER; @@ -46,7 +45,7 @@ public class PeerManager implements ConnectionListener { } static { - setMaxConnections(12); + setMaxConnections(6); } private static final int MAX_REPORTED_PEERS = 1000; @@ -80,7 +79,6 @@ public class PeerManager implements ConnectionListener { private final HashSet persistedPeers = new HashSet<>(); private final Set reportedPeers = new HashSet<>(); private Timer checkMaxConnectionsTimer; - private final ChangeListener connectionNodeAddressListener; private final Clock.Listener listener; private final List listeners = new LinkedList<>(); private boolean stopped; @@ -102,21 +100,6 @@ public class PeerManager implements ConnectionListener { this.persistedPeers.addAll(persistedPeers); } - connectionNodeAddressListener = (observable, oldValue, newValue) -> { - // Every time we get a new peer connected with a known address we check if we need to remove peers - printConnectedPeers(); - if (checkMaxConnectionsTimer == null && newValue != null) - checkMaxConnectionsTimer = UserThread.runAfter(() -> { - if (!stopped) { - removeTooOldReportedPeers(); - removeTooOldPersistedPeers(); - checkMaxConnections(MAX_CONNECTIONS); - } else { - log.warn("We have stopped already. We ignore that checkMaxConnectionsTimer.run call."); - } - }, CHECK_MAX_CONN_DELAY_SEC); - }; - // we check if app was idle for more then 5 sec. listener = new Clock.Listener() { @Override @@ -171,11 +154,11 @@ public class PeerManager implements ConnectionListener { @Override public void onConnection(Connection connection) { - connection.peersNodeAddressProperty().addListener(connectionNodeAddressListener); - if (isSeedNode(connection)) connection.setPeerType(Connection.PeerType.SEED_NODE); + doHouseKeeping(); + if (lostAllConnections) { lostAllConnections = false; stopped = false; @@ -185,7 +168,6 @@ public class PeerManager implements ConnectionListener { @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - connection.peersNodeAddressProperty().removeListener(connectionNodeAddressListener); handleConnectionFault(connection); lostAllConnections = networkNode.getAllConnections().isEmpty(); @@ -201,13 +183,32 @@ public class PeerManager implements ConnectionListener { /////////////////////////////////////////////////////////////////////////////////////////// - // Check max connections + // Housekeeping /////////////////////////////////////////////////////////////////////////////////////////// + private void doHouseKeeping() { + log.trace("Peers before doHouseKeeping"); + printConnectedPeers(); + if (checkMaxConnectionsTimer == null) + checkMaxConnectionsTimer = UserThread.runAfter(() -> { + stopCheckMaxConnectionsTimer(); + if (!stopped) { + removeAnonymousPeers(); + removeSuperfluousSeedNodes(); + removeTooOldReportedPeers(); + removeTooOldPersistedPeers(); + checkMaxConnections(MAX_CONNECTIONS); + } else { + log.warn("We have stopped already. We ignore that checkMaxConnectionsTimer.run call."); + } + }, CHECK_MAX_CONN_DELAY_SEC); + + log.trace("Peers after doHouseKeeping"); + printConnectedPeers(); + } + private boolean checkMaxConnections(int limit) { Log.traceCall("limit=" + limit); - stopCheckMaxConnectionsTimer(); - removeSuperfluousSeedNodes(); Set allConnections = networkNode.getAllConnections(); int size = allConnections.size(); log.info("We have {} connections open. Our limit is {}", size, limit); @@ -269,20 +270,36 @@ public class PeerManager implements ConnectionListener { } } + private void removeAnonymousPeers() { + Log.traceCall(); + networkNode.getAllConnections().stream() + .filter(connection -> !connection.hasPeersNodeAddress()) + .forEach(connection -> UserThread.runAfter(() -> { + // We give 30 seconds delay and check again if still no address is set + if (!connection.hasPeersNodeAddress()) { + log.info("We close the connection as the peer address is still unknown.\n\t" + + "connection=" + connection); + connection.shutDown(CloseConnectionReason.UNKNOWN_PEER_ADDRESS); + } + }, REMOVE_ANONYMOUS_PEER_SEC)); + } + private void removeSuperfluousSeedNodes() { Log.traceCall(); - Set connections = networkNode.getConfirmedConnections(); - if (hasSufficientConnections()) { - List candidates = connections.stream() - .filter(this::isSeedNode) - .collect(Collectors.toList()); + if (networkNode.getConfirmedConnections().size() > MAX_CONNECTIONS) { + Set connections = networkNode.getConfirmedConnections(); + if (hasSufficientConnections()) { + List candidates = connections.stream() + .filter(this::isSeedNode) + .collect(Collectors.toList()); - if (candidates.size() > 1) { - candidates.sort((o1, o2) -> ((Long) o1.getStatistic().getLastActivityTimestamp()).compareTo(((Long) o2.getStatistic().getLastActivityTimestamp()))); - log.info("Number of connections exceeding MAX_CONNECTIONS_EXTENDED_1. Current size=" + candidates.size()); - Connection connection = candidates.remove(0); - log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString()); - connection.shutDown(CloseConnectionReason.TOO_MANY_SEED_NODES_CONNECTED, this::removeSuperfluousSeedNodes); + if (candidates.size() > 1) { + candidates.sort((o1, o2) -> ((Long) o1.getStatistic().getLastActivityTimestamp()).compareTo(((Long) o2.getStatistic().getLastActivityTimestamp()))); + log.info("Number of connections exceeding MAX_CONNECTIONS_EXTENDED_1. Current size=" + candidates.size()); + Connection connection = candidates.remove(0); + log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString()); + connection.shutDown(CloseConnectionReason.TOO_MANY_SEED_NODES_CONNECTED, this::removeSuperfluousSeedNodes); + } } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java index 985b633c7d..221388c4fc 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java @@ -25,8 +25,7 @@ import static com.google.common.base.Preconditions.checkArgument; public class GetDataRequestHandler { private static final Logger log = LoggerFactory.getLogger(GetDataRequestHandler.class); - - private static final long TIME_OUT_SEC = 20; + private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20; /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java index 0add8f0ac0..f7a45dd8d2 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java @@ -30,6 +30,9 @@ import static com.google.common.base.Preconditions.checkArgument; public class RequestDataHandler implements MessageListener { private static final Logger log = LoggerFactory.getLogger(RequestDataHandler.class); + private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20; + + /////////////////////////////////////////////////////////////////////////////////////////// // Listener /////////////////////////////////////////////////////////////////////////////////////////// @@ -129,7 +132,7 @@ public class RequestDataHandler implements MessageListener { "Might be caused by an previous networkNode.sendMessage.onFailure."); } }, - 10); + TIME_OUT_SEC); } else { log.warn("We have stopped already. We ignore that requestData call."); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java index aa76915b24..b646d44548 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java @@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; public class RequestDataManager implements MessageListener, ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class); - private static final long RETRY_DELAY_SEC = 10; + private static final long RETRY_DELAY_SEC = Timer.STRESS_TEST ? 3 : 10; /////////////////////////////////////////////////////////////////////////////////////////// @@ -200,7 +200,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener, } else { log.warn("We have stopped already. We ignore that onMessage call."); } - } + } } /////////////////////////////////////////////////////////////////////////////////////////// @@ -289,7 +289,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener, retryTimer = UserThread.runAfter(() -> { log.trace("retryTimer called"); stopped = false; - + stopRetryTimer(); // We create a new list of candidates diff --git a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java index deaf9819d8..2e03cee0e2 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java @@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit; class KeepAliveHandler implements MessageListener { private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class); - private Timer delayTimer; + private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 5000; + /////////////////////////////////////////////////////////////////////////////////////////// // Listener @@ -48,7 +49,8 @@ class KeepAliveHandler implements MessageListener { @Nullable private Connection connection; private boolean stopped; - + private Timer delayTimer; + /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -70,7 +72,7 @@ class KeepAliveHandler implements MessageListener { /////////////////////////////////////////////////////////////////////////////////////////// public void sendPingAfterRandomDelay(Connection connection) { - delayTimer = UserThread.runAfterRandomDelay(() -> sendPing(connection), 1, 5000, TimeUnit.MILLISECONDS); + delayTimer = UserThread.runAfterRandomDelay(() -> sendPing(connection), 1, DELAY_MS, TimeUnit.MILLISECONDS); } private void sendPing(Connection connection) { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java index 1b96aaa6c7..7acee221d9 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java @@ -22,8 +22,8 @@ import java.util.Random; public class KeepAliveManager implements MessageListener, ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class); - private static final int INTERVAL_SEC = new Random().nextInt(5) + 20; - private static final long LAST_ACTIVITY_AGE_MILLIS = INTERVAL_SEC / 2; + private static final int INTERVAL_SEC = Timer.STRESS_TEST ? 2 : new Random().nextInt(5) + 20; + private static final long LAST_ACTIVITY_AGE_MS = INTERVAL_SEC / 2; private final NetworkNode networkNode; private final PeerManager peerManager; @@ -170,7 +170,7 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe Log.traceCall(); networkNode.getConfirmedConnections().stream() .filter(connection -> connection instanceof OutboundConnection && - connection.getStatistic().getLastActivityAge() > LAST_ACTIVITY_AGE_MILLIS) + connection.getStatistic().getLastActivityAge() > LAST_ACTIVITY_AGE_MS) .forEach(connection -> { final String uid = connection.getUid(); if (!handlerMap.containsKey(uid)) { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java index 2873a29740..2d4453b161 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java @@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument; class GetPeersRequestHandler { private static final Logger log = LoggerFactory.getLogger(GetPeersRequestHandler.class); - private static final long TIME_OUT_SEC = 20; + private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20; /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java index 9177d13a59..254d5ff13c 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java @@ -28,7 +28,8 @@ import static com.google.common.base.Preconditions.checkArgument; class PeerExchangeHandler implements MessageListener { private static final Logger log = LoggerFactory.getLogger(PeerExchangeHandler.class); - private static final long TIME_OUT_SEC = 20; + private static final long TIME_OUT_SEC = Timer.STRESS_TEST ? 5 : 20; + private static int DELAY_MS = Timer.STRESS_TEST ? 1000 : 3000; /////////////////////////////////////////////////////////////////////////////////////////// @@ -77,7 +78,7 @@ class PeerExchangeHandler implements MessageListener { /////////////////////////////////////////////////////////////////////////////////////////// public void sendGetPeersRequestAfterRandomDelay(NodeAddress nodeAddress) { - delayTimer = UserThread.runAfterRandomDelay(() -> sendGetPeersRequest(nodeAddress), 1, 3000, TimeUnit.MILLISECONDS); + delayTimer = UserThread.runAfterRandomDelay(() -> sendGetPeersRequest(nodeAddress), 1, DELAY_MS, TimeUnit.MILLISECONDS); } private void sendGetPeersRequest(NodeAddress nodeAddress) { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java index eee24a35b3..c24aa38657 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java @@ -22,9 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull; public class PeerExchangeManager implements MessageListener, ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class); - private static final long RETRY_DELAY_SEC = 10; - private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = 3; - private static final long REQUEST_PERIODICALLY_INTERVAL_MINUTES = 10; + private static final long RETRY_DELAY_SEC = Timer.STRESS_TEST ? 2 : 10; + private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = Timer.STRESS_TEST ? 1 : 3; + private static final long REQUEST_PERIODICALLY_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 10 * 60; private final NetworkNode networkNode; private final PeerManager peerManager; @@ -294,7 +294,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, stopped = false; if (periodicTimer == null) periodicTimer = UserThread.runPeriodically(this::requestWithAvailablePeers, - REQUEST_PERIODICALLY_INTERVAL_MINUTES, TimeUnit.MINUTES); + REQUEST_PERIODICALLY_INTERVAL_SEC, TimeUnit.SECONDS); else log.warn("periodicTimer already started"); } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index 6d1e3aed98..cb73398ed0 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -42,7 +42,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class); @VisibleForTesting - public static int CHECK_TTL_INTERVAL_SEC = 30; + public static int CHECK_TTL_INTERVAL_SEC = Timer.STRESS_TEST ? 5 : 30; private final Broadcaster broadcaster; private final Map map = new ConcurrentHashMap<>();