diff --git a/common/src/main/java/io/bitsquare/storage/Storage.java b/common/src/main/java/io/bitsquare/storage/Storage.java index 80f98f6c28..4543b3ddef 100644 --- a/common/src/main/java/io/bitsquare/storage/Storage.java +++ b/common/src/main/java/io/bitsquare/storage/Storage.java @@ -80,7 +80,7 @@ public class Storage { public T initAndGetPersisted(String fileName) { this.fileName = fileName; storageFile = new File(dir, fileName); - fileManager = new FileManager<>(dir, storageFile, 600); + fileManager = new FileManager<>(dir, storageFile, 300); return getPersisted(); } diff --git a/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java b/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java index 1b48f7f6ac..3d056643b3 100644 --- a/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java +++ b/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java @@ -17,15 +17,14 @@ package io.bitsquare.arbitration; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.google.inject.name.Named; import io.bitsquare.app.ProgramArguments; +import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.handlers.ErrorMessageHandler; import io.bitsquare.common.handlers.ResultHandler; -import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.BootstrapListener; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.P2PService; @@ -47,7 +46,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -57,10 +55,14 @@ import static org.bitcoinj.core.Utils.HEX; public class ArbitratorManager { private static final Logger log = LoggerFactory.getLogger(ArbitratorManager.class); - private final KeyRing keyRing; - private final ArbitratorService arbitratorService; - private final User user; - private final ObservableMap arbitratorsObservableMap = FXCollections.observableHashMap(); + /////////////////////////////////////////////////////////////////////////////////////////// + // Static + /////////////////////////////////////////////////////////////////////////////////////////// + + private static final long REPUBLISH_MILLIS = Arbitrator.TTL / 2; + private static final long RETRY_REPUBLISH_SEC = 5; + + private static final String publicKeyForTesting = "027a381b5333a56e1cc3d90d3a7d07f26509adf7029ed06fc997c656621f8da1ee"; // Keys for invited arbitrators in bootstrapping phase (before registration is open to anyone and security payment is implemented) // For testing purpose here is a private key so anyone can setup an arbitrator for now. @@ -87,10 +89,24 @@ public class ArbitratorManager { "0274f772a98d23e7a0251ab30d7121897b5aebd11a2f1e45ab654aa57503173245", "036d8a1dfcb406886037d2381da006358722823e1940acc2598c844bbc0fd1026f" )); - private static final String publicKeyForTesting = "027a381b5333a56e1cc3d90d3a7d07f26509adf7029ed06fc997c656621f8da1ee"; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Instance fields + /////////////////////////////////////////////////////////////////////////////////////////// + + private final KeyRing keyRing; + private final ArbitratorService arbitratorService; + private final User user; + private final ObservableMap arbitratorsObservableMap = FXCollections.observableHashMap(); private final boolean isDevTest; private BootstrapListener bootstrapListener; - private ScheduledThreadPoolExecutor republishArbitratorExecutor; + private Timer republishArbitratorTimer, retryRepublishArbitratorTimer; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// @Inject public ArbitratorManager(@Named(ProgramArguments.DEV_TEST) boolean isDevTest, KeyRing keyRing, ArbitratorService arbitratorService, User user) { @@ -113,19 +129,26 @@ public class ArbitratorManager { } public void shutDown() { - if (republishArbitratorExecutor != null) - MoreExecutors.shutdownAndAwaitTermination(republishArbitratorExecutor, 500, TimeUnit.MILLISECONDS); + stopRepublishArbitratorTimer(); + stopRetryRepublishArbitratorTimer(); + if (bootstrapListener != null) + arbitratorService.getP2PService().removeP2PServiceListener(bootstrapListener); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + public void onAllServicesInitialized() { if (user.getRegisteredArbitrator() != null) { - P2PService p2PService = arbitratorService.getP2PService(); if (!p2PService.isBootstrapped()) { bootstrapListener = new BootstrapListener() { @Override public void onBootstrapComplete() { - republishArbitrator(); + ArbitratorManager.this.onBootstrapComplete(); } }; p2PService.addP2PServiceListener(bootstrapListener); @@ -133,29 +156,13 @@ public class ArbitratorManager { } else { republishArbitrator(); } - - // re-publish periodically - republishArbitratorExecutor = Utilities.getScheduledThreadPoolExecutor("republishArbitrator", 1, 5, 5); - long delay = Arbitrator.TTL / 2; - republishArbitratorExecutor.scheduleAtFixedRate(this::republishArbitrator, delay, delay, TimeUnit.MILLISECONDS); } + republishArbitratorTimer = UserThread.runPeriodically(this::republishArbitrator, REPUBLISH_MILLIS, TimeUnit.MILLISECONDS); + applyArbitrators(); } - private void republishArbitrator() { - if (bootstrapListener != null) - arbitratorService.getP2PService().removeP2PServiceListener(bootstrapListener); - - Arbitrator registeredArbitrator = user.getRegisteredArbitrator(); - if (registeredArbitrator != null) { - addArbitrator(registeredArbitrator, - this::applyArbitrators, - log::error - ); - } - } - public void applyArbitrators() { Map map = arbitratorService.getArbitrators(); log.trace("Arbitrators . size=" + map.values().size()); @@ -222,18 +229,6 @@ public class ArbitratorManager { return key.signMessage(keyToSignAsHex); } - private boolean verifySignature(PublicKey storageSignaturePubKey, byte[] registrationPubKey, String signature) { - String keyToSignAsHex = Utils.HEX.encode(storageSignaturePubKey.getEncoded()); - try { - ECKey key = ECKey.fromPublicOnly(registrationPubKey); - key.verifyMessage(keyToSignAsHex, signature); - return true; - } catch (SignatureException e) { - log.warn("verifySignature failed"); - return false; - } - } - @Nullable public ECKey getRegistrationKey(String privKeyBigIntString) { try { @@ -246,4 +241,61 @@ public class ArbitratorManager { public boolean isPublicKeyInList(String pubKeyAsHex) { return isDevTest && pubKeyAsHex.equals(publicKeyForTesting) || publicKeys.contains(pubKeyAsHex); } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// + + private void onBootstrapComplete() { + if (bootstrapListener != null) { + arbitratorService.getP2PService().removeP2PServiceListener(bootstrapListener); + bootstrapListener = null; + } + + republishArbitrator(); + } + + private void republishArbitrator() { + Arbitrator registeredArbitrator = user.getRegisteredArbitrator(); + if (registeredArbitrator != null) { + addArbitrator(registeredArbitrator, + this::applyArbitrators, + errorMessage -> { + if (retryRepublishArbitratorTimer == null) + retryRepublishArbitratorTimer = UserThread.runPeriodically(() -> { + stopRetryRepublishArbitratorTimer(); + republishArbitrator(); + }, RETRY_REPUBLISH_SEC); + } + ); + } + } + + private boolean verifySignature(PublicKey storageSignaturePubKey, byte[] registrationPubKey, String signature) { + String keyToSignAsHex = Utils.HEX.encode(storageSignaturePubKey.getEncoded()); + try { + ECKey key = ECKey.fromPublicOnly(registrationPubKey); + key.verifyMessage(keyToSignAsHex, signature); + return true; + } catch (SignatureException e) { + log.warn("verifySignature failed"); + return false; + } + } + + + private void stopRetryRepublishArbitratorTimer() { + if (retryRepublishArbitratorTimer != null) { + retryRepublishArbitratorTimer.stop(); + retryRepublishArbitratorTimer = null; + } + } + + private void stopRepublishArbitratorTimer() { + if (republishArbitratorTimer != null) { + republishArbitratorTimer.stop(); + republishArbitratorTimer = null; + } + } } diff --git a/core/src/main/java/io/bitsquare/trade/offer/Offer.java b/core/src/main/java/io/bitsquare/trade/offer/Offer.java index 227a4c4c9c..dc7a65a043 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/Offer.java +++ b/core/src/main/java/io/bitsquare/trade/offer/Offer.java @@ -54,8 +54,8 @@ public final class Offer implements StoragePayload, RequiresOwnerIsOnlinePayload @JsonExclude private static final Logger log = LoggerFactory.getLogger(Offer.class); - public static final long TTL = TimeUnit.SECONDS.toMillis(60); - // public static final long TTL = TimeUnit.SECONDS.toMillis(10); //TODO + // public static final long TTL = TimeUnit.SECONDS.toMillis(60); + public static final long TTL = TimeUnit.SECONDS.toMillis(10); //TODO public final static String TAC_OFFERER = "When placing that offer I accept that anyone who fulfills my conditions can " + "take that offer."; diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 1c0ea884f4..186c648ebd 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -60,7 +60,10 @@ import static io.bitsquare.util.Validator.nonEmptyStringOf; public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMessageListener { private static final Logger log = LoggerFactory.getLogger(OpenOfferManager.class); - private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = 5; + private static final long RETRY_REPUBLISH_DELAY_SEC = 5; + private static final long REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC = 10; + private static final long REPUBLISH_INTERVAL_MILLIS = 10 * Offer.TTL; + private static final long REFRESH_INTERVAL_MILLIS = (long) (Offer.TTL * 0.5); private final KeyRing keyRing; private final User user; @@ -73,7 +76,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe private final TradableList openOffers; private final Storage> openOffersStorage; private boolean stopped; - private Timer periodicRepublishOffersTimer, periodicRefreshOffersTimer, republishOffersTimer; + private Timer periodicRepublishOffersTimer, periodicRefreshOffersTimer, retryRepublishOffersTimer; private BootstrapListener bootstrapListener; @@ -131,6 +134,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe stopPeriodicRefreshOffersTimer(); stopPeriodicRepublishOffersTimer(); + stopRetryRepublishOffersTimer(); log.info("remove all open offers at shutDown"); // we remove own offers from offerbook when we go offline @@ -167,10 +171,21 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe // Republish means we send the complete offer object republishOffers(); - startRepublishOffersThread(); + startPeriodicRepublishOffersTimer(); // Refresh is started once we get a success from republish + // We republish after a bit as it might be that our connected node still has the offer in the data map + // but other peers have it already removed because of expired TTL. + // Those other not directly connected peers would not get the broadcast of the new offer, as the first + // connected peer (seed node) does nto broadcast if it has the data in the map. + // To update quickly to the whole network we repeat the republishOffers call after a few seconds when we + // are better connected to the network. There is no guarantee that all peers will receive it but we have + // also our periodic timer, so after that longer interval the offer should be available to all peers. + if (retryRepublishOffersTimer == null) + retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, + REPUBLISH_AGAIN_AT_STARTUP_DELAY_SEC); + p2PService.getPeerManager().addListener(this); } @@ -184,90 +199,25 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe stopped = true; stopPeriodicRefreshOffersTimer(); stopPeriodicRepublishOffersTimer(); + stopRetryRepublishOffersTimer(); + + restart(); } @Override public void onNewConnectionAfterAllConnectionsLost() { + stopped = false; restart(); } @Override public void onAwakeFromStandby() { + stopped = false; if (!p2PService.getNetworkNode().getAllConnections().isEmpty()) restart(); } - /////////////////////////////////////////////////////////////////////////////////////////// - // RepublishOffers, refreshOffers - /////////////////////////////////////////////////////////////////////////////////////////// - - private void startRepublishOffersThread() { - stopped = false; - if (periodicRepublishOffersTimer == null) - periodicRepublishOffersTimer = UserThread.runPeriodically(OpenOfferManager.this::republishOffers, - Offer.TTL * 10, - TimeUnit.MILLISECONDS); - } - - private void republishOffers() { - Log.traceCall("Number of offer for republish: " + openOffers.size()); - if (!stopped) { - stopPeriodicRefreshOffersTimer(); - - for (OpenOffer openOffer : openOffers) { - offerBookService.republishOffers(openOffer.getOffer(), - () -> { - log.debug("Successful added offer to P2P network"); - // Refresh means we send only the dat needed to refresh the TTL (hash, signature and sequence nr.) - startRefreshOffersThread(); - }, - errorMessage -> { - //TODO handle with retry - log.error("Add offer to P2P network failed. " + errorMessage); - stopRepublishOffersTimer(); - republishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, - RETRY_DELAY_AFTER_ALL_CON_LOST_SEC); - }); - openOffer.setStorage(openOffersStorage); - } - } else { - log.warn("We have stopped already. We ignore that republishOffers call."); - } - } - - private void startRefreshOffersThread() { - stopped = false; - // refresh sufficiently before offer would expire - if (periodicRefreshOffersTimer == null) - periodicRefreshOffersTimer = UserThread.runPeriodically(OpenOfferManager.this::refreshOffers, - (long) (Offer.TTL * 0.5), - TimeUnit.MILLISECONDS); - } - - private void refreshOffers() { - if (!stopped) { - Log.traceCall("Number of offer for refresh: " + openOffers.size()); - for (OpenOffer openOffer : openOffers) { - offerBookService.refreshOffer(openOffer.getOffer(), - () -> log.debug("Successful refreshed TTL for offer"), - errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage)); - } - } else { - log.warn("We have stopped already. We ignore that refreshOffers call."); - } - } - - private void restart() { - startRepublishOffersThread(); - startRefreshOffersThread(); - if (republishOffersTimer == null) { - stopped = false; - republishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, RETRY_DELAY_AFTER_ALL_CON_LOST_SEC); - } - } - - /////////////////////////////////////////////////////////////////////////////////////////// // API /////////////////////////////////////////////////////////////////////////////////////////// @@ -282,6 +232,12 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe openOffers.add(openOffer); openOffersStorage.queueUpForSave(); resultHandler.handleResult(transaction); + if (!stopped) { + startPeriodicRepublishOffersTimer(); + startPeriodicRefreshOffersTimer(); + } else { + log.warn("We have stopped already. We ignore that placeOfferProtocol.placeOffer.onResult call."); + } } ); placeOfferProtocol.placeOffer(); @@ -395,6 +351,97 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } + /////////////////////////////////////////////////////////////////////////////////////////// + // RepublishOffers, refreshOffers + /////////////////////////////////////////////////////////////////////////////////////////// + + private void republishOffers() { + Log.traceCall("Number of offer for republish: " + openOffers.size()); + if (!stopped) { + stopPeriodicRefreshOffersTimer(); + + openOffers.stream().forEach(openOffer -> { + offerBookService.republishOffers(openOffer.getOffer(), + () -> { + if (!stopped) { + log.debug("Successful added offer to P2P network"); + // Refresh means we send only the dat needed to refresh the TTL (hash, signature and sequence nr.) + if (periodicRefreshOffersTimer == null) + startPeriodicRefreshOffersTimer(); + } else { + log.warn("We have stopped already. We ignore that offerBookService.republishOffers.onSuccess call."); + } + }, + errorMessage -> { + if (!stopped) { + log.error("Add offer to P2P network failed. " + errorMessage); + stopRetryRepublishOffersTimer(); + retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, + RETRY_REPUBLISH_DELAY_SEC); + } else { + log.warn("We have stopped already. We ignore that offerBookService.republishOffers.onFault call."); + } + }); + openOffer.setStorage(openOffersStorage); + }); + } else { + log.warn("We have stopped already. We ignore that republishOffers call."); + } + } + + private void startPeriodicRepublishOffersTimer() { + Log.traceCall(); + stopped = false; + if (periodicRepublishOffersTimer == null) + periodicRepublishOffersTimer = UserThread.runPeriodically(() -> { + if (!stopped) { + republishOffers(); + } else { + log.warn("We have stopped already. We ignore that periodicRepublishOffersTimer.run call."); + } + }, + REPUBLISH_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); + else + log.warn("periodicRepublishOffersTimer already stated"); + } + + private void startPeriodicRefreshOffersTimer() { + Log.traceCall(); + stopped = false; + // refresh sufficiently before offer would expire + if (periodicRefreshOffersTimer == null) + periodicRefreshOffersTimer = UserThread.runPeriodically(() -> { + if (!stopped) { + Log.traceCall("Number of offer for refresh: " + openOffers.size()); + openOffers.stream().forEach(openOffer -> { + offerBookService.refreshOffer(openOffer.getOffer(), + () -> log.debug("Successful refreshed TTL for offer"), + errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage)); + }); + } else { + log.warn("We have stopped already. We ignore that periodicRefreshOffersTimer.run call."); + } + }, + REFRESH_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); + else + log.warn("periodicRefreshOffersTimer already stated"); + } + + private void restart() { + Log.traceCall(); + if (retryRepublishOffersTimer == null) + retryRepublishOffersTimer = UserThread.runAfter(() -> { + stopped = false; + stopRetryRepublishOffersTimer(); + republishOffers(); + }, RETRY_REPUBLISH_DELAY_SEC); + + startPeriodicRepublishOffersTimer(); + } + + /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// @@ -413,10 +460,10 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } } - private void stopRepublishOffersTimer() { - if (republishOffersTimer != null) { - republishOffersTimer.stop(); - republishOffersTimer = null; + private void stopRetryRepublishOffersTimer() { + if (retryRepublishOffersTimer != null) { + retryRepublishOffersTimer.stop(); + retryRepublishOffersTimer = null; } } } diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferView.java b/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferView.java index b9e9cbaf21..5400db274c 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferView.java @@ -696,6 +696,8 @@ public class CreateOfferView extends ActivatableViewAndModel onPlaceOffer()); createOfferButton.setMinHeight(40); + createOfferButton.setPadding(new Insets(0, 20, 0, 20)); + placeOfferSpinner = placeOfferTuple.second; placeOfferSpinner.setPrefSize(18, 18); placeOfferSpinnerInfoLabel = placeOfferTuple.third; diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java index bd073db10d..0eb3ee800b 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java @@ -579,6 +579,8 @@ public class TakeOfferView extends ActivatableViewAndModel onTakeOffer()); takeOfferButton.setMinHeight(40); + takeOfferButton.setPadding(new Insets(0, 20, 0, 20)); + takeOfferSpinner = takeOfferTuple.second; takeOfferSpinner.setPrefSize(18, 18); takeOfferSpinnerInfoLabel = takeOfferTuple.third; diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 090504a5c9..402f4041f3 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -124,14 +124,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis networkNode.addConnectionListener(this); networkNode.addMessageListener(this); - broadcaster = new Broadcaster(networkNode); - - p2PDataStorage = new P2PDataStorage(broadcaster, networkNode, storageDir); - p2PDataStorage.addHashMapChangedListener(this); - Set seedNodeAddresses = seedNodesRepository.getSeedNodeAddresses(useLocalhost, networkId); peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir, clock); + broadcaster = new Broadcaster(networkNode, peerManager); + + p2PDataStorage = new P2PDataStorage(broadcaster, networkNode, storageDir); + p2PDataStorage.addHashMapChangedListener(this); + requestDataManager = new RequestDataManager(networkNode, p2PDataStorage, peerManager, seedNodeAddresses, this); peerExchangeManager = new PeerExchangeManager(networkNode, peerManager, seedNodeAddresses); @@ -175,6 +175,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis if (peerManager != null) peerManager.shutDown(); + if (broadcaster != null) + broadcaster.shutDown(); + if (requestDataManager != null) requestDataManager.shutDown(); @@ -224,7 +227,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis Log.traceCall(); requestDataManager.requestPreliminaryData(); - keepAliveManager.start(); + keepAliveManager.restart(); p2pServiceListeners.stream().forEach(SetupListener::onTorNodeReady); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index a57e17de0c..ae15c8f884 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -60,9 +60,9 @@ public class Connection implements MessageListener { private static final int MAX_MSG_SIZE = 100 * 1024; // 100 kb of compressed data private static final int MSG_THROTTLE_PER_SEC = 10; // With MAX_MSG_SIZE of 100kb results in bandwidth of 10 mbit/sec private static final int MSG_THROTTLE_PER_10_SEC = 50; // With MAX_MSG_SIZE of 100kb results in bandwidth of 5 mbit/sec for 10 sec - private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); + //private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); //TODO - // private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(30); + private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); public static int getMaxMsgSize() { return MAX_MSG_SIZE; @@ -358,6 +358,7 @@ public class Connection implements MessageListener { log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "ShutDown connection:" + "\npeersNodeAddress=" + peersNodeAddress + + "\ncloseConnectionReason=" + closeConnectionReason + "\nuid=" + uid + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java b/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java index f4bcfc0b03..5d74c64f34 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java @@ -5,7 +5,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.app.Log; import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.CloseConnectionReason; import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.ConnectionListener; import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.storage.messages.DataBroadcastMessage; import javafx.beans.property.IntegerProperty; @@ -19,7 +21,7 @@ import org.slf4j.LoggerFactory; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; -public class Broadcaster { +public class Broadcaster implements ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(Broadcaster.class); @@ -28,15 +30,35 @@ public class Broadcaster { } private final NetworkNode networkNode; + private PeerManager peerManager; private final Set listeners = new CopyOnWriteArraySet<>(); - + private boolean stopped = false; private final IntegerProperty numOfBroadcasts = new SimpleIntegerProperty(0); - public Broadcaster(NetworkNode networkNode) { + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// + + public Broadcaster(NetworkNode networkNode, PeerManager peerManager) { this.networkNode = networkNode; + this.peerManager = peerManager; + networkNode.removeConnectionListener(this); + peerManager.removeListener(this); } + public void shutDown() { + stopped = true; + networkNode.removeConnectionListener(this); + peerManager.removeListener(this); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + public void broadcast(DataBroadcastMessage message, @Nullable NodeAddress sender) { Log.traceCall("Sender=" + sender + "\n\t" + "Message=" + StringUtils.abbreviate(message.toString(), 100)); @@ -48,21 +70,28 @@ public class Broadcaster { .filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender)) .forEach(connection -> { NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get(); - log.trace("Broadcast message to " + - nodeAddress + "."); + log.trace("Broadcast message to " + nodeAddress + "."); SettableFuture future = networkNode.sendMessage(connection, message); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("Broadcast to " + nodeAddress + " succeeded."); - numOfBroadcasts.set(numOfBroadcasts.get() + 1); - listeners.stream().forEach(listener -> listener.onBroadcasted(message)); + if (!stopped) { + //log.trace("Broadcast to " + nodeAddress + " succeeded."); + numOfBroadcasts.set(numOfBroadcasts.get() + 1); + listeners.stream().forEach(listener -> listener.onBroadcasted(message)); + } else { + log.warn("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call."); + } } @Override public void onFailure(@NotNull Throwable throwable) { - log.info("Broadcast to " + nodeAddress + " failed.\n\t" + - "ErrorMessage=" + throwable.getMessage()); + if (!stopped) { + log.info("Broadcast to " + nodeAddress + " failed.\n\t" + + "ErrorMessage=" + throwable.getMessage()); + } else { + log.warn("We have stopped already. We ignore that networkNode.sendMessage.onFailure call."); + } } }); }); @@ -85,4 +114,43 @@ public class Broadcaster { public void removeListener(Listener listener) { listeners.remove(listener); } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // ConnectionListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onConnection(Connection connection) { + stopped = false; + } + + @Override + public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { + } + + @Override + public void onError(Throwable throwable) { + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // PeerManager.Listener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onAllConnectionsLost() { + stopped = true; + } + + @Override + public void onNewConnectionAfterAllConnectionsLost() { + stopped = false; + } + + @Override + public void onAwakeFromStandby() { + if (!networkNode.getAllConnections().isEmpty()) + stopped = false; + } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 0f186738e1..d55a0236a1 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -4,10 +4,8 @@ import io.bitsquare.app.Log; import io.bitsquare.common.Clock; import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; -import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.network.*; -import io.bitsquare.p2p.peers.getdata.messages.GetUpdatedDataRequest; import io.bitsquare.p2p.peers.peerexchange.ReportedPeer; import io.bitsquare.storage.Storage; import javafx.beans.value.ChangeListener; @@ -20,15 +18,15 @@ import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static com.google.common.base.Preconditions.checkArgument; - -public class PeerManager implements ConnectionListener, MessageListener { +public class PeerManager implements ConnectionListener { private static final Logger log = LoggerFactory.getLogger(PeerManager.class); /////////////////////////////////////////////////////////////////////////////////////////// // Static /////////////////////////////////////////////////////////////////////////////////////////// + private static final long CHECK_MAX_CONN_DELAY_SEC = 3; + private static int MAX_CONNECTIONS; private static int MIN_CONNECTIONS; private static int MAX_CONNECTIONS_PEER; @@ -85,6 +83,8 @@ public class PeerManager implements ConnectionListener, MessageListener { private final ChangeListener connectionNodeAddressListener; private final Clock.Listener listener; private final List listeners = new LinkedList<>(); + private boolean stopped; + /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -107,10 +107,14 @@ public class PeerManager implements ConnectionListener, MessageListener { printConnectedPeers(); if (checkMaxConnectionsTimer == null && newValue != null) checkMaxConnectionsTimer = UserThread.runAfter(() -> { - removeTooOldReportedPeers(); - removeTooOldPersistedPeers(); - checkMaxConnections(MAX_CONNECTIONS); - }, 3); + if (!stopped) { + removeTooOldReportedPeers(); + removeTooOldPersistedPeers(); + checkMaxConnections(MAX_CONNECTIONS); + } else { + log.warn("We have stopped already. We ignore that checkMaxConnectionsTimer.run call."); + } + }, CHECK_MAX_CONN_DELAY_SEC); }; // we check if app was idle for more then 5 sec. @@ -126,7 +130,8 @@ public class PeerManager implements ConnectionListener, MessageListener { @Override public void onMissedSecondTick(long missed) { if (missed > Clock.IDLE_TOLERANCE) { - log.error("We have been idle for {} sec", missed / 1000); + log.warn("We have been in standby mode for {} sec", missed / 1000); + stopped = false; listeners.stream().forEach(Listener::onAwakeFromStandby); } } @@ -142,6 +147,11 @@ public class PeerManager implements ConnectionListener, MessageListener { stopCheckMaxConnectionsTimer(); } + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + public int getMaxConnections() { return MAX_CONNECTIONS_ABSOLUTE; } @@ -154,6 +164,7 @@ public class PeerManager implements ConnectionListener, MessageListener { listeners.remove(listener); } + /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -161,18 +172,13 @@ public class PeerManager implements ConnectionListener, MessageListener { @Override public void onConnection(Connection connection) { connection.getNodeAddressProperty().addListener(connectionNodeAddressListener); - Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); - // OutboundConnection always know their peers address - // A seed node get only InboundConnection from other seed nodes with getData or peer exchange, - // never direct messages, so we need to check for PeerType.SEED_NODE at onMessage - if (connection instanceof OutboundConnection && - peersNodeAddressOptional.isPresent() && - seedNodeAddresses.contains(peersNodeAddressOptional.get())) { + + if (isSeedNode(connection)) connection.setPeerType(Connection.PeerType.SEED_NODE); - } if (lostAllConnections) { lostAllConnections = false; + stopped = false; listeners.stream().forEach(Listener::onNewConnectionAfterAllConnectionsLost); } } @@ -183,8 +189,10 @@ public class PeerManager implements ConnectionListener, MessageListener { handleConnectionFault(connection); lostAllConnections = networkNode.getAllConnections().isEmpty(); - if (lostAllConnections) + if (lostAllConnections) { + stopped = true; listeners.stream().forEach(Listener::onAllConnectionsLost); + } } @Override @@ -193,25 +201,7 @@ public class PeerManager implements ConnectionListener, MessageListener { /////////////////////////////////////////////////////////////////////////////////////////// - // MessageListener implementation - /////////////////////////////////////////////////////////////////////////////////////////// - - //TODO move to RequestDataManager - @Override - public void onMessage(Message message, Connection connection) { - // In case a seed node connects to another seed node we get his address at the DataRequest triggered from - // RequestDataManager.updateDataFromConnectedSeedNode - if (message instanceof GetUpdatedDataRequest) { - Log.traceCall(message.toString() + "\n\tconnection=" + connection); - - if (isSeedNode(connection)) - connection.setPeerType(Connection.PeerType.SEED_NODE); - } - } - - - /////////////////////////////////////////////////////////////////////////////////////////// - // Check seed node connections + // Check max connections /////////////////////////////////////////////////////////////////////////////////////////// private boolean checkMaxConnections(int limit) { @@ -222,7 +212,6 @@ public class PeerManager implements ConnectionListener, MessageListener { int size = allConnections.size(); log.info("We have {} connections open. Our limit is {}", size, limit); if (size > limit) { - // Only InboundConnection, and PEER type connections log.info("We have too many connections open.\n\t" + "Lets try first to remove the inbound connections of type PEER."); List candidates = allConnections.stream() @@ -235,7 +224,6 @@ public class PeerManager implements ConnectionListener, MessageListener { "MAX_CONNECTIONS_PEER limit of {}", MAX_CONNECTIONS_PEER); if (size > MAX_CONNECTIONS_PEER) { log.info("Lets try to remove ANY connection of type PEER."); - // Only PEER type connections candidates = allConnections.stream() .filter(e -> e.getPeerType() == Connection.PeerType.PEER) .collect(Collectors.toList()); @@ -245,7 +233,6 @@ public class PeerManager implements ConnectionListener, MessageListener { "MAX_CONNECTIONS_NON_DIRECT limit of {}", MAX_CONNECTIONS_NON_DIRECT); if (size > MAX_CONNECTIONS_NON_DIRECT) { log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER."); - // All connections except DIRECT_MSG_PEER candidates = allConnections.stream() .filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER) .collect(Collectors.toList()); @@ -255,9 +242,7 @@ public class PeerManager implements ConnectionListener, MessageListener { "MAX_CONNECTIONS_ABSOLUTE limit of {}", MAX_CONNECTIONS_ABSOLUTE); if (size > MAX_CONNECTIONS_ABSOLUTE) { log.info("Lets try to remove any connection."); - // All connections - candidates = allConnections.stream() - .collect(Collectors.toList()); + candidates = allConnections.stream().collect(Collectors.toList()); } } } @@ -286,9 +271,9 @@ public class PeerManager implements ConnectionListener, MessageListener { private void removeSuperfluousSeedNodes() { Log.traceCall(); - Set allConnections = networkNode.getAllConnections(); - if (allConnections.size() > MAX_CONNECTIONS_PEER) { - List candidates = allConnections.stream() + Set connections = networkNode.getConfirmedConnections(); + if (hasSufficientConnections()) { + List candidates = connections.stream() .filter(this::isSeedNode) .collect(Collectors.toList()); @@ -318,7 +303,7 @@ public class PeerManager implements ConnectionListener, MessageListener { .filter(e -> e.nodeAddress.equals(nodeAddress)).findAny(); if (reportedPeerOptional.isPresent()) { ReportedPeer reportedPeer = reportedPeerOptional.get(); - reportedPeers.remove(reportedPeer); + removeReportedPeer(reportedPeer); return reportedPeer; } else { return null; @@ -338,7 +323,7 @@ public class PeerManager implements ConnectionListener, MessageListener { } public void addToReportedPeers(HashSet reportedPeersToAdd, Connection connection) { - printReportedPeers(reportedPeersToAdd); + printNewReportedPeers(reportedPeersToAdd); // We check if the reported msg is not violating our rules if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + PeerManager.MAX_CONNECTIONS_ABSOLUTE + 10)) { @@ -359,6 +344,25 @@ public class PeerManager implements ConnectionListener, MessageListener { } } + private void purgeReportedPeersIfExceeds() { + Log.traceCall(); + int size = reportedPeers.size(); + int limit = MAX_REPORTED_PEERS - MAX_CONNECTIONS_ABSOLUTE; + if (size > limit) { + log.trace("We have already {} reported peers which exceeds our limit of {}." + + "We remove random peers from the reported peers list.", size, limit); + int diff = size - limit; + List list = new ArrayList<>(reportedPeers); + // we dont use sorting by lastActivityDate to keep it more random + for (int i = 0; i < diff; i++) { + ReportedPeer toRemove = list.remove(new Random().nextInt(list.size())); + removeReportedPeer(toRemove); + } + } else { + log.trace("No need to purge reported peers.\n\tWe don't have more then {} reported peers yet.", MAX_REPORTED_PEERS); + } + } + private void printReportedPeers() { if (!reportedPeers.isEmpty()) { if (printReportedPeersDetails) { @@ -372,25 +376,26 @@ public class PeerManager implements ConnectionListener, MessageListener { } } - private void printReportedPeers(HashSet reportedPeers) { + private void printNewReportedPeers(HashSet reportedPeers) { if (printReportedPeersDetails) { - StringBuilder result = new StringBuilder("We received now reportedPeers:"); + StringBuilder result = new StringBuilder("We received new reportedPeers:"); reportedPeers.stream().forEach(e -> result.append("\n\t").append(e)); log.info(result.toString()); } log.info("Number of new arrived reported peers: {}", reportedPeers.size()); } + /////////////////////////////////////////////////////////////////////////////////////////// // Persisted peers /////////////////////////////////////////////////////////////////////////////////////////// - private boolean removePersistedPeer(ReportedPeer reportedPeer) { - if (persistedPeers.contains(reportedPeer)) { - persistedPeers.remove(reportedPeer); + private boolean removePersistedPeer(ReportedPeer persistedPeer) { + if (persistedPeers.contains(persistedPeer)) { + persistedPeers.remove(persistedPeer); if (dbStorage != null) - dbStorage.queueUpForSave(persistedPeers, 5000); + dbStorage.queueUpForSave(persistedPeers, 2000); return true; } else { @@ -401,12 +406,7 @@ public class PeerManager implements ConnectionListener, MessageListener { private boolean removePersistedPeer(NodeAddress nodeAddress) { Optional persistedPeerOptional = persistedPeers.stream() .filter(e -> e.nodeAddress.equals(nodeAddress)).findAny(); - persistedPeerOptional.ifPresent(persistedPeer -> { - persistedPeers.remove(persistedPeer); - if (dbStorage != null) - dbStorage.queueUpForSave(persistedPeers, 5000); - }); - return persistedPeerOptional.isPresent(); + return persistedPeerOptional.isPresent() && removePersistedPeer(persistedPeerOptional.get()); } private void removeTooOldPersistedPeers() { @@ -417,6 +417,24 @@ public class PeerManager implements ConnectionListener, MessageListener { persistedPeersToRemove.forEach(this::removePersistedPeer); } + private void purgePersistedPeersIfExceeds() { + Log.traceCall(); + int size = persistedPeers.size(); + int limit = MAX_PERSISTED_PEERS; + if (size > limit) { + log.trace("We have already {} persisted peers which exceeds our limit of {}." + + "We remove random peers from the persisted peers list.", size, limit); + int diff = size - limit; + List list = new ArrayList<>(persistedPeers); + // we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random + for (int i = 0; i < diff; i++) { + ReportedPeer toRemove = list.remove(new Random().nextInt(list.size())); + removePersistedPeer(toRemove); + } + } else { + log.trace("No need to purge persisted peers.\n\tWe don't have more then {} persisted peers yet.", MAX_PERSISTED_PEERS); + } + } public Set getPersistedPeers() { return persistedPeers; @@ -431,32 +449,6 @@ public class PeerManager implements ConnectionListener, MessageListener { return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS; } - public void handleConnectionFault(Connection connection) { - connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection)); - } - - public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) { - Log.traceCall("nodeAddress=" + nodeAddress); - ReportedPeer reportedPeer = removeReportedPeer(nodeAddress); - if (connection != null && connection.getRuleViolation() != null) { - removePersistedPeer(nodeAddress); - } else { - if (reportedPeer != null) { - removePersistedPeer(nodeAddress); - persistedPeers.add(reportedPeer); - dbStorage.queueUpForSave(persistedPeers, 5000); - - removeTooOldPersistedPeers(); - } - } - } - - /* public Set getConnectedAndReportedPeers() { - Set result = new HashSet<>(reportedPeers); - result.addAll(getConnectedPeers()); - return result; - }*/ - public boolean isSeedNode(ReportedPeer reportedPeer) { return seedNodeAddresses.contains(reportedPeer.nodeAddress); } @@ -486,6 +478,23 @@ public class PeerManager implements ConnectionListener, MessageListener { return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress); } + public void handleConnectionFault(Connection connection) { + connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection)); + } + + public void handleConnectionFault(NodeAddress nodeAddress) { + handleConnectionFault(nodeAddress, null); + } + + public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) { + Log.traceCall("nodeAddress=" + nodeAddress); + ReportedPeer reportedPeer = removeReportedPeer(nodeAddress); + if (connection != null && connection.getRuleViolation() != null) + removePersistedPeer(nodeAddress); + else + removeTooOldPersistedPeers(); + } + public void shutDownConnection(Connection connection, CloseConnectionReason closeConnectionReason) { if (connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER) connection.shutDown(closeConnectionReason); @@ -500,54 +509,17 @@ public class PeerManager implements ConnectionListener, MessageListener { .ifPresent(connection -> connection.shutDown(closeConnectionReason)); } + public HashSet getConnectedNonSeedNodeReportedPeers(NodeAddress excludedNodeAddress) { + return new HashSet<>(getConnectedNonSeedNodeReportedPeers().stream() + .filter(e -> !e.nodeAddress.equals(excludedNodeAddress)) + .collect(Collectors.toSet())); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void purgeReportedPeersIfExceeds() { - Log.traceCall(); - int size = getReportedPeers().size(); - int limit = MAX_REPORTED_PEERS - MAX_CONNECTIONS_ABSOLUTE; - if (size > limit) { - log.trace("We have already {} reported peers which exceeds our limit of {}." + - "We remove random peers from the reported peers list.", size, limit); - int diff = size - limit; - List list = new ArrayList<>(getReportedPeers()); - // we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random - for (int i = 0; i < diff; i++) { - ReportedPeer toRemove = getAndRemoveRandomReportedPeer(list); - removeReportedPeer(toRemove); - } - } else { - log.trace("No need to purge reported peers.\n\tWe don't have more then {} reported peers yet.", MAX_REPORTED_PEERS); - } - } - - private void purgePersistedPeersIfExceeds() { - Log.traceCall(); - int size = getPersistedPeers().size(); - int limit = MAX_PERSISTED_PEERS - MAX_CONNECTIONS_ABSOLUTE; - if (size > limit) { - log.trace("We have already {} persisted peers which exceeds our limit of {}." + - "We remove random peers from the persisted peers list.", size, limit); - int diff = size - limit; - List list = new ArrayList<>(getReportedPeers()); - // we dont use sorting by lastActivityDate to avoid attack vectors and keep it more random - for (int i = 0; i < diff; i++) { - ReportedPeer toRemove = getAndRemoveRandomReportedPeer(list); - removePersistedPeer(toRemove); - } - } else { - log.trace("No need to purge persisted peers.\n\tWe don't have more then {} persisted peers yet.", MAX_PERSISTED_PEERS); - } - } - - private ReportedPeer getAndRemoveRandomReportedPeer(List list) { - checkArgument(!list.isEmpty(), "List must not be empty"); - return list.remove(new Random().nextInt(list.size())); - } - - private Set getConnectedPeers() { + private Set getConnectedReportedPeers() { // networkNode.getConfirmedConnections includes: // filter(connection -> connection.getPeersNodeAddressOptional().isPresent()) return networkNode.getConfirmedConnections().stream() @@ -555,18 +527,12 @@ public class PeerManager implements ConnectionListener, MessageListener { .collect(Collectors.toSet()); } - private HashSet getConnectedPeersNonSeedNodes() { - return new HashSet<>(getConnectedPeers().stream() + private HashSet getConnectedNonSeedNodeReportedPeers() { + return new HashSet<>(getConnectedReportedPeers().stream() .filter(e -> !isSeedNode(e)) .collect(Collectors.toSet())); } - public HashSet getConnectedPeersNonSeedNodes(NodeAddress excludedNodeAddress) { - return new HashSet<>(getConnectedPeersNonSeedNodes().stream() - .filter(e -> !e.nodeAddress.equals(excludedNodeAddress)) - .collect(Collectors.toSet())); - } - private void stopCheckMaxConnectionsTimer() { if (checkMaxConnectionsTimer != null) { checkMaxConnectionsTimer.stop(); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java index 2a8f3b3416..d9699aa09e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java @@ -50,7 +50,7 @@ public class RequestDataHandler implements MessageListener { private final PeerManager peerManager; private final Listener listener; private Timer timeoutTimer; - private final long nonce = new Random().nextLong(); + private final int nonce = new Random().nextInt(); private boolean stopped; /////////////////////////////////////////////////////////////////////////////////////////// @@ -67,11 +67,8 @@ public class RequestDataHandler implements MessageListener { networkNode.addMessageListener(this); } - public void cleanup() { - Log.traceCall(); - stopped = true; - networkNode.removeMessageListener(this); - stopTimeoutTimer(); + public void cancel() { + cleanup(); } @@ -99,21 +96,29 @@ public class RequestDataHandler implements MessageListener { @Override public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending getDataRequest to " + nodeAddress + - " failed. That is expected if the peer is offline.\n\t" + - "getDataRequest=" + getDataRequest + "." + - "\n\tException=" + throwable.getMessage(); - log.info(errorMessage); - handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); + if (!stopped) { + String errorMessage = "Sending getDataRequest to " + nodeAddress + + " failed. That is expected if the peer is offline.\n\t" + + "getDataRequest=" + getDataRequest + "." + + "\n\tException=" + throwable.getMessage(); + log.info(errorMessage); + handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); + } else { + log.warn("We have stopped already. We ignore that requestData.onFailure call."); + } } }); checkArgument(timeoutTimer == null, "requestData must not be called twice."); timeoutTimer = UserThread.runAfter(() -> { - String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest + - " on nodeAddress:" + nodeAddress; - log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this); - handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); + if (!stopped) { + String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest + + " on nodeAddress:" + nodeAddress; + log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this); + handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); + } else { + log.warn("We have stopped already. We ignore that timeoutTimer.run call."); + } }, 10); } else { @@ -160,16 +165,25 @@ public class RequestDataHandler implements MessageListener { // Private /////////////////////////////////////////////////////////////////////////////////////////// + + private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) { + cleanup(); + peerManager.shutDownConnection(nodeAddress, closeConnectionReason); + peerManager.handleConnectionFault(nodeAddress); + listener.onFault(errorMessage, null); + } + + private void cleanup() { + Log.traceCall(); + stopped = true; + networkNode.removeMessageListener(this); + stopTimeoutTimer(); + } + private void stopTimeoutTimer() { if (timeoutTimer != null) { timeoutTimer.stop(); timeoutTimer = null; } } - - private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) { - cleanup(); - peerManager.shutDownConnection(nodeAddress, closeConnectionReason); - listener.onFault(errorMessage, null); - } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java index bb91a49325..aa76915b24 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataManager.java @@ -82,7 +82,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener, stopRetryTimer(); networkNode.removeMessageListener(this); peerManager.removeListener(this); - handlerMap.values().stream().forEach(RequestDataHandler::cleanup); + closeAllHandlers(); } @@ -122,14 +122,12 @@ public class RequestDataManager implements MessageListener, ConnectionListener, @Override public void onConnection(Connection connection) { Log.traceCall(); - // clean up in case we could not clean up at disconnect - closeRequestDataHandler(connection); } @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { Log.traceCall(); - closeRequestDataHandler(connection); + closeHandler(connection); } @Override @@ -144,28 +142,27 @@ public class RequestDataManager implements MessageListener, ConnectionListener, @Override public void onAllConnectionsLost() { Log.traceCall(); - closeAllRequestDataHandlers(); + closeAllHandlers(); stopRetryTimer(); stopped = true; + restart(); } @Override public void onNewConnectionAfterAllConnectionsLost() { Log.traceCall(); - closeAllRequestDataHandlers(); + closeAllHandlers(); stopped = false; - - retryAfterDelay(); + restart(); } @Override public void onAwakeFromStandby() { Log.traceCall(); - closeAllRequestDataHandlers(); + closeAllHandlers(); stopped = false; - if (!networkNode.getAllConnections().isEmpty()) - retryAfterDelay(); + restart(); } @@ -185,22 +182,25 @@ public class RequestDataManager implements MessageListener, ConnectionListener, new GetDataRequestHandler.Listener() { @Override public void onComplete() { - log.trace("requestDataHandshake completed.\n\tConnection={}", - connection); + log.trace("requestDataHandshake completed.\n\tConnection={}", connection); } @Override public void onFault(String errorMessage, @Nullable Connection connection) { - log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + - "ErrorMessage={}", connection, errorMessage); - peerManager.handleConnectionFault(connection); + if (!stopped) { + log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + + "ErrorMessage={}", connection, errorMessage); + peerManager.handleConnectionFault(connection); + } else { + log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); + } } }); getDataRequestHandler.handle((GetDataRequest) message, connection); } else { log.warn("We have stopped already. We ignore that onMessage call."); } - } + } } /////////////////////////////////////////////////////////////////////////////////////////// @@ -239,37 +239,32 @@ public class RequestDataManager implements MessageListener, ConnectionListener, @Override public void onFault(String errorMessage, @Nullable Connection connection) { - log.trace("requestDataHandshake of outbound connection failed.\n\tnodeAddress={}\n\t" + + log.trace("requestDataHandshake with outbound connection failed.\n\tnodeAddress={}\n\t" + "ErrorMessage={}", nodeAddress, errorMessage); handlerMap.remove(nodeAddress); - peerManager.handleConnectionFault(nodeAddress, connection); - if (!stopped) { - if (!remainingNodeAddresses.isEmpty()) { - log.info("There are remaining nodes available for requesting data. " + - "We will try requestDataFromPeers again."); - NodeAddress nextCandidate = remainingNodeAddresses.get(0); - remainingNodeAddresses.remove(nextCandidate); - requestData(nextCandidate, remainingNodeAddresses); - } else { - log.info("There is no remaining node available for requesting data. " + - "That is expected if no other node is online.\n\t" + - "We will try to use reported peers (if no available we use persisted peers) " + - "and try again to request data from our seed nodes after a random pause."); - - // Notify listeners - if (!nodeAddressOfPreliminaryDataRequest.isPresent()) { - if (peerManager.isSeedNode(nodeAddress)) - listener.onNoSeedNodeAvailable(); - else - listener.onNoPeersAvailable(); - } - - retryAfterDelay(); - } + if (!remainingNodeAddresses.isEmpty()) { + log.info("There are remaining nodes available for requesting data. " + + "We will try requestDataFromPeers again."); + NodeAddress nextCandidate = remainingNodeAddresses.get(0); + remainingNodeAddresses.remove(nextCandidate); + requestData(nextCandidate, remainingNodeAddresses); } else { - log.warn("We have stopped already. We ignore that requestData.onFault call."); + log.info("There is no remaining node available for requesting data. " + + "That is expected if no other node is online.\n\t" + + "We will try to use reported peers (if no available we use persisted peers) " + + "and try again to request data from our seed nodes after a random pause."); + + // Notify listeners + if (!nodeAddressOfPreliminaryDataRequest.isPresent()) { + if (peerManager.isSeedNode(nodeAddress)) + listener.onNoSeedNodeAvailable(); + else + listener.onNoPeersAvailable(); + } + + restart(); } } }); @@ -288,10 +283,13 @@ public class RequestDataManager implements MessageListener, ConnectionListener, // Utils /////////////////////////////////////////////////////////////////////////////////////////// - private void retryAfterDelay() { + private void restart() { + Log.traceCall(); if (retryTimer == null) { retryTimer = UserThread.runAfter(() -> { log.trace("retryTimer called"); + stopped = false; + stopRetryTimer(); // We create a new list of candidates @@ -345,19 +343,21 @@ public class RequestDataManager implements MessageListener, ConnectionListener, } } - private void closeRequestDataHandler(Connection connection) { + private void closeHandler(Connection connection) { Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); if (peersNodeAddressOptional.isPresent()) { NodeAddress nodeAddress = peersNodeAddressOptional.get(); if (handlerMap.containsKey(nodeAddress)) { - handlerMap.get(nodeAddress).cleanup(); + handlerMap.get(nodeAddress).cancel(); handlerMap.remove(nodeAddress); } + } else { + log.trace("closeRequestDataHandler: nodeAddress not set in connection " + connection); } } - private void closeAllRequestDataHandlers() { - handlerMap.values().stream().forEach(RequestDataHandler::cleanup); + private void closeAllHandlers() { + handlerMap.values().stream().forEach(RequestDataHandler::cancel); handlerMap.clear(); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetDataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetDataRequest.java index 97916a282b..92c0a77336 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetDataRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetDataRequest.java @@ -3,5 +3,5 @@ package io.bitsquare.p2p.peers.getdata.messages; import io.bitsquare.p2p.Message; public interface GetDataRequest extends Message { - long getNonce(); + int getNonce(); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetDataResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetDataResponse.java index 563667ca8d..4b21828ffd 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetDataResponse.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetDataResponse.java @@ -12,9 +12,9 @@ public final class GetDataResponse implements Message { private final int messageVersion = Version.getP2PMessageVersion(); public final HashSet dataSet; - public final long requestNonce; + public final int requestNonce; - public GetDataResponse(HashSet dataSet, long requestNonce) { + public GetDataResponse(HashSet dataSet, int requestNonce) { this.dataSet = dataSet; this.requestNonce = requestNonce; } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetUpdatedDataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetUpdatedDataRequest.java index 2acf0be790..220ba420cb 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetUpdatedDataRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/GetUpdatedDataRequest.java @@ -10,15 +10,15 @@ public final class GetUpdatedDataRequest implements SendersNodeAddressMessage, G private final int messageVersion = Version.getP2PMessageVersion(); private final NodeAddress senderNodeAddress; - private final long nonce; + private final int nonce; - public GetUpdatedDataRequest(NodeAddress senderNodeAddress, long nonce) { + public GetUpdatedDataRequest(NodeAddress senderNodeAddress, int nonce) { this.senderNodeAddress = senderNodeAddress; this.nonce = nonce; } @Override - public long getNonce() { + public int getNonce() { return nonce; } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/PreliminaryGetDataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/PreliminaryGetDataRequest.java index cbd6ad811f..7e2885c72f 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/PreliminaryGetDataRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/messages/PreliminaryGetDataRequest.java @@ -8,14 +8,14 @@ public final class PreliminaryGetDataRequest implements AnonymousMessage, GetDat private static final long serialVersionUID = Version.P2P_NETWORK_VERSION; private final int messageVersion = Version.getP2PMessageVersion(); - private final long nonce; + private final int nonce; - public PreliminaryGetDataRequest(long nonce) { + public PreliminaryGetDataRequest(int nonce) { this.nonce = nonce; } @Override - public long getNonce() { + public int getNonce() { return nonce; } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java index 9f28065dfd..47d2a07b02 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveHandler.java @@ -57,10 +57,8 @@ class KeepAliveHandler implements MessageListener { this.listener = listener; } - public void cleanup() { - stopped = true; - if (connection != null) - connection.removeMessageListener(this); + public void cancel() { + cleanup(); } @@ -83,13 +81,18 @@ class KeepAliveHandler implements MessageListener { @Override public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending ping to " + connection + - " failed. That is expected if the peer is offline.\n\tping=" + ping + - ".\n\tException=" + throwable.getMessage(); - log.info(errorMessage); - cleanup(); - peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); - listener.onFault(errorMessage); + if (!stopped) { + String errorMessage = "Sending ping to " + connection + + " failed. That is expected if the peer is offline.\n\tping=" + ping + + ".\n\tException=" + throwable.getMessage(); + log.info(errorMessage); + cleanup(); + peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); + peerManager.handleConnectionFault(connection); + listener.onFault(errorMessage); + } else { + log.warn("We have stopped already. We ignore that sendPing.onFailure call."); + } } }); } else { @@ -121,4 +124,10 @@ class KeepAliveHandler implements MessageListener { } } } + + private void cleanup() { + stopped = true; + if (connection != null) + connection.removeMessageListener(this); + } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java index 113cc1349c..03dbd2318e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/keepalive/KeepAliveManager.java @@ -17,14 +17,13 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -import java.util.Random; public class KeepAliveManager implements MessageListener, ConnectionListener, PeerManager.Listener { private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class); - private static final int INTERVAL_SEC = new Random().nextInt(10) + 10; + //private static final int INTERVAL_SEC = new Random().nextInt(10) + 10; //TODO - // private static final int INTERVAL_SEC = 5; + private static final int INTERVAL_SEC = 5; private final NetworkNode networkNode; private final PeerManager peerManager; @@ -49,13 +48,10 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe public void shutDown() { Log.traceCall(); stopped = true; - networkNode.removeMessageListener(this); networkNode.removeConnectionListener(this); peerManager.removeListener(this); - - closeAllMaintenanceHandlers(); - + closeAllHandlers(); stopKeepAliveTimer(); } @@ -64,10 +60,14 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe // API /////////////////////////////////////////////////////////////////////////////////////////// - public void start() { - stopped = false; + public void restart() { if (keepAliveTimer == null) - keepAliveTimer = UserThread.runPeriodically(this::keepAlive, INTERVAL_SEC); + keepAliveTimer = UserThread.runPeriodically(() -> { + stopped = false; + keepAlive(); + }, INTERVAL_SEC); + else + log.warn("keepAliveTimer already running"); } @@ -91,12 +91,16 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe @Override public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending pong to " + connection + - " failed. That is expected if the peer is offline. pong=" + pong + "." + - "Exception: " + throwable.getMessage(); - log.info(errorMessage); - peerManager.handleConnectionFault(connection); - peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); + if (!stopped) { + String errorMessage = "Sending pong to " + connection + + " failed. That is expected if the peer is offline. pong=" + pong + "." + + "Exception: " + throwable.getMessage(); + log.info(errorMessage); + peerManager.handleConnectionFault(connection); + peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); + } else { + log.warn("We have stopped already. We ignore that networkNode.sendMessage.onFailure call."); + } } }); } else { @@ -113,14 +117,12 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe @Override public void onConnection(Connection connection) { Log.traceCall(); - // clean up in case we could not clean up at disconnect - closeMaintenanceHandler(connection); } @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { Log.traceCall(); - closeMaintenanceHandler(connection); + closeHandler(connection); } @Override @@ -135,23 +137,27 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe @Override public void onAllConnectionsLost() { Log.traceCall(); - closeAllMaintenanceHandlers(); + closeAllHandlers(); stopKeepAliveTimer(); + stopped = true; + restart(); } @Override public void onNewConnectionAfterAllConnectionsLost() { Log.traceCall(); - closeAllMaintenanceHandlers(); - start(); + closeAllHandlers(); + stopped = false; + restart(); } @Override public void onAwakeFromStandby() { Log.traceCall(); - closeAllMaintenanceHandlers(); + closeAllHandlers(); + stopped = false; if (!networkNode.getAllConnections().isEmpty()) - start(); + restart(); } @@ -204,16 +210,16 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe } } - private void closeMaintenanceHandler(Connection connection) { + private void closeHandler(Connection connection) { String uid = connection.getUid(); if (handlerMap.containsKey(uid)) { - handlerMap.get(uid).cleanup(); + handlerMap.get(uid).cancel(); handlerMap.remove(uid); } } - private void closeAllMaintenanceHandlers() { - handlerMap.values().stream().forEach(KeepAliveHandler::cleanup); + private void closeAllHandlers() { + handlerMap.values().stream().forEach(KeepAliveHandler::cancel); handlerMap.clear(); } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java index 9ab99fa1de..406ce5f457 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/GetPeersRequestHandler.java @@ -68,7 +68,7 @@ class GetPeersRequestHandler { checkArgument(connection.getPeersNodeAddressOptional().isPresent(), "The peers address must have been already set at the moment"); GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.nonce, - peerManager.getConnectedPeersNonSeedNodes(connection.getPeersNodeAddressOptional().get())); + peerManager.getConnectedNonSeedNodeReportedPeers(connection.getPeersNodeAddressOptional().get())); SettableFuture future = networkNode.sendMessage(connection, getPeersResponse); Futures.addCallback(future, new FutureCallback() { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java index 49d28e1705..afd2c976ea 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeHandler.java @@ -65,15 +65,9 @@ class PeerExchangeHandler implements MessageListener { this.listener = listener; } - public void cleanup() { - stopped = true; - if (connection != null) - connection.removeMessageListener(this); - - if (timeoutTimer != null) { - timeoutTimer.stop(); - timeoutTimer = null; - } + public void cancel() { + Log.traceCall(); + cleanup(); } @@ -85,38 +79,50 @@ class PeerExchangeHandler implements MessageListener { Log.traceCall("nodeAddress=" + nodeAddress + " / this=" + this); if (!stopped) { if (networkNode.getNodeAddress() != null) { - GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedPeersNonSeedNodes(nodeAddress)); + GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedNonSeedNodeReportedPeers(nodeAddress)); SettableFuture future = networkNode.sendMessage(nodeAddress, getPeersRequest); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - if (!connection.getPeersNodeAddressOptional().isPresent()) { - connection.setPeersNodeAddress(nodeAddress); - //TODO remove setPeersNodeAddress if never needed - log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()"); + if (!stopped) { + if (!connection.getPeersNodeAddressOptional().isPresent()) { + connection.setPeersNodeAddress(nodeAddress); + //TODO remove setPeersNodeAddress if never needed + log.warn("sendGetPeersRequest: !connection.getPeersNodeAddressOptional().isPresent()"); + } + PeerExchangeHandler.this.connection = connection; + connection.addMessageListener(PeerExchangeHandler.this); + log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded."); + } else { + log.warn("We have stopped that handler already. We ignore that sendGetPeersRequest.onSuccess call."); } - PeerExchangeHandler.this.connection = connection; - connection.addMessageListener(PeerExchangeHandler.this); - log.trace("Send " + getPeersRequest + " to " + nodeAddress + " succeeded."); } @Override public void onFailure(@NotNull Throwable throwable) { - String errorMessage = "Sending getPeersRequest to " + nodeAddress + - " failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest + - ".\n\tException=" + throwable.getMessage(); - log.info(errorMessage); - handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, nodeAddress); + if (!stopped) { + String errorMessage = "Sending getPeersRequest to " + nodeAddress + + " failed. That is expected if the peer is offline.\n\tgetPeersRequest=" + getPeersRequest + + ".\n\tException=" + throwable.getMessage(); + log.info(errorMessage); + handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, nodeAddress); + } else { + log.warn("We have stopped that handler already. We ignore that sendGetPeersRequest.onFailure call."); + } } }); checkArgument(timeoutTimer == null, "requestReportedPeers must not be called twice."); timeoutTimer = UserThread.runAfter(() -> { - String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress; - log.info(errorMessage + " / PeerExchangeHandler=" + - PeerExchangeHandler.this); - log.info("timeoutTimer called on " + this); - handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress); + if (!stopped) { + String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress; + log.info(errorMessage + " / PeerExchangeHandler=" + + PeerExchangeHandler.this); + log.info("timeoutTimer called on " + this); + handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress); + } else { + log.warn("We have stopped that handler already. We ignore that timeoutTimer.run call."); + } }, TIME_OUT_SEC, TimeUnit.SECONDS); } else { @@ -162,6 +168,7 @@ class PeerExchangeHandler implements MessageListener { /////////////////////////////////////////////////////////////////////////////////////////// private void handleFault(String errorMessage, CloseConnectionReason sendMsgFailure, NodeAddress nodeAddress) { + Log.traceCall(); // TODO retry cleanup(); if (connection == null) @@ -169,6 +176,20 @@ class PeerExchangeHandler implements MessageListener { else peerManager.shutDownConnection(connection, sendMsgFailure); + peerManager.handleConnectionFault(nodeAddress, connection); listener.onFault(errorMessage, connection); } + + private void cleanup() { + Log.traceCall(); + stopped = true; + if (connection != null) + connection.removeMessageListener(this); + + if (timeoutTimer != null) { + timeoutTimer.stop(); + timeoutTimer = null; + } + } + } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java index a78aa38c30..d062c7f3c6 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/peerexchange/PeerExchangeManager.java @@ -59,7 +59,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, stopPeriodicTimer(); stopRetryTimer(); - closeAllPeerExchangeHandlers(); + closeAllHandlers(); } @@ -85,14 +85,12 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, @Override public void onConnection(Connection connection) { Log.traceCall(); - // clean up in case we could not clean up at disconnect - closePeerExchangeHandler(connection); } @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { Log.traceCall(); - closePeerExchangeHandler(connection); + closeHandler(connection); if (retryTimer == null) { retryTimer = UserThread.runAfter(() -> { @@ -115,24 +113,26 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, @Override public void onAllConnectionsLost() { Log.traceCall(); - closeAllPeerExchangeHandlers(); + closeAllHandlers(); stopPeriodicTimer(); stopRetryTimer(); + stopped = true; + restart(); } @Override public void onNewConnectionAfterAllConnectionsLost() { Log.traceCall(); - closeAllPeerExchangeHandlers(); - + closeAllHandlers(); + stopped = false; restart(); } @Override public void onAwakeFromStandby() { Log.traceCall(); - closeAllPeerExchangeHandlers(); - + closeAllHandlers(); + stopped = false; if (!networkNode.getAllConnections().isEmpty()) restart(); } @@ -197,30 +197,32 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, "nodeAddress={}", errorMessage, nodeAddress); handlerMap.remove(nodeAddress); - peerManager.handleConnectionFault(nodeAddress, connection); - if (!stopped) { - if (!remainingNodeAddresses.isEmpty()) { - if (!peerManager.hasSufficientConnections()) { - log.info("There are remaining nodes available for requesting peers. " + - "We will try getReportedPeers again."); - NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); - remainingNodeAddresses.remove(nextCandidate); - requestReportedPeers(nextCandidate, remainingNodeAddresses); - } else { - // That path will rarely be reached - log.info("We have already sufficient connections."); - } + if (!remainingNodeAddresses.isEmpty()) { + if (!peerManager.hasSufficientConnections()) { + log.info("There are remaining nodes available for requesting peers. " + + "We will try getReportedPeers again."); + NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); + remainingNodeAddresses.remove(nextCandidate); + requestReportedPeers(nextCandidate, remainingNodeAddresses); } else { - log.info("There is no remaining node available for requesting peers. " + - "That is expected if no other node is online.\n\t" + - "We will try again after a pause."); - if (retryTimer == null) - retryTimer = UserThread.runAfter(() -> { - log.trace("ConnectToMorePeersTimer called from requestReportedPeers code path"); + // That path will rarely be reached + log.info("We have already sufficient connections."); + } + } else { + log.info("There is no remaining node available for requesting peers. " + + "That is expected if no other node is online.\n\t" + + "We will try again after a pause."); + if (retryTimer == null) + retryTimer = UserThread.runAfter(() -> { + if (!stopped) { + log.trace("retryTimer called from requestReportedPeers code path"); stopRetryTimer(); requestWithAvailablePeers(); - }, RETRY_DELAY_SEC); - } + } else { + stopRetryTimer(); + log.warn("We have stopped already. We ignore that retryTimer.run call."); + } + }, RETRY_DELAY_SEC); } } }); @@ -266,9 +268,14 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, log.info("No more peers are available for requestReportedPeers. We will try again after a pause."); if (retryTimer == null) retryTimer = UserThread.runAfter(() -> { - log.trace("ConnectToMorePeersTimer called from requestWithAvailablePeers code path"); - stopRetryTimer(); - requestWithAvailablePeers(); + if (!stopped) { + log.trace("retryTimer called from requestWithAvailablePeers code path"); + stopRetryTimer(); + requestWithAvailablePeers(); + } else { + stopRetryTimer(); + log.warn("We have stopped already. We ignore that retryTimer.run call."); + } }, RETRY_DELAY_SEC); } } else { @@ -289,6 +296,8 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, if (periodicTimer == null) periodicTimer = UserThread.runPeriodically(this::requestWithAvailablePeers, REQUEST_PERIODICALLY_INTERVAL_MINUTES, TimeUnit.MINUTES); + else + log.warn("periodicTimer already started"); } private void restart() { @@ -296,10 +305,13 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, if (retryTimer == null) { retryTimer = UserThread.runAfter(() -> { - log.trace("ConnectToMorePeersTimer called from onNewConnectionAfterAllConnectionsLost"); + stopped = false; + log.trace("retryTimer called from restart"); stopRetryTimer(); requestWithAvailablePeers(); }, RETRY_DELAY_AFTER_ALL_CON_LOST_SEC); + } else { + log.warn("retryTimer already started"); } } @@ -338,19 +350,23 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener, } } - private void closePeerExchangeHandler(Connection connection) { + private void closeHandler(Connection connection) { + Log.traceCall(); Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); if (peersNodeAddressOptional.isPresent()) { NodeAddress nodeAddress = peersNodeAddressOptional.get(); if (handlerMap.containsKey(nodeAddress)) { - handlerMap.get(nodeAddress).cleanup(); + handlerMap.get(nodeAddress).cancel(); handlerMap.remove(nodeAddress); } + } else { + log.warn("closeHandler: nodeAddress not set in connection " + connection); } } - private void closeAllPeerExchangeHandlers() { - handlerMap.values().stream().forEach(PeerExchangeHandler::cleanup); + private void closeAllHandlers() { + Log.traceCall(); + handlerMap.values().stream().forEach(PeerExchangeHandler::cancel); handlerMap.clear(); } } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index f2cad8e253..bf5a1588a5 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -1,15 +1,14 @@ package io.bitsquare.p2p.storage; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.MoreExecutors; import io.bitsquare.app.Log; import io.bitsquare.app.Version; +import io.bitsquare.common.Timer; import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.CryptoException; import io.bitsquare.common.crypto.Hash; import io.bitsquare.common.crypto.Sig; import io.bitsquare.common.persistance.Persistable; -import io.bitsquare.common.util.Utilities; import io.bitsquare.common.wire.Payload; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; @@ -30,7 +29,6 @@ import java.security.PublicKey; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; // Run in UserThread @@ -39,15 +37,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { @VisibleForTesting //public static int CHECK_TTL_INTERVAL_MILLIS = (int) TimeUnit.SECONDS.toMillis(30); - public static int CHECK_TTL_INTERVAL_MILLIS = (int) TimeUnit.HOURS.toMillis(30); - // public static int CHECK_TTL_INTERVAL_MILLIS = (int) TimeUnit.SECONDS.toMillis(5);//TODO + public static int CHECK_TTL_INTERVAL_SEC = 5;//TODO private final Broadcaster broadcaster; private final Map map = new ConcurrentHashMap<>(); private final CopyOnWriteArraySet hashMapChangedListeners = new CopyOnWriteArraySet<>(); + private Timer removeExpiredEntriesTimer; private HashMap sequenceNumberMap = new HashMap<>(); private final Storage storage; - private final ScheduledThreadPoolExecutor removeExpiredEntriesExecutor; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -60,21 +57,25 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { networkNode.addConnectionListener(this); storage = new Storage<>(storageDir); - removeExpiredEntriesExecutor = Utilities.getScheduledThreadPoolExecutor("removeExpiredEntries", 1, 10, 5); HashMap persisted = storage.initAndGetPersisted("SequenceNumberMap"); if (persisted != null) sequenceNumberMap = getPurgedSequenceNumberMap(persisted); } + public void shutDown() { + if (removeExpiredEntriesTimer != null) + removeExpiredEntriesTimer.stop(); + } + public void onBootstrapComplete() { - removeExpiredEntriesExecutor.scheduleAtFixedRate(() -> UserThread.execute(() -> { + removeExpiredEntriesTimer = UserThread.runPeriodically(() -> { log.trace("removeExpiredEntries"); // The moment when an object becomes expired will not be synchronous in the network and we could // get add messages after the object has expired. To avoid repeated additions of already expired // object when we get it sent from new peers, we don’t remove the sequence number from the map. // That way an ADD message for an already expired data will fail because the sequence number - // is equal and not larger. + // is equal and not larger as expected. Map temp = new HashMap<>(map); Set toRemoveSet = new HashSet<>(); temp.entrySet().stream() @@ -83,7 +84,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { ByteArray hashOfPayload = entry.getKey(); ProtectedData protectedData = map.get(hashOfPayload); toRemoveSet.add(protectedData); - log.error("remove protectedData:\n\t" + protectedData); + log.warn("We found an expired data entry. We remove the protectedData:\n\t" + protectedData); map.remove(hashOfPayload); }); @@ -93,8 +94,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { if (sequenceNumberMap.size() > 1000) sequenceNumberMap = getPurgedSequenceNumberMap(sequenceNumberMap); - - }), CHECK_TTL_INTERVAL_MILLIS, CHECK_TTL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + }, CHECK_TTL_INTERVAL_SEC); } @@ -166,11 +166,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void shutDown() { - Log.traceCall(); - MoreExecutors.shutdownAndAwaitTermination(removeExpiredEntriesExecutor, 500, TimeUnit.MILLISECONDS); - } - public boolean add(ProtectedData protectedData, @Nullable NodeAddress sender) { return add(protectedData, sender, false); } @@ -190,13 +185,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { if (result) { map.put(hashOfPayload, protectedData); - // Republished data have a larger sequence number. We set the rePublish flag to enable broadcasting - // even we had the data with the old seq nr. already - if (sequenceNumberMap.containsKey(hashOfPayload) && - protectedData.sequenceNumber > sequenceNumberMap.get(hashOfPayload).sequenceNr) - - sequenceNumberMap.put(hashOfPayload, new MapValue(protectedData.sequenceNumber, System.currentTimeMillis())); - storage.queueUpForSave(sequenceNumberMap, 5000); + sequenceNumberMap.put(hashOfPayload, new MapValue(protectedData.sequenceNumber, System.currentTimeMillis())); + storage.queueUpForSave(sequenceNumberMap, 100); + log.error("sequenceNumberMap queueUpForSave protectedData.sequenceNumber " + protectedData.sequenceNumber); StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n"); sb.append("Data set after doAdd (truncated)"); @@ -230,7 +221,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { if (storedData.expirablePayload instanceof StoragePayload) { if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) { - log.warn("We got that message with that seq nr already from another peer. We ignore that message."); + log.trace("We got that message with that seq nr already from another peer. We ignore that message."); return true; } else { PublicKey ownerPubKey = ((StoragePayload) storedData.expirablePayload).getOwnerPubKey(); @@ -239,11 +230,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload); if (result) { - log.error("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100)); + log.info("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100)); storedData.refreshDate(); + storedData.updateSequenceNumber(sequenceNumber); + storedData.updateSignature(signature); sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis())); - storage.queueUpForSave(sequenceNumberMap, 5000); + storage.queueUpForSave(sequenceNumberMap, 100); + log.error("sequenceNumberMap queueUpForSave sequenceNumber " + sequenceNumber); StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n"); sb.append("Data set after refreshTTL (truncated)"); @@ -286,7 +280,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { broadcast(new RemoveDataMessage(protectedData), sender); sequenceNumberMap.put(hashOfPayload, new MapValue(protectedData.sequenceNumber, System.currentTimeMillis())); - storage.queueUpForSave(sequenceNumberMap, 5000); + storage.queueUpForSave(sequenceNumberMap, 100); } else { log.debug("remove failed"); } @@ -311,7 +305,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { broadcast(new RemoveMailboxDataMessage(protectedMailboxData), sender); sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxData.sequenceNumber, System.currentTimeMillis())); - storage.queueUpForSave(sequenceNumberMap, 5000); + storage.queueUpForSave(sequenceNumberMap, 100); } else { log.debug("removeMailboxData failed"); } @@ -332,6 +326,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { else sequenceNumber = 0; + log.error("getProtectedData sequenceNumber " + sequenceNumber); + byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNrPair(payload, sequenceNumber)); byte[] signature = Sig.sign(ownerStoragePubKey.getPrivate(), hashOfDataAndSeqNr); return new ProtectedData(payload, payload.getTTL(), ownerStoragePubKey.getPublic(), sequenceNumber, signature); @@ -346,6 +342,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { else sequenceNumber = 0; + log.error("getRefreshTTLMessage sequenceNumber " + sequenceNumber); + byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNrPair(payload, sequenceNumber)); byte[] signature = Sig.sign(ownerStoragePubKey.getPrivate(), hashOfDataAndSeqNr); return new RefreshTTLMessage(hashOfDataAndSeqNr, signature, hashOfPayload.bytes, sequenceNumber); @@ -393,7 +391,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { if (sequenceNumberMap.containsKey(hashOfData)) { Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData).sequenceNr; if (newSequenceNumber < storedSequenceNumber) { - log.warn("Sequence number is invalid. newSequenceNumber=" + log.warn("Sequence number is invalid. sequenceNumber = " + newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber); return false; } else { diff --git a/network/src/main/java/io/bitsquare/p2p/storage/data/ProtectedData.java b/network/src/main/java/io/bitsquare/p2p/storage/data/ProtectedData.java index deef77da48..8cd54d8770 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/data/ProtectedData.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/data/ProtectedData.java @@ -19,8 +19,8 @@ public class ProtectedData implements Payload { transient public long ttl; public final PublicKey ownerPubKey; - public final int sequenceNumber; - public final byte[] signature; + public int sequenceNumber; + public byte[] signature; @VisibleForTesting transient public Date date; @@ -49,6 +49,14 @@ public class ProtectedData implements Payload { date = new Date(); } + public void updateSequenceNumber(int sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + public void updateSignature(byte[] signature) { + this.signature = signature; + } + public boolean isExpired() { return (new Date().getTime() - date.getTime()) > ttl; } @@ -64,4 +72,5 @@ public class ProtectedData implements Payload { ", signature.hashCode()=" + Arrays.toString(signature).hashCode() + '}'; } + } diff --git a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java index 1684027356..1415e2497c 100644 --- a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java +++ b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java @@ -56,7 +56,7 @@ public class ProtectedDataStorageTest { dir2.mkdir(); UserThread.setExecutor(Executors.newSingleThreadExecutor()); - P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS = 500; + P2PDataStorage.CHECK_TTL_INTERVAL_SEC = 500; keyRing1 = new KeyRing(new KeyStorage(dir1)); @@ -112,7 +112,7 @@ public class ProtectedDataStorageTest { // @Test public void testTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { - mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 1.5); + mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5); ProtectedData data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); log.debug("data.date " + data.date); log.debug("data.date " + data.date.getTime()); @@ -120,11 +120,11 @@ public class ProtectedDataStorageTest { log.debug("test 1"); Assert.assertEquals(1, dataStorage1.getMap().size()); - Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS); + Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 2"); Assert.assertEquals(1, dataStorage1.getMap().size()); - Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 2); + Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 2); log.debug("test 3 removed"); Assert.assertEquals(0, dataStorage1.getMap().size()); } @@ -162,31 +162,31 @@ public class ProtectedDataStorageTest { */ @Test public void testRefreshTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { - mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 1.5); + mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5); ProtectedData data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); Assert.assertTrue(dataStorage1.add(data, null)); Assert.assertEquals(1, dataStorage1.getMap().size()); - Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS); + Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 1"); Assert.assertEquals(1, dataStorage1.getMap().size()); RefreshTTLMessage refreshTTLMessage = dataStorage1.getRefreshTTLMessage(mockData, storageSignatureKeyPair1); Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null)); - Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS); + Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 2"); Assert.assertEquals(1, dataStorage1.getMap().size()); refreshTTLMessage = dataStorage1.getRefreshTTLMessage(mockData, storageSignatureKeyPair1); Assert.assertTrue(dataStorage1.refreshTTL(refreshTTLMessage, null)); - Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS); + Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 3"); Assert.assertEquals(1, dataStorage1.getMap().size()); - Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS); + Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 4"); Assert.assertEquals(1, dataStorage1.getMap().size()); - Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_MILLIS * 2); + Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 2); log.debug("test 5 removed"); Assert.assertEquals(0, dataStorage1.getMap().size()); }