diff --git a/common/src/main/java/io/bitsquare/common/Clock.java b/common/src/main/java/io/bitsquare/common/Clock.java index b9e60fa430..7b878bd783 100644 --- a/common/src/main/java/io/bitsquare/common/Clock.java +++ b/common/src/main/java/io/bitsquare/common/Clock.java @@ -1,19 +1,65 @@ package io.bitsquare.common; -public interface Clock { - void start(); +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - void stop(); +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; - void addListener(Listener listener); +public class Clock { + private static final Logger log = LoggerFactory.getLogger(Clock.class); - void removeListener(Listener listener); + public static final int IDLE_TOLERANCE = 5000; - interface Listener { + public interface Listener { void onSecondTick(); void onMinuteTick(); void onMissedSecondTick(long missed); } + + private Timer timer; + private final List listeners = new LinkedList<>(); + private long counter = 0; + private long lastSecondTick; + + public Clock() { + } + + public void start() { + if (timer == null) { + lastSecondTick = System.currentTimeMillis(); + timer = UserThread.runPeriodically(() -> { + listeners.stream().forEach(Listener::onSecondTick); + counter++; + if (counter >= 60) { + counter = 0; + listeners.stream().forEach(Listener::onMinuteTick); + } + + long currentTimeMillis = System.currentTimeMillis(); + long diff = currentTimeMillis - lastSecondTick; + if (diff > 1000) + listeners.stream().forEach(listener -> listener.onMissedSecondTick(diff - 1000)); + + lastSecondTick = currentTimeMillis; + }, 1, TimeUnit.SECONDS); + } + } + + public void stop() { + timer.stop(); + timer = null; + counter = 0; + } + + public void addListener(Listener listener) { + listeners.add(listener); + } + + public void removeListener(Listener listener) { + listeners.remove(listener); + } } diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 5408a9dc46..38e050b579 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -177,7 +177,7 @@ public class OpenOfferManager { @Override public void onMissedSecondTick(long missed) { - if (missed > 5000) { + if (missed > Clock.IDLE_TOLERANCE) { log.error("We have been idle for {} sec", missed / 1000); // We have been idle for at least 5 sec. diff --git a/gui/src/main/java/io/bitsquare/app/BitsquareAppModule.java b/gui/src/main/java/io/bitsquare/app/BitsquareAppModule.java index 9718473034..36ee7e4559 100644 --- a/gui/src/main/java/io/bitsquare/app/BitsquareAppModule.java +++ b/gui/src/main/java/io/bitsquare/app/BitsquareAppModule.java @@ -26,7 +26,6 @@ import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.crypto.KeyStorage; import io.bitsquare.crypto.EncryptionServiceModule; import io.bitsquare.gui.GuiModule; -import io.bitsquare.gui.common.UIClock; import io.bitsquare.gui.common.view.CachingViewLoader; import io.bitsquare.gui.main.intructions.InstructionCenter; import io.bitsquare.gui.main.notifications.NotificationCenter; @@ -64,7 +63,7 @@ class BitsquareAppModule extends AppModule { bind(Preferences.class).in(Singleton.class); bind(NotificationCenter.class).in(Singleton.class); bind(InstructionCenter.class).in(Singleton.class); - bind(Clock.class).to(UIClock.class).in(Singleton.class); + bind(Clock.class).in(Singleton.class); File storageDir = new File(env.getRequiredProperty(Storage.DIR_KEY)); bind(File.class).annotatedWith(named(Storage.DIR_KEY)).toInstance(storageDir); diff --git a/gui/src/main/java/io/bitsquare/gui/common/UIClock.java b/gui/src/main/java/io/bitsquare/gui/common/UIClock.java deleted file mode 100644 index ec98837208..0000000000 --- a/gui/src/main/java/io/bitsquare/gui/common/UIClock.java +++ /dev/null @@ -1,79 +0,0 @@ -package io.bitsquare.gui.common; - -import io.bitsquare.common.Clock; -import io.bitsquare.common.Timer; -import io.bitsquare.common.UserThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class UIClock implements Clock { - private static final Logger log = LoggerFactory.getLogger(UIClock.class); - private Timer timer; - - private final List listeners = new LinkedList<>(); - private long counter = 0; - private long lastSecondTick; - - public UIClock() { - } - - @Override - public void start() { - if (timer == null) { - lastSecondTick = System.currentTimeMillis(); - timer = UserThread.runPeriodically(() -> { - listeners.stream().forEach(Listener::onSecondTick); - counter++; - if (counter >= 60) { - counter = 0; - listeners.stream().forEach(Listener::onMinuteTick); - } - - long currentTimeMillis = System.currentTimeMillis(); - long diff = currentTimeMillis - lastSecondTick; - if (diff > 1000) - listeners.stream().forEach(listener -> listener.onMissedSecondTick(diff - 1000)); - - lastSecondTick = currentTimeMillis; - }, 1, TimeUnit.SECONDS); - - /* timer = FxTimer.runPeriodically(Duration.ofSeconds(1), () -> { - listeners.stream().forEach(Listener::onSecondTick); - - counter++; - if (counter >= 60) { - counter = 0; - listeners.stream().forEach(Listener::onMinuteTick); - } - - long currentTimeMillis = System.currentTimeMillis(); - long diff = currentTimeMillis - lastSecondTick; - if (diff > 1000) - listeners.stream().forEach(listener -> listener.onMissedSecondTick(diff - 1000)); - - lastSecondTick = currentTimeMillis; - });*/ - } - } - - @Override - public void stop() { - timer.stop(); - timer = null; - counter = 0; - } - - @Override - public void addListener(Listener listener) { - listeners.add(listener); - } - - @Override - public void removeListener(Listener listener) { - listeners.remove(listener); - } -} diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 2604118e73..557c3058a0 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -8,6 +8,7 @@ import com.google.inject.Inject; import com.google.inject.name.Named; import io.bitsquare.app.Log; import io.bitsquare.app.ProgramArguments; +import io.bitsquare.common.Clock; import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.CryptoException; @@ -56,6 +57,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final SeedNodesRepository seedNodesRepository; private final int port; private final File torDir; + private Clock clock; private final Optional optionalEncryptionService; private final Optional optionalKeyRing; @@ -99,11 +101,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Named(ProgramArguments.USE_LOCALHOST) boolean useLocalhost, @Named(ProgramArguments.NETWORK_ID) int networkId, @Named("storage.dir") File storageDir, + Clock clock, @Nullable EncryptionService encryptionService, @Nullable KeyRing keyRing) { this.seedNodesRepository = seedNodesRepository; this.port = port; this.torDir = torDir; + this.clock = clock; optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService); optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing); @@ -126,7 +130,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis p2PDataStorage.addHashMapChangedListener(this); Set seedNodeAddresses = seedNodesRepository.getSeedNodeAddresses(useLocalhost, networkId); - peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir); + peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir, clock); requestDataManager = new RequestDataManager(networkNode, p2PDataStorage, peerManager, seedNodeAddresses, this); @@ -247,11 +251,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis Log.traceCall(); networkReadySubscription.unsubscribe(); - Optional seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest(); + Optional seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeAddressOfPreliminaryDataRequest(); checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present"); - requestDataManager.requestUpdatesData(); + requestDataManager.requestUpdateData(); } /////////////////////////////////////////////////////////////////////////////////////////// @@ -267,7 +271,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onUpdatedDataReceived() { - Optional seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest(); + Optional seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeAddressOfPreliminaryDataRequest(); checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present"); peerExchangeManager.requestReportedPeersFromSeedNodes(seedNodeOfPreliminaryDataRequest.get()); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 137b07db1b..0f186738e1 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -1,6 +1,7 @@ package io.bitsquare.p2p.peers; import io.bitsquare.app.Log; +import io.bitsquare.common.Clock; import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Message; @@ -36,6 +37,7 @@ public class PeerManager implements ConnectionListener, MessageListener { private static int MAX_CONNECTIONS_ABSOLUTE; private final boolean printReportedPeersDetails = true; + private boolean lostAllConnections; public static void setMaxConnections(int maxConnections) { MAX_CONNECTIONS = maxConnections; @@ -54,7 +56,26 @@ public class PeerManager implements ConnectionListener, MessageListener { private static final long MAX_AGE = TimeUnit.DAYS.toMillis(14); // max age for reported peers is 14 days + /////////////////////////////////////////////////////////////////////////////////////////// + // Listener + /////////////////////////////////////////////////////////////////////////////////////////// + + + public interface Listener { + void onAllConnectionsLost(); + + void onNewConnectionAfterAllConnectionsLost(); + + void onAwakeFromStandby(); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Instance fields + /////////////////////////////////////////////////////////////////////////////////////////// + + private final NetworkNode networkNode; + private Clock clock; private final Set seedNodeAddresses; private final Storage> dbStorage; @@ -62,14 +83,16 @@ public class PeerManager implements ConnectionListener, MessageListener { private final Set reportedPeers = new HashSet<>(); private Timer checkMaxConnectionsTimer; private final ChangeListener connectionNodeAddressListener; - + private final Clock.Listener listener; + private final List listeners = new LinkedList<>(); /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public PeerManager(NetworkNode networkNode, Set seedNodeAddresses, File storageDir) { + public PeerManager(NetworkNode networkNode, Set seedNodeAddresses, File storageDir, Clock clock) { this.networkNode = networkNode; + this.clock = clock; this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); networkNode.addConnectionListener(this); dbStorage = new Storage<>(storageDir); @@ -89,12 +112,33 @@ public class PeerManager implements ConnectionListener, MessageListener { checkMaxConnections(MAX_CONNECTIONS); }, 3); }; + + // we check if app was idle for more then 5 sec. + listener = new Clock.Listener() { + @Override + public void onSecondTick() { + } + + @Override + public void onMinuteTick() { + } + + @Override + public void onMissedSecondTick(long missed) { + if (missed > Clock.IDLE_TOLERANCE) { + log.error("We have been idle for {} sec", missed / 1000); + listeners.stream().forEach(Listener::onAwakeFromStandby); + } + } + }; + clock.addListener(listener); } public void shutDown() { Log.traceCall(); networkNode.removeConnectionListener(this); + clock.removeListener(listener); stopCheckMaxConnectionsTimer(); } @@ -102,6 +146,14 @@ public class PeerManager implements ConnectionListener, MessageListener { return MAX_CONNECTIONS_ABSOLUTE; } + public void addListener(Listener listener) { + listeners.add(listener); + } + + public void removeListener(Listener listener) { + listeners.remove(listener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -118,12 +170,21 @@ public class PeerManager implements ConnectionListener, MessageListener { seedNodeAddresses.contains(peersNodeAddressOptional.get())) { connection.setPeerType(Connection.PeerType.SEED_NODE); } + + if (lostAllConnections) { + lostAllConnections = false; + listeners.stream().forEach(Listener::onNewConnectionAfterAllConnectionsLost); + } } @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener); handleConnectionFault(connection); + + lostAllConnections = networkNode.getAllConnections().isEmpty(); + if (lostAllConnections) + listeners.stream().forEach(Listener::onAllConnectionsLost); } @Override diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java new file mode 100644 index 0000000000..b0ea0cb39b --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/GetDataRequestHandler.java @@ -0,0 +1,120 @@ +package io.bitsquare.p2p.peers.getdata; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import io.bitsquare.app.Log; +import io.bitsquare.common.Timer; +import io.bitsquare.common.UserThread; +import io.bitsquare.p2p.network.CloseConnectionReason; +import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.peers.PeerManager; +import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest; +import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse; +import io.bitsquare.p2p.storage.P2PDataStorage; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkArgument; + +public class GetDataRequestHandler { + private static final Logger log = LoggerFactory.getLogger(GetDataRequestHandler.class); + + + private static final long TIME_OUT_SEC = 20; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Listener + /////////////////////////////////////////////////////////////////////////////////////////// + + public interface Listener { + void onComplete(); + + void onFault(String errorMessage, Connection connection); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Class fields + /////////////////////////////////////////////////////////////////////////////////////////// + + private final NetworkNode networkNode; + private final PeerManager peerManager; + private P2PDataStorage dataStorage; + private final Listener listener; + private Timer timeoutTimer; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + public GetDataRequestHandler(NetworkNode networkNode, PeerManager peerManager, P2PDataStorage dataStorage, Listener listener) { + this.networkNode = networkNode; + this.peerManager = peerManager; + this.dataStorage = dataStorage; + this.listener = listener; + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + + public void handle(GetDataRequest getDataRequest, final Connection connection) { + Log.traceCall(getDataRequest + "\n\tconnection=" + connection); + GetDataResponse getDataResponse = new GetDataResponse(new HashSet<>(dataStorage.getMap().values()), + getDataRequest.getNonce()); + SettableFuture future = networkNode.sendMessage(connection, getDataResponse); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Send DataResponse to {} succeeded. getDataResponse={}", + connection.getPeersNodeAddressOptional(), getDataResponse); + cleanup(); + listener.onComplete(); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Sending getDataRequest to " + connection + + " failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." + + "Exception: " + throwable.getMessage(); + handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, connection); + } + }); + + checkArgument(timeoutTimer == null, "requestData must not be called twice."); + timeoutTimer = UserThread.runAfter(() -> { + String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse + + " on connection:" + connection; + handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection); + }, + TIME_OUT_SEC, TimeUnit.SECONDS); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) { + log.info(errorMessage); + peerManager.shutDownConnection(connection, closeConnectionReason); + cleanup(); + listener.onFault(errorMessage, connection); + } + + private void cleanup() { + if (timeoutTimer != null) { + timeoutTimer.stop(); + timeoutTimer = null; + } + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java new file mode 100644 index 0000000000..2a8f3b3416 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java @@ -0,0 +1,175 @@ +package io.bitsquare.p2p.peers.getdata; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import io.bitsquare.app.Log; +import io.bitsquare.common.Timer; +import io.bitsquare.common.UserThread; +import io.bitsquare.p2p.Message; +import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.CloseConnectionReason; +import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.MessageListener; +import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.peers.PeerManager; +import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest; +import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse; +import io.bitsquare.p2p.peers.getdata.messages.GetUpdatedDataRequest; +import io.bitsquare.p2p.peers.getdata.messages.PreliminaryGetDataRequest; +import io.bitsquare.p2p.storage.P2PDataStorage; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; + +import static com.google.common.base.Preconditions.checkArgument; + +public class RequestDataHandler implements MessageListener { + private static final Logger log = LoggerFactory.getLogger(RequestDataHandler.class); + + /////////////////////////////////////////////////////////////////////////////////////////// + // Listener + /////////////////////////////////////////////////////////////////////////////////////////// + + public interface Listener { + void onComplete(); + + void onFault(String errorMessage, @Nullable Connection connection); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Class fields + /////////////////////////////////////////////////////////////////////////////////////////// + + private final NetworkNode networkNode; + private final P2PDataStorage dataStorage; + private final PeerManager peerManager; + private final Listener listener; + private Timer timeoutTimer; + private final long nonce = new Random().nextLong(); + private boolean stopped; + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + public RequestDataHandler(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, + Listener listener) { + this.networkNode = networkNode; + this.dataStorage = dataStorage; + this.peerManager = peerManager; + this.listener = listener; + + networkNode.addMessageListener(this); + } + + public void cleanup() { + Log.traceCall(); + stopped = true; + networkNode.removeMessageListener(this); + stopTimeoutTimer(); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + + public void requestData(NodeAddress nodeAddress) { + Log.traceCall("nodeAddress=" + nodeAddress); + if (!stopped) { + GetDataRequest getDataRequest; + if (networkNode.getNodeAddress() == null) + getDataRequest = new PreliminaryGetDataRequest(nonce); + else + getDataRequest = new GetUpdatedDataRequest(networkNode.getNodeAddress(), nonce); + + log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress); + + SettableFuture future = networkNode.sendMessage(nodeAddress, getDataRequest); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.trace("Send " + getDataRequest + " to " + nodeAddress + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Sending getDataRequest to " + nodeAddress + + " failed. That is expected if the peer is offline.\n\t" + + "getDataRequest=" + getDataRequest + "." + + "\n\tException=" + throwable.getMessage(); + log.info(errorMessage); + handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); + } + }); + + checkArgument(timeoutTimer == null, "requestData must not be called twice."); + timeoutTimer = UserThread.runAfter(() -> { + String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest + + " on nodeAddress:" + nodeAddress; + log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this); + handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); + }, + 10); + } else { + log.warn("We have stopped already. We ignore that requestData call."); + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { + if (message instanceof GetDataResponse) { + Log.traceCall(message.toString() + "\n\tconnection=" + connection); + if (!stopped) { + GetDataResponse getDataResponse = (GetDataResponse) message; + if (getDataResponse.requestNonce == nonce) { + stopTimeoutTimer(); + checkArgument(connection.getPeersNodeAddressOptional().isPresent(), + "RequestDataHandler.onMessage: connection.getPeersNodeAddressOptional() must be present " + + "at that moment"); + ((GetDataResponse) message).dataSet.stream() + .forEach(protectedData -> dataStorage.add(protectedData, + connection.getPeersNodeAddressOptional().get())); + + cleanup(); + listener.onComplete(); + } else { + log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled " + + "handshake (timeout causes connection close but peer might have sent a msg before " + + "connection was closed).\n\t" + + "We drop that message. nonce={} / requestNonce={}", + nonce, getDataResponse.requestNonce); + } + } else { + log.warn("We have stopped already. We ignore that onDataRequest call."); + } + } + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private void stopTimeoutTimer() { + if (timeoutTimer != null) { + timeoutTimer.stop(); + timeoutTimer = null; + } + } + + private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) { + cleanup(); + peerManager.shutDownConnection(nodeAddress, closeConnectionReason); + listener.onFault(errorMessage, null); + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandshake.java deleted file mode 100644 index 1a70bc49e2..0000000000 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandshake.java +++ /dev/null @@ -1,207 +0,0 @@ -package io.bitsquare.p2p.peers.getdata; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; -import io.bitsquare.app.Log; -import io.bitsquare.common.Timer; -import io.bitsquare.common.UserThread; -import io.bitsquare.p2p.Message; -import io.bitsquare.p2p.NodeAddress; -import io.bitsquare.p2p.network.CloseConnectionReason; -import io.bitsquare.p2p.network.Connection; -import io.bitsquare.p2p.network.MessageListener; -import io.bitsquare.p2p.network.NetworkNode; -import io.bitsquare.p2p.peers.PeerManager; -import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest; -import io.bitsquare.p2p.peers.getdata.messages.GetDataResponse; -import io.bitsquare.p2p.peers.getdata.messages.GetUpdatedDataRequest; -import io.bitsquare.p2p.peers.getdata.messages.PreliminaryGetDataRequest; -import io.bitsquare.p2p.storage.P2PDataStorage; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashSet; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkArgument; - -public class RequestDataHandshake implements MessageListener { - private static final Logger log = LoggerFactory.getLogger(RequestDataHandshake.class); - - /////////////////////////////////////////////////////////////////////////////////////////// - // Listener - /////////////////////////////////////////////////////////////////////////////////////////// - - public interface Listener { - void onComplete(); - - void onFault(String errorMessage, @Nullable Connection connection); - } - - - /////////////////////////////////////////////////////////////////////////////////////////// - // Class fields - /////////////////////////////////////////////////////////////////////////////////////////// - - private final NetworkNode networkNode; - private final P2PDataStorage dataStorage; - private final PeerManager peerManager; - private final Listener listener; - private Timer timeoutTimer; - private final long nonce = new Random().nextLong(); - - - /////////////////////////////////////////////////////////////////////////////////////////// - // Constructor - /////////////////////////////////////////////////////////////////////////////////////////// - - public RequestDataHandshake(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, - Listener listener) { - this.networkNode = networkNode; - this.dataStorage = dataStorage; - this.peerManager = peerManager; - this.listener = listener; - - networkNode.addMessageListener(this); - } - - public void shutDown() { - Log.traceCall(); - networkNode.removeMessageListener(this); - stopTimeoutTimer(); - } - - - /////////////////////////////////////////////////////////////////////////////////////////// - // API - /////////////////////////////////////////////////////////////////////////////////////////// - - public void requestData(NodeAddress nodeAddress) { - Log.traceCall("nodeAddress=" + nodeAddress); - GetDataRequest getDataRequest; - if (networkNode.getNodeAddress() == null) - getDataRequest = new PreliminaryGetDataRequest(nonce); - else - getDataRequest = new GetUpdatedDataRequest(networkNode.getNodeAddress(), nonce); - - log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress); - - SettableFuture future = networkNode.sendMessage(nodeAddress, getDataRequest); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.trace("Send " + getDataRequest + " to " + nodeAddress + " succeeded."); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending getDataRequest to " + nodeAddress + - " failed. That is expected if the peer is offline.\n\t" + - "getDataRequest=" + getDataRequest + "." + - "\n\tException=" + throwable.getMessage(); - log.info(errorMessage); - peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); - shutDown(); - listener.onFault(errorMessage, null); - } - }); - - checkArgument(timeoutTimer == null, "requestData must not be called twice."); - timeoutTimer = UserThread.runAfter(() -> { - String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest + - " on nodeAddress:" + nodeAddress; - log.info(errorMessage + " / RequestDataHandshake=" + - RequestDataHandshake.this); - peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); - shutDown(); - listener.onFault(errorMessage, null); - }, - 10); - } - - public void onDataRequest(Message message, final Connection connection) { - Log.traceCall(message.toString() + "\n\tconnection=" + connection); - - GetDataResponse getDataResponse = new GetDataResponse(new HashSet<>(dataStorage.getMap().values()), - ((GetDataRequest) message).getNonce()); - SettableFuture future = networkNode.sendMessage(connection, getDataResponse); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Send DataResponse to {} succeeded. getDataResponse={}", - connection.getPeersNodeAddressOptional(), getDataResponse); - shutDown(); - listener.onComplete(); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending getDataRequest to " + connection + - " failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." + - "Exception: " + throwable.getMessage(); - log.info(errorMessage); - - peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); - shutDown(); - listener.onFault(errorMessage, connection); - } - }); - - checkArgument(timeoutTimer == null, "requestData must not be called twice."); - timeoutTimer = UserThread.runAfter(() -> { - String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse + - " on connection:" + connection; - log.info(errorMessage + " / RequestDataHandshake=" + - RequestDataHandshake.this); - peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT); - shutDown(); - listener.onFault(errorMessage, connection); - }, - 10, TimeUnit.SECONDS); - } - - /////////////////////////////////////////////////////////////////////////////////////////// - // MessageListener implementation - /////////////////////////////////////////////////////////////////////////////////////////// - - @Override - public void onMessage(Message message, Connection connection) { - if (message instanceof GetDataResponse) { - Log.traceCall(message.toString() + "\n\tconnection=" + connection); - GetDataResponse getDataResponse = (GetDataResponse) message; - if (getDataResponse.requestNonce == nonce) { - stopTimeoutTimer(); - checkArgument(connection.getPeersNodeAddressOptional().isPresent(), - "RequestDataHandshake.onMessage: connection.getPeersNodeAddressOptional() must be present " + - "at that moment"); - ((GetDataResponse) message).dataSet.stream() - .forEach(protectedData -> dataStorage.add(protectedData, - connection.getPeersNodeAddressOptional().get())); - - shutDown(); - listener.onComplete(); - } else { - log.debug("Nonce not matching. That can happen rarely if we get a response after a canceled " + - "handshake (timeout causes connection close but peer might have sent a msg before " + - "connection was closed).\n\t" + - "We drop that message. nonce={} / requestNonce={}", - nonce, getDataResponse.requestNonce); - } - } - } - - /////////////////////////////////////////////////////////////////////////////////////////// - // Private - /////////////////////////////////////////////////////////////////////////////////////////// - - private void stopTimeoutTimer() { - if (timeoutTimer != null) { - timeoutTimer.stop(); - timeoutTimer = null; - } - } -} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java index ae05c83dbe..ead910f860 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java @@ -5,9 +5,7 @@ import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; -import io.bitsquare.p2p.network.Connection; -import io.bitsquare.p2p.network.MessageListener; -import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.peers.PeerManager; import io.bitsquare.p2p.peers.getdata.messages.GetDataRequest; import io.bitsquare.p2p.peers.peerexchange.ReportedPeer; @@ -17,12 +15,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; -public class RequestDataManager implements MessageListener { +public class RequestDataManager implements MessageListener, ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class); private static final long RETRY_DELAY_SEC = 10; @@ -55,11 +52,11 @@ public class RequestDataManager implements MessageListener { private final Collection seedNodeAddresses; private final Listener listener; - private final Map requestDataHandshakeMap = new HashMap<>(); - private Optional nodeOfPreliminaryDataRequest = Optional.empty(); - private Timer requestDataTimer; + private final Map requestDataHandlerMap = new HashMap<>(); + private Optional nodeAddressOfPreliminaryDataRequest = Optional.empty(); + private Timer retryTimer; private boolean dataUpdateRequested; - private boolean shutDownInProgress; + private boolean stopped; /////////////////////////////////////////////////////////////////////////////////////////// @@ -76,14 +73,16 @@ public class RequestDataManager implements MessageListener { checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty."); networkNode.addMessageListener(this); + peerManager.addListener(this); } public void shutDown() { Log.traceCall(); - shutDownInProgress = true; - stopRequestDataTimer(); + stopped = true; + stopRetryTimer(); networkNode.removeMessageListener(this); - requestDataHandshakeMap.values().stream().forEach(RequestDataHandshake::shutDown); + peerManager.removeListener(this); + requestDataHandlerMap.values().stream().forEach(RequestDataHandler::cleanup); } @@ -100,19 +99,73 @@ public class RequestDataManager implements MessageListener { requestData(nextCandidate, nodeAddresses); } - public void requestUpdatesData() { + public void requestUpdateData() { Log.traceCall(); - checkArgument(nodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present"); + checkArgument(nodeAddressOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present"); dataUpdateRequested = true; List remainingNodeAddresses = new ArrayList<>(seedNodeAddresses); Collections.shuffle(remainingNodeAddresses); - NodeAddress candidate = nodeOfPreliminaryDataRequest.get(); + NodeAddress candidate = nodeAddressOfPreliminaryDataRequest.get(); remainingNodeAddresses.remove(candidate); requestData(candidate, remainingNodeAddresses); } - public Optional getNodeOfPreliminaryDataRequest() { - return nodeOfPreliminaryDataRequest; + public Optional getNodeAddressOfPreliminaryDataRequest() { + return nodeAddressOfPreliminaryDataRequest; + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // ConnectionListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onConnection(Connection connection) { + Log.traceCall(); + // clean up in case we could not clean up at disconnect + closeRequestDataHandler(connection); + } + + @Override + public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { + Log.traceCall(); + closeRequestDataHandler(connection); + } + + @Override + public void onError(Throwable throwable) { + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // PeerManager.Listener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onAllConnectionsLost() { + Log.traceCall(); + closeAllRequestDataHandlers(); + stopRetryTimer(); + stopped = true; + } + + @Override + public void onNewConnectionAfterAllConnectionsLost() { + Log.traceCall(); + closeAllRequestDataHandlers(); + stopped = false; + + retryAfterDelay(); + } + + @Override + public void onAwakeFromStandby() { + Log.traceCall(); + closeAllRequestDataHandlers(); + stopped = false; + + if (!networkNode.getAllConnections().isEmpty()) + retryAfterDelay(); } @@ -124,124 +177,147 @@ public class RequestDataManager implements MessageListener { public void onMessage(Message message, Connection connection) { if (message instanceof GetDataRequest) { Log.traceCall(message.toString() + "\n\tconnection=" + connection); - RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager, - new RequestDataHandshake.Listener() { - @Override - public void onComplete() { - log.trace("requestDataHandshake of inbound connection complete.\n\tConnection={}", - connection); - } + if (!stopped) { + if (peerManager.isSeedNode(connection)) + connection.setPeerType(Connection.PeerType.SEED_NODE); - @Override - public void onFault(String errorMessage, @Nullable Connection connection) { - log.trace("requestDataHandshake of inbound connection failed.\n\tConnection={}\n\t" + - "ErrorMessage={}", connection, errorMessage); - peerManager.handleConnectionFault(connection); - } - }); - requestDataHandshake.onDataRequest(message, connection); + GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, peerManager, dataStorage, + new GetDataRequestHandler.Listener() { + @Override + public void onComplete() { + log.trace("requestDataHandshake completed.\n\tConnection={}", + connection); + } + + @Override + public void onFault(String errorMessage, @Nullable Connection connection) { + log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + + "ErrorMessage={}", connection, errorMessage); + peerManager.handleConnectionFault(connection); + } + }); + getDataRequestHandler.handle((GetDataRequest) message, connection); + } else { + log.warn("We have stopped already. We ignore that onMessage call."); + } } } - /////////////////////////////////////////////////////////////////////////////////////////// - // Private + // RequestData /////////////////////////////////////////////////////////////////////////////////////////// private void requestData(NodeAddress nodeAddress, List remainingNodeAddresses) { Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); - if (!requestDataHandshakeMap.containsKey(nodeAddress)) { - RequestDataHandshake requestDataHandshake = new RequestDataHandshake(networkNode, dataStorage, peerManager, - new RequestDataHandshake.Listener() { - @Override - public void onComplete() { - log.trace("RequestDataHandshake of outbound connection complete. nodeAddress={}", - nodeAddress); - stopRequestDataTimer(); + if (!stopped) { + if (!requestDataHandlerMap.containsKey(nodeAddress)) { + RequestDataHandler requestDataHandler = new RequestDataHandler(networkNode, dataStorage, peerManager, + new RequestDataHandler.Listener() { + @Override + public void onComplete() { + log.trace("RequestDataHandshake of outbound connection complete. nodeAddress={}", + nodeAddress); + stopRetryTimer(); - // need to remove before listeners are notified as they cause the update call - requestDataHandshakeMap.remove(nodeAddress); + // need to remove before listeners are notified as they cause the update call + requestDataHandlerMap.remove(nodeAddress); - // 1. We get a response from requestPreliminaryData - if (!nodeOfPreliminaryDataRequest.isPresent()) { - nodeOfPreliminaryDataRequest = Optional.of(nodeAddress); - listener.onPreliminaryDataReceived(); + // 1. We get a response from requestPreliminaryData + if (!nodeAddressOfPreliminaryDataRequest.isPresent()) { + nodeAddressOfPreliminaryDataRequest = Optional.of(nodeAddress); + listener.onPreliminaryDataReceived(); + } + + // 2. Later we get a response from requestUpdatesData + if (dataUpdateRequested) { + dataUpdateRequested = false; + listener.onUpdatedDataReceived(); + } + + listener.onDataReceived(); } - // 2. Later we get a response from requestUpdatesData - if (dataUpdateRequested) { - dataUpdateRequested = false; - listener.onUpdatedDataReceived(); - } + @Override + public void onFault(String errorMessage, @Nullable Connection connection) { + log.trace("requestDataHandshake of outbound connection failed.\n\tnodeAddress={}\n\t" + + "ErrorMessage={}", nodeAddress, errorMessage); - listener.onDataReceived(); - } + requestDataHandlerMap.remove(nodeAddress); + peerManager.handleConnectionFault(nodeAddress, connection); - @Override - public void onFault(String errorMessage, @Nullable Connection connection) { - log.trace("requestDataHandshake of outbound connection failed.\n\tnodeAddress={}\n\t" + - "ErrorMessage={}", nodeAddress, errorMessage); + if (!stopped) { + if (!remainingNodeAddresses.isEmpty()) { + log.info("There are remaining nodes available for requesting data. " + + "We will try requestDataFromPeers again."); + NodeAddress nextCandidate = remainingNodeAddresses.get(0); + remainingNodeAddresses.remove(nextCandidate); + requestData(nextCandidate, remainingNodeAddresses); + } else { + log.info("There is no remaining node available for requesting data. " + + "That is expected if no other node is online.\n\t" + + "We will try to use reported peers (if no available we use persisted peers) " + + "and try again to request data from our seed nodes after a random pause."); - peerManager.handleConnectionFault(nodeAddress, connection); + // Notify listeners + if (!nodeAddressOfPreliminaryDataRequest.isPresent()) { + if (peerManager.isSeedNode(nodeAddress)) + listener.onNoSeedNodeAvailable(); + else + listener.onNoPeersAvailable(); + } - if (!shutDownInProgress) { - if (!remainingNodeAddresses.isEmpty()) { - log.info("There are remaining nodes available for requesting data. " + - "We will try requestDataFromPeers again."); - NodeAddress nextCandidate = remainingNodeAddresses.get(0); - remainingNodeAddresses.remove(nextCandidate); - requestData(nextCandidate, remainingNodeAddresses); + retryAfterDelay(); + } } else { - log.info("There is no remaining node available for requesting data. " + - "That is expected if no other node is online.\n\t" + - "We will try to use reported peers (if no available we use persisted peers) " + - "and try again to request data from our seed nodes after a random pause."); - - // try again after a pause - stopRequestDataTimer(); - requestDataTimer = UserThread.runAfter(() -> { - log.trace("requestDataAfterDelayTimer called"); - // We want to keep it sorted but avoid duplicates - // We don't filter out already established connections for seed nodes as it might be that - // we got from the other seed node contacted but we still have not requested the initial - // data set - List list = new ArrayList<>(seedNodeAddresses); - Collections.shuffle(list); - list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list)); - list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list)); - checkArgument(!list.isEmpty(), "seedNodeAddresses must not be empty."); - NodeAddress nextCandidate = list.get(0); - list.remove(nextCandidate); - requestData(nextCandidate, list); - }, - RETRY_DELAY_SEC, TimeUnit.SECONDS); - } - - requestDataHandshakeMap.remove(nodeAddress); - - // Notify listeners - if (!nodeOfPreliminaryDataRequest.isPresent()) { - if (peerManager.isSeedNode(nodeAddress)) - listener.onNoSeedNodeAvailable(); - else - listener.onNoPeersAvailable(); + log.warn("We have stopped already. We ignore that requestData.onFault call."); } } - } - }); - requestDataHandshakeMap.put(nodeAddress, requestDataHandshake); - requestDataHandshake.requestData(nodeAddress); + }); + requestDataHandlerMap.put(nodeAddress, requestDataHandler); + requestDataHandler.requestData(nodeAddress); + } else { + log.warn("We have started already a requestDataHandshake to peer. nodeAddress=" + nodeAddress); + } } else { - log.warn("We have started already a requestDataHandshake to peer. nodeAddress=" + nodeAddress); + log.warn("We have stopped already. We ignore that requestData call."); } } - // sorted by most recent lastActivityDate - private List getFilteredAndSortedList(Set set, List list) { - return set.stream() - .filter(e -> !list.contains(e.nodeAddress) && - !peerManager.isSeedNode(e) && - !peerManager.isSelf(e)) + + /////////////////////////////////////////////////////////////////////////////////////////// + // Utils + /////////////////////////////////////////////////////////////////////////////////////////// + + private void retryAfterDelay() { + if (retryTimer == null) { + retryTimer = UserThread.runAfter(() -> { + log.trace("retryTimer called"); + stopRetryTimer(); + + // We create a new list of candidates + // 1. shuffled seedNodes + // 2. reported peers sorted by last activity date + // 3. Add as last persisted peers sorted by last activity date + List list = getFilteredList(new ArrayList<>(seedNodeAddresses), new ArrayList<>()); + Collections.shuffle(list); + + List filteredReportedPeers = getFilteredNonSeedNodeList(getSortedNodeAddresses(peerManager.getReportedPeers()), list); + list.addAll(filteredReportedPeers); + + List filteredPersistedPeers = getFilteredNonSeedNodeList(getSortedNodeAddresses(peerManager.getPersistedPeers()), list); + list.addAll(filteredPersistedPeers); + + checkArgument(!list.isEmpty(), "seedNodeAddresses must not be empty."); + NodeAddress nextCandidate = list.get(0); + list.remove(nextCandidate); + requestData(nextCandidate, list); + }, + RETRY_DELAY_SEC); + } + } + + private List getSortedNodeAddresses(Collection collection) { + return collection.stream() .collect(Collectors.toList()) .stream() .sorted((o1, o2) -> o2.date.compareTo(o1.date)) @@ -249,10 +325,37 @@ public class RequestDataManager implements MessageListener { .collect(Collectors.toList()); } - private void stopRequestDataTimer() { - if (requestDataTimer != null) { - requestDataTimer.stop(); - requestDataTimer = null; + private List getFilteredList(Collection collection, List list) { + return collection.stream() + .filter(e -> !list.contains(e) && + !peerManager.isSelf(e)) + .collect(Collectors.toList()); + } + + private List getFilteredNonSeedNodeList(Collection collection, List list) { + return getFilteredList(collection, list).stream() + .filter(e -> !peerManager.isSeedNode(e)) + .collect(Collectors.toList()); + } + + private void stopRetryTimer() { + if (retryTimer != null) { + retryTimer.stop(); + retryTimer = null; } } + + private void closeRequestDataHandler(Connection connection) { + if (connection.getPeersNodeAddressOptional().isPresent()) { + NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); + requestDataHandlerMap.get(nodeAddress).cleanup(); + requestDataHandlerMap.remove(nodeAddress); + } + } + + private void closeAllRequestDataHandlers() { + requestDataHandlerMap.values().stream().forEach(RequestDataHandler::cleanup); + requestDataHandlerMap.clear(); + } + } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java index 9c1771460d..9f28065dfd 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java @@ -5,7 +5,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.app.Log; import io.bitsquare.p2p.Message; -import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.network.CloseConnectionReason; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.MessageListener; @@ -23,9 +22,6 @@ import java.util.Random; class KeepAliveHandler implements MessageListener { private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class); - @Nullable - private Connection connection; - /////////////////////////////////////////////////////////////////////////////////////////// // Listener @@ -34,9 +30,7 @@ class KeepAliveHandler implements MessageListener { public interface Listener { void onComplete(); - void onFault(String errorMessage, Connection connection); - - void onFault(String errorMessage, NodeAddress nodeAddress); + void onFault(String errorMessage); } @@ -48,6 +42,9 @@ class KeepAliveHandler implements MessageListener { private final PeerManager peerManager; private final Listener listener; private final int nonce = new Random().nextInt(); + @Nullable + private Connection connection; + private boolean stopped; /////////////////////////////////////////////////////////////////////////////////////////// @@ -61,6 +58,7 @@ class KeepAliveHandler implements MessageListener { } public void cleanup() { + stopped = true; if (connection != null) connection.removeMessageListener(this); } @@ -70,58 +68,33 @@ class KeepAliveHandler implements MessageListener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void sendPing(Connection connection) { Log.traceCall("connection=" + connection + " / this=" + this); - this.connection = connection; - connection.addMessageListener(this); - Ping ping = new Ping(nonce); - SettableFuture future = networkNode.sendMessage(connection, ping); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Send " + ping + " to " + connection + " succeeded."); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending ping to " + connection + - " failed. That is expected if the peer is offline.\n\tping=" + ping + - ".\n\tException=" + throwable.getMessage(); - log.info(errorMessage); - cleanup(); - peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); - listener.onFault(errorMessage, connection); - } - }); - } - - public void sendPing(NodeAddress nodeAddress) { - Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this); - Ping ping = new Ping(nonce); - SettableFuture future = networkNode.sendMessage(nodeAddress, ping); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - if (connection != null) { - KeepAliveHandler.this.connection = connection; - connection.addMessageListener(KeepAliveHandler.this); + if (!stopped) { + this.connection = connection; + connection.addMessageListener(this); + Ping ping = new Ping(nonce); + SettableFuture future = networkNode.sendMessage(connection, ping); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Send " + ping + " to " + connection + " succeeded."); } - log.trace("Send " + ping + " to " + nodeAddress + " succeeded."); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending ping to " + nodeAddress + - " failed. That is expected if the peer is offline.\n\tping=" + ping + - ".\n\tException=" + throwable.getMessage(); - log.info(errorMessage); - cleanup(); - peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); - listener.onFault(errorMessage, nodeAddress); - } - }); + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Sending ping to " + connection + + " failed. That is expected if the peer is offline.\n\tping=" + ping + + ".\n\tException=" + throwable.getMessage(); + log.info(errorMessage); + cleanup(); + peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); + listener.onFault(errorMessage); + } + }); + } else { + log.warn("We have stopped already. We ignore that sendPing call."); + } } @@ -133,16 +106,19 @@ class KeepAliveHandler implements MessageListener { public void onMessage(Message message, Connection connection) { if (message instanceof Pong) { Log.traceCall(message.toString() + "\n\tconnection=" + connection); - Pong pong = (Pong) message; - if (pong.requestNonce == nonce) { - cleanup(); - listener.onComplete(); + if (!stopped) { + Pong pong = (Pong) message; + if (pong.requestNonce == nonce) { + cleanup(); + listener.onComplete(); + } else { + log.warn("Nonce not matching. That should never happen.\n\t" + + "We drop that message. nonce={} / requestNonce={}", + nonce, pong.requestNonce); + } } else { - log.warn("Nonce not matching. That should never happen.\n\t" + - "We drop that message. nonce={} / requestNonce={}", - nonce, pong.requestNonce); + log.warn("We have stopped already. We ignore that onMessage call."); } } } - } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java index 7fc1244a46..38149b91c0 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java @@ -2,13 +2,11 @@ package io.bitsquare.p2p.peers.keepalive; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.app.Log; +import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; -import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Message; -import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.peers.PeerManager; import io.bitsquare.p2p.peers.keepalive.messages.Ping; @@ -19,10 +17,8 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -public class KeepAliveManager implements MessageListener, ConnectionListener { +public class KeepAliveManager implements MessageListener, ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class); //private static final int INTERVAL_SEC = new Random().nextInt(10) + 10; @@ -31,9 +27,9 @@ public class KeepAliveManager implements MessageListener, ConnectionListener { private final NetworkNode networkNode; private final PeerManager peerManager; - private ScheduledThreadPoolExecutor executor; private final Map maintenanceHandlerMap = new HashMap<>(); - private boolean shutDownInProgress; + private boolean stopped; + private Timer keepAliveTimer; /////////////////////////////////////////////////////////////////////////////////////////// @@ -46,18 +42,20 @@ public class KeepAliveManager implements MessageListener, ConnectionListener { networkNode.addMessageListener(this); networkNode.addConnectionListener(this); + peerManager.addListener(this); } public void shutDown() { Log.traceCall(); - shutDownInProgress = true; + stopped = true; networkNode.removeMessageListener(this); networkNode.removeConnectionListener(this); - maintenanceHandlerMap.values().stream().forEach(KeepAliveHandler::cleanup); + peerManager.removeListener(this); - if (executor != null) - MoreExecutors.shutdownAndAwaitTermination(executor, 100, TimeUnit.MILLISECONDS); + closeAllMaintenanceHandlers(); + + stopKeepAliveTimer(); } @@ -66,11 +64,9 @@ public class KeepAliveManager implements MessageListener, ConnectionListener { /////////////////////////////////////////////////////////////////////////////////////////// public void start() { - if (executor == null) { - executor = Utilities.getScheduledThreadPoolExecutor("KeepAliveManager", 1, 2, 5); - executor.scheduleAtFixedRate(() -> UserThread.execute(this::keepAlive), - INTERVAL_SEC, INTERVAL_SEC, TimeUnit.SECONDS); - } + stopped = false; + if (keepAliveTimer == null) + keepAliveTimer = UserThread.runPeriodically(this::keepAlive, INTERVAL_SEC); } @@ -82,26 +78,29 @@ public class KeepAliveManager implements MessageListener, ConnectionListener { public void onMessage(Message message, Connection connection) { if (message instanceof Ping) { Log.traceCall(message.toString() + "\n\tconnection=" + connection); + if (!stopped) { + Ping ping = (Ping) message; + Pong pong = new Pong(ping.nonce); + SettableFuture future = networkNode.sendMessage(connection, pong); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Pong sent successfully"); + } - Ping ping = (Ping) message; - Pong pong = new Pong(ping.nonce); - SettableFuture future = networkNode.sendMessage(connection, pong); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Pong sent successfully"); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending pong to " + connection + - " failed. That is expected if the peer is offline. pong=" + pong + "." + - "Exception: " + throwable.getMessage(); - log.info(errorMessage); - peerManager.handleConnectionFault(connection); - peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); - } - }); + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Sending pong to " + connection + + " failed. That is expected if the peer is offline. pong=" + pong + "." + + "Exception: " + throwable.getMessage(); + log.info(errorMessage); + peerManager.handleConnectionFault(connection); + peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); + } + }); + } else { + log.warn("We have stopped already. We ignore that onMessage call."); + } } } @@ -112,15 +111,15 @@ public class KeepAliveManager implements MessageListener, ConnectionListener { @Override public void onConnection(Connection connection) { + Log.traceCall(); // clean up in case we could not clean up at disconnect - if (connection.getPeersNodeAddressOptional().isPresent()) - maintenanceHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress()); + closeMaintenanceHandler(connection); } @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - if (connection.getPeersNodeAddressOptional().isPresent()) - maintenanceHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress()); + Log.traceCall(); + closeMaintenanceHandler(connection); } @Override @@ -128,58 +127,92 @@ public class KeepAliveManager implements MessageListener, ConnectionListener { } + /////////////////////////////////////////////////////////////////////////////////////////// + // PeerManager.Listener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onAllConnectionsLost() { + Log.traceCall(); + closeAllMaintenanceHandlers(); + stopKeepAliveTimer(); + } + + @Override + public void onNewConnectionAfterAllConnectionsLost() { + Log.traceCall(); + closeAllMaintenanceHandlers(); + start(); + } + + @Override + public void onAwakeFromStandby() { + Log.traceCall(); + closeAllMaintenanceHandlers(); + if (!networkNode.getAllConnections().isEmpty()) + start(); + } + + /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void keepAlive() { - Log.traceCall(); - - if (!shutDownInProgress) { + if (!stopped) { + Log.traceCall(); networkNode.getConfirmedConnections().stream() .filter(connection -> connection instanceof OutboundConnection) .forEach(connection -> { - if (!maintenanceHandlerMap.containsKey(getKey(connection))) { + final String uid = connection.getUid(); + if (!maintenanceHandlerMap.containsKey(uid)) { KeepAliveHandler keepAliveHandler = new KeepAliveHandler(networkNode, peerManager, new KeepAliveHandler.Listener() { @Override public void onComplete() { - maintenanceHandlerMap.remove(getKey(connection)); + maintenanceHandlerMap.remove(uid); } @Override - public void onFault(String errorMessage, Connection connection) { - maintenanceHandlerMap.remove(getKey(connection)); - } - - @Override - public void onFault(String errorMessage, NodeAddress nodeAddress) { - maintenanceHandlerMap.remove(nodeAddress.getFullAddress()); + public void onFault(String errorMessage) { + maintenanceHandlerMap.remove(uid); } }); - maintenanceHandlerMap.put(getKey(connection), keepAliveHandler); + maintenanceHandlerMap.put(uid, keepAliveHandler); keepAliveHandler.sendPing(connection); } else { log.warn("Connection with id {} has not completed and is still in our map. " + - "We will try to ping that peer at the next schedule.", getKey(connection)); + "We will try to ping that peer at the next schedule.", uid); } }); int size = maintenanceHandlerMap.size(); log.info("maintenanceHandlerMap size=" + size); if (size > peerManager.getMaxConnections()) - log.warn("Seems we don't clean up out map correctly.\n" + + log.warn("Seems we didn't clean up out map correctly.\n" + "maintenanceHandlerMap size={}, peerManager.getMaxConnections()={}", size, peerManager.getMaxConnections()); + } else { + log.warn("We have stopped already. We ignore that keepAlive call."); } } - private String getKey(Connection connection) { + private void closeMaintenanceHandler(Connection connection) { if (connection.getPeersNodeAddressOptional().isPresent()) { - return connection.getPeersNodeAddressOptional().get().getFullAddress(); - } else { - // TODO not sure if that can be the case, but handle it otherwise we get an exception - log.warn("!connection.getPeersNodeAddressOptional().isPresent(). That should not happen."); - return "null"; + String uid = connection.getUid(); + maintenanceHandlerMap.get(uid).cleanup(); + maintenanceHandlerMap.remove(uid); + } + } + + private void closeAllMaintenanceHandlers() { + maintenanceHandlerMap.values().stream().forEach(KeepAliveHandler::cleanup); + maintenanceHandlerMap.clear(); + } + + private void stopKeepAliveTimer() { + stopped = true; + if (keepAliveTimer != null) { + keepAliveTimer.stop(); + keepAliveTimer = null; } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java index 91ba0cd33f..9ab99fa1de 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java @@ -107,10 +107,10 @@ class GetPeersRequestHandler { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void handleFault(String errorMessage, CloseConnectionReason sendMsgFailure, Connection connection) { + private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) { // TODO retry cleanup(); - peerManager.shutDownConnection(connection, sendMsgFailure); + peerManager.shutDownConnection(connection, closeConnectionReason); listener.onFault(errorMessage, connection); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java index 47ca3ed390..49d28e1705 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java @@ -52,6 +52,7 @@ class PeerExchangeHandler implements MessageListener { private final int nonce = new Random().nextInt(); private Timer timeoutTimer; private Connection connection; + private boolean stopped; /////////////////////////////////////////////////////////////////////////////////////////// @@ -65,6 +66,7 @@ class PeerExchangeHandler implements MessageListener { } public void cleanup() { + stopped = true; if (connection != null) connection.removeMessageListener(this); @@ -81,43 +83,47 @@ class PeerExchangeHandler implements MessageListener { public void sendGetPeersRequest(NodeAddress nodeAddress) { Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this); - if (networkNode.getNodeAddress() != null) { - GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedPeersNonSeedNodes(nodeAddress)); - SettableFuture future = networkNode.sendMessage(nodeAddress, getPeersRequest); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - if (!connection.getPeersNodeAddressOptional().isPresent()) { - connection.setPeersNodeAddress(nodeAddress); - //TODO remove setPeersNodeAddress if never needed - log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()"); + if (!stopped) { + if (networkNode.getNodeAddress() != null) { + GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedPeersNonSeedNodes(nodeAddress)); + SettableFuture future = networkNode.sendMessage(nodeAddress, getPeersRequest); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + if (!connection.getPeersNodeAddressOptional().isPresent()) { + connection.setPeersNodeAddress(nodeAddress); + //TODO remove setPeersNodeAddress if never needed + log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()"); + } + PeerExchangeHandler.this.connection = connection; + connection.addMessageListener(PeerExchangeHandler.this); + log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded."); } - PeerExchangeHandler.this.connection = connection; - connection.addMessageListener(PeerExchangeHandler.this); - log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded."); - } - @Override - public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending getPeersRequest to " + nodeAddress + - " failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest + - ".\n\tException=" + throwable.getMessage(); - log.info(errorMessage); - handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, nodeAddress); - } - }); + @Override + public void onFailure(@NotNull Throwable throwable) { + String errorMessage = "Sending getPeersRequest to " + nodeAddress + + " failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest + + ".\n\tException=" + throwable.getMessage(); + log.info(errorMessage); + handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, nodeAddress); + } + }); - checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice."); - timeoutTimer = UserThread.runAfter(() -> { - String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress; - log.info(errorMessage + " / PeerExchangeHandler=" + - PeerExchangeHandler.this); - log.info("timeoutTimer called on " + this); - handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress); - }, - TIME_OUT_SEC, TimeUnit.SECONDS); + checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice."); + timeoutTimer = UserThread.runAfter(() -> { + String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress; + log.info(errorMessage + " / PeerExchangeHandler=" + + PeerExchangeHandler.this); + log.info("timeoutTimer called on " + this); + handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress); + }, + TIME_OUT_SEC, TimeUnit.SECONDS); + } else { + log.warn("My node address is still null at sendGetPeersRequest. We ignore that call."); + } } else { - log.trace("My node address is still null at sendGetPeersRequest. We ignore that call."); + log.warn("We have stopped that handler already. We ignore that sendGetPeersRequest call."); } } @@ -128,23 +134,24 @@ class PeerExchangeHandler implements MessageListener { @Override public void onMessage(Message message, Connection connection) { if (message instanceof GetPeersResponse) { - GetPeersResponse getPeersResponse = (GetPeersResponse) message; - - if (peerManager.isSeedNode(connection)) - connection.setPeerType(Connection.PeerType.SEED_NODE); - - // Check if the response is for our request - if (getPeersResponse.requestNonce == nonce) { + if (!stopped) { Log.traceCall(message.toString() + "\n\tconnection=" + connection); - Log.traceCall("this=" + this); - peerManager.addToReportedPeers(getPeersResponse.reportedPeers, connection); + GetPeersResponse getPeersResponse = (GetPeersResponse) message; + if (peerManager.isSeedNode(connection)) + connection.setPeerType(Connection.PeerType.SEED_NODE); - cleanup(); - listener.onComplete(); + // Check if the response is for our request + if (getPeersResponse.requestNonce == nonce) { + peerManager.addToReportedPeers(getPeersResponse.reportedPeers, connection); + cleanup(); + listener.onComplete(); + } else { + log.warn("Nonce not matching. That should never happen.\n\t" + + "We drop that message. nonce={} / requestNonce={}", + nonce, getPeersResponse.requestNonce); + } } else { - log.warn("Nonce not matching. That should never happen.\n\t" + - "We drop that message. nonce={} / requestNonce={}", - nonce, getPeersResponse.requestNonce); + log.warn("We have stopped that handler already. We ignore that onMessage call."); } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java index cf5ac8465c..7e6e48a461 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java @@ -1,10 +1,8 @@ package io.bitsquare.p2p.peers.peerexchange; -import com.google.common.util.concurrent.MoreExecutors; import io.bitsquare.app.Log; import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; -import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.network.*; @@ -15,17 +13,16 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.*; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -public class PeerExchangeManager implements MessageListener, ConnectionListener { +public class PeerExchangeManager implements MessageListener, ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class); - private static final long RETRY_DELAY_SEC = 60; + private static final long RETRY_DELAY_SEC = 10; private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = 3; private static final long REQUEST_PERIODICALLY_INTERVAL_MINUTES = 10; @@ -33,9 +30,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener private final PeerManager peerManager; private final Set seedNodeAddresses; private final Map peerExchangeHandlerMap = new HashMap<>(); - private Timer connectToMorePeersTimer; - private boolean shutDownInProgress; - private ScheduledThreadPoolExecutor executor; + private Timer retryTimer, periodicTimer; + private boolean stopped; /////////////////////////////////////////////////////////////////////////////////////////// @@ -50,18 +46,20 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener networkNode.addMessageListener(this); networkNode.addConnectionListener(this); + peerManager.addListener(this); } + public void shutDown() { Log.traceCall(); - shutDownInProgress = true; + stopped = true; networkNode.removeMessageListener(this); networkNode.removeConnectionListener(this); - stopConnectToMorePeersTimer(); - peerExchangeHandlerMap.values().stream().forEach(PeerExchangeHandler::cleanup); + peerManager.removeListener(this); - if (executor != null) - MoreExecutors.shutdownAndAwaitTermination(executor, 100, TimeUnit.MILLISECONDS); + stopPeriodicTimer(); + stopRetryTimer(); + closeAllPeerExchangeHandlers(); } @@ -76,39 +74,32 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener Collections.shuffle(remainingNodeAddresses); requestReportedPeers(nodeAddress, remainingNodeAddresses); - if (executor == null) { - executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 2, 5); - executor.scheduleAtFixedRate(() -> UserThread.execute(this::requestAgain), - REQUEST_PERIODICALLY_INTERVAL_MINUTES, REQUEST_PERIODICALLY_INTERVAL_MINUTES, TimeUnit.MINUTES); - } + startPeriodicTimer(); } + /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @Override public void onConnection(Connection connection) { - if (connection.getPeersNodeAddressOptional().isPresent()) - peerExchangeHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress()); + Log.traceCall(); + // clean up in case we could not clean up at disconnect + closePeerExchangeHandler(connection); } @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - if (connection.getPeersNodeAddressOptional().isPresent()) - peerExchangeHandlerMap.remove(connection.getPeersNodeAddressOptional().get().getFullAddress()); - - boolean lostAllConnections = networkNode.getAllConnections().isEmpty(); - if (lostAllConnections || connectToMorePeersTimer == null) { - long delaySec = lostAllConnections ? RETRY_DELAY_AFTER_ALL_CON_LOST_SEC : RETRY_DELAY_SEC; - if (lostAllConnections && connectToMorePeersTimer != null) - connectToMorePeersTimer.stop(); - - connectToMorePeersTimer = UserThread.runAfter(() -> { + Log.traceCall(); + closePeerExchangeHandler(connection); + + if (retryTimer == null) { + retryTimer = UserThread.runAfter(() -> { log.trace("ConnectToMorePeersTimer called from onDisconnect code path"); - stopConnectToMorePeersTimer(); + stopRetryTimer(); requestWithAvailablePeers(); - }, delaySec); + }, RETRY_DELAY_SEC); } } @@ -117,6 +108,36 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener } + /////////////////////////////////////////////////////////////////////////////////////////// + // PeerManager.Listener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onAllConnectionsLost() { + Log.traceCall(); + closeAllPeerExchangeHandlers(); + stopPeriodicTimer(); + stopRetryTimer(); + } + + @Override + public void onNewConnectionAfterAllConnectionsLost() { + Log.traceCall(); + closeAllPeerExchangeHandlers(); + + restart(); + } + + @Override + public void onAwakeFromStandby() { + Log.traceCall(); + closeAllPeerExchangeHandlers(); + + if (!networkNode.getAllConnections().isEmpty()) + restart(); + } + + /////////////////////////////////////////////////////////////////////////////////////////// // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -125,26 +146,29 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener public void onMessage(Message message, Connection connection) { if (message instanceof GetPeersRequest) { Log.traceCall(message.toString() + "\n\tconnection=" + connection); + if (!stopped) { + if (peerManager.isSeedNode(connection)) + connection.setPeerType(Connection.PeerType.SEED_NODE); - if (peerManager.isSeedNode(connection)) - connection.setPeerType(Connection.PeerType.SEED_NODE); + GetPeersRequestHandler getPeersRequestHandler = new GetPeersRequestHandler(networkNode, + peerManager, + new GetPeersRequestHandler.Listener() { + @Override + public void onComplete() { + log.trace("PeerExchangeHandshake completed.\n\tConnection={}", connection); + } - GetPeersRequestHandler getPeersRequestHandler = new GetPeersRequestHandler(networkNode, - peerManager, - new GetPeersRequestHandler.Listener() { - @Override - public void onComplete() { - log.trace("PeerExchangeHandshake of inbound connection complete.\n\tConnection={}", connection); - } - - @Override - public void onFault(String errorMessage, Connection connection) { - log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" + - "connection={}", errorMessage, connection); - peerManager.handleConnectionFault(connection); - } - }); - getPeersRequestHandler.handle((GetPeersRequest) message, connection); + @Override + public void onFault(String errorMessage, Connection connection) { + log.trace("PeerExchangeHandshake failed.\n\terrorMessage={}\n\t" + + "connection={}", errorMessage, connection); + peerManager.handleConnectionFault(connection); + } + }); + getPeersRequestHandler.handle((GetPeersRequest) message, connection); + } else { + log.warn("We have stopped already. We ignore that onMessage call."); + } } } @@ -155,122 +179,130 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener private void requestReportedPeers(NodeAddress nodeAddress, List remainingNodeAddresses) { Log.traceCall("nodeAddress=" + nodeAddress); - if (!peerExchangeHandlerMap.containsKey(nodeAddress)) { - PeerExchangeHandler peerExchangeHandler = new PeerExchangeHandler(networkNode, - peerManager, - new PeerExchangeHandler.Listener() { - @Override - public void onComplete() { - log.trace("PeerExchangeHandshake of outbound connection complete. nodeAddress={}", nodeAddress); - peerExchangeHandlerMap.remove(nodeAddress); - requestWithAvailablePeers(); - } + if (!stopped) { + if (!peerExchangeHandlerMap.containsKey(nodeAddress)) { + PeerExchangeHandler peerExchangeHandler = new PeerExchangeHandler(networkNode, + peerManager, + new PeerExchangeHandler.Listener() { + @Override + public void onComplete() { + log.trace("PeerExchangeHandshake of outbound connection complete. nodeAddress={}", nodeAddress); + peerExchangeHandlerMap.remove(nodeAddress); + requestWithAvailablePeers(); + } - @Override - public void onFault(String errorMessage, @Nullable Connection connection) { - log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" + - "nodeAddress={}", errorMessage, nodeAddress); + @Override + public void onFault(String errorMessage, @Nullable Connection connection) { + log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" + + "nodeAddress={}", errorMessage, nodeAddress); - peerExchangeHandlerMap.remove(nodeAddress); - peerManager.handleConnectionFault(nodeAddress, connection); - if (!shutDownInProgress) { - if (!remainingNodeAddresses.isEmpty()) { - if (!peerManager.hasSufficientConnections()) { - log.info("There are remaining nodes available for requesting peers. " + - "We will try getReportedPeers again."); - NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); - remainingNodeAddresses.remove(nextCandidate); - requestReportedPeers(nextCandidate, remainingNodeAddresses); + peerExchangeHandlerMap.remove(nodeAddress); + peerManager.handleConnectionFault(nodeAddress, connection); + if (!stopped) { + if (!remainingNodeAddresses.isEmpty()) { + if (!peerManager.hasSufficientConnections()) { + log.info("There are remaining nodes available for requesting peers. " + + "We will try getReportedPeers again."); + NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); + remainingNodeAddresses.remove(nextCandidate); + requestReportedPeers(nextCandidate, remainingNodeAddresses); + } else { + // That path will rarely be reached + log.info("We have already sufficient connections."); + } } else { - // That path will rarely be reached - log.info("We have already sufficient connections."); + log.info("There is no remaining node available for requesting peers. " + + "That is expected if no other node is online.\n\t" + + "We will try again after a pause."); + if (retryTimer == null) + retryTimer = UserThread.runAfter(() -> { + log.trace("ConnectToMorePeersTimer called from requestReportedPeers code path"); + stopRetryTimer(); + requestWithAvailablePeers(); + }, RETRY_DELAY_SEC); } - } else { - log.info("There is no remaining node available for requesting peers. " + - "That is expected if no other node is online.\n\t" + - "We will try again after a pause."); - if (connectToMorePeersTimer == null) - connectToMorePeersTimer = UserThread.runAfter(() -> { - log.trace("ConnectToMorePeersTimer called from requestReportedPeers code path"); - stopConnectToMorePeersTimer(); - requestWithAvailablePeers(); - }, RETRY_DELAY_SEC); } } - } - }); - peerExchangeHandlerMap.put(nodeAddress, peerExchangeHandler); - peerExchangeHandler.sendGetPeersRequest(nodeAddress); + }); + peerExchangeHandlerMap.put(nodeAddress, peerExchangeHandler); + peerExchangeHandler.sendGetPeersRequest(nodeAddress); + } else { + //TODO check when that happens + log.warn("We have started already a peerExchangeHandler. " + + "We ignore that call. nodeAddress=" + nodeAddress); + } } else { - //TODO check when that happens - log.warn("We have started already a peerExchangeHandshake. " + - "We ignore that call. " + - "nodeAddress=" + nodeAddress); + log.warn("We have stopped already. We ignore that requestReportedPeers call."); } } private void requestWithAvailablePeers() { Log.traceCall(); + if (!stopped) { + if (!peerManager.hasSufficientConnections()) { + // We create a new list of not connected candidates + // 1. shuffled reported peers + // 2. shuffled persisted peers + // 3. Add as last shuffled seedNodes (least priority) + List list = getFilteredNonSeedNodeList(getNodeAddresses(peerManager.getReportedPeers()), new ArrayList<>()); + Collections.shuffle(list); - if (!peerManager.hasSufficientConnections()) { - // We create a new list of not connected candidates - // 1. reported shuffled peers - // 2. persisted shuffled peers - // 3. Add as last shuffled seedNodes (least priority) - List list = getFilteredNonSeedNodeList(getNodeAddresses(peerManager.getReportedPeers()), new ArrayList<>()); - Collections.shuffle(list); + List filteredPersistedPeers = getFilteredNonSeedNodeList(getNodeAddresses(peerManager.getPersistedPeers()), list); + Collections.shuffle(filteredPersistedPeers); + list.addAll(filteredPersistedPeers); - List filteredPersistedPeers = getFilteredNonSeedNodeList(getNodeAddresses(peerManager.getPersistedPeers()), list); - Collections.shuffle(filteredPersistedPeers); - list.addAll(filteredPersistedPeers); + List filteredSeedNodeAddresses = getFilteredList(new ArrayList<>(seedNodeAddresses), list); + Collections.shuffle(filteredSeedNodeAddresses); + list.addAll(filteredSeedNodeAddresses); - List filteredSeedNodeAddresses = getFilteredList(new ArrayList<>(seedNodeAddresses), list); - Collections.shuffle(filteredSeedNodeAddresses); - list.addAll(filteredSeedNodeAddresses); - - log.info("Number of peers in list for connectToMorePeers: {}", list.size()); - log.trace("Filtered connectToMorePeers list: list=" + list); - if (!list.isEmpty()) { - // Dont shuffle as we want the seed nodes at the last entries - NodeAddress nextCandidate = list.get(0); - list.remove(nextCandidate); - requestReportedPeers(nextCandidate, list); + log.info("Number of peers in list for connectToMorePeers: {}", list.size()); + log.trace("Filtered connectToMorePeers list: list=" + list); + if (!list.isEmpty()) { + // Dont shuffle as we want the seed nodes at the last entries + NodeAddress nextCandidate = list.get(0); + list.remove(nextCandidate); + requestReportedPeers(nextCandidate, list); + } else { + log.info("No more peers are available for requestReportedPeers. We will try again after a pause."); + if (retryTimer == null) + retryTimer = UserThread.runAfter(() -> { + log.trace("ConnectToMorePeersTimer called from requestWithAvailablePeers code path"); + stopRetryTimer(); + requestWithAvailablePeers(); + }, RETRY_DELAY_SEC); + } } else { - log.info("No more peers are available for requestReportedPeers. We will try again after a pause."); - if (connectToMorePeersTimer == null) - connectToMorePeersTimer = UserThread.runAfter(() -> { - log.trace("ConnectToMorePeersTimer called from requestWithAvailablePeers code path"); - stopConnectToMorePeersTimer(); - requestWithAvailablePeers(); - }, RETRY_DELAY_SEC); + log.info("We have already sufficient connections."); } } else { - log.info("We have already sufficient connections."); + log.warn("We have stopped already. We ignore that requestWithAvailablePeers call."); } } - /////////////////////////////////////////////////////////////////////////////////////////// - // Maintenance - /////////////////////////////////////////////////////////////////////////////////////////// - - private void requestAgain() { - checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at requestAgain"); - Set candidates = new HashSet<>(getNodeAddresses(peerManager.getReportedPeers())); - candidates.addAll(getNodeAddresses(peerManager.getPersistedPeers())); - candidates.addAll(seedNodeAddresses); - candidates.remove(networkNode.getNodeAddress()); - ArrayList list = new ArrayList<>(candidates); - Collections.shuffle(list); - NodeAddress candidate = list.remove(0); - requestReportedPeers(candidate, list); - } - - /////////////////////////////////////////////////////////////////////////////////////////// // Utils /////////////////////////////////////////////////////////////////////////////////////////// + private void startPeriodicTimer() { + stopped = false; + if (periodicTimer == null) + periodicTimer = UserThread.runPeriodically(this::requestWithAvailablePeers, + REQUEST_PERIODICALLY_INTERVAL_MINUTES, TimeUnit.MINUTES); + } + + private void restart() { + startPeriodicTimer(); + + if (retryTimer == null) { + retryTimer = UserThread.runAfter(() -> { + log.trace("ConnectToMorePeersTimer called from onNewConnectionAfterAllConnectionsLost"); + stopRetryTimer(); + requestWithAvailablePeers(); + }, RETRY_DELAY_AFTER_ALL_CON_LOST_SEC); + } + } + private List getNodeAddresses(Collection collection) { return collection.stream() .map(e -> e.nodeAddress) @@ -291,10 +323,31 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener .collect(Collectors.toList()); } - private void stopConnectToMorePeersTimer() { - if (connectToMorePeersTimer != null) { - connectToMorePeersTimer.stop(); - connectToMorePeersTimer = null; + private void stopPeriodicTimer() { + stopped = true; + if (periodicTimer != null) { + periodicTimer.stop(); + periodicTimer = null; } } + + private void stopRetryTimer() { + if (retryTimer != null) { + retryTimer.stop(); + retryTimer = null; + } + } + + private void closePeerExchangeHandler(Connection connection) { + if (connection.getPeersNodeAddressOptional().isPresent()) { + NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); + peerExchangeHandlerMap.get(nodeAddress).cleanup(); + peerExchangeHandlerMap.remove(nodeAddress); + } + } + + private void closeAllPeerExchangeHandlers() { + peerExchangeHandlerMap.values().stream().forEach(PeerExchangeHandler::cleanup); + peerExchangeHandlerMap.clear(); + } } diff --git a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java index b6733a7899..ddf61076d0 100644 --- a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java +++ b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java @@ -3,6 +3,7 @@ package io.bitsquare.p2p.seed; import com.google.common.annotations.VisibleForTesting; import io.bitsquare.app.Log; import io.bitsquare.app.Version; +import io.bitsquare.common.Clock; import io.bitsquare.common.UserThread; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.P2PService; @@ -137,7 +138,7 @@ public class SeedNode { log.info("Created torDir at " + torDir.getAbsolutePath()); seedNodesRepository.setNodeAddressToExclude(mySeedNodeAddress); - seedNodeP2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, torDir, useLocalhost, networkId, storageDir, null, null); + seedNodeP2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, torDir, useLocalhost, networkId, storageDir, new Clock(), null, null); seedNodeP2PService.start(listener); } diff --git a/network/src/test/java/io/bitsquare/p2p/TestUtils.java b/network/src/test/java/io/bitsquare/p2p/TestUtils.java index 33ef92da00..b5b40cdd95 100644 --- a/network/src/test/java/io/bitsquare/p2p/TestUtils.java +++ b/network/src/test/java/io/bitsquare/p2p/TestUtils.java @@ -1,5 +1,6 @@ package io.bitsquare.p2p; +import io.bitsquare.common.Clock; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.crypto.EncryptionService; import io.bitsquare.p2p.seed.SeedNode; @@ -130,7 +131,7 @@ public class TestUtils { } P2PService p2PService = new P2PService(seedNodesRepository, port, new File("seed_node_" + port), useLocalhost, - 2, new File("dummy"), encryptionService, keyRing); + 2, new File("dummy"), new Clock(), encryptionService, keyRing); p2PService.start(new P2PServiceListener() { @Override public void onRequestingDataCompleted() {