From 7c3732c0e5958bbf8b57d515754f0c6d3381fb8c Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Sat, 2 Jan 2016 22:04:30 +0100 Subject: [PATCH] P2P network improvements --- .../src/main/java/io/bitsquare/app/Log.java | 2 +- .../io/bitsquare/common/util/Utilities.java | 2 +- .../io/bitsquare/storage/FileManager.java | 12 +- .../java/io/bitsquare/storage/Storage.java | 21 +- .../arbitration/ArbitratorManager.java | 2 +- .../bitsquare/arbitration/DisputeManager.java | 4 +- .../java/io/bitsquare/btc/WalletService.java | 3 - .../trade/offer/OpenOfferManager.java | 6 +- .../java/io/bitsquare/user/Preferences.java | 31 +- .../src/main/java/io/bitsquare/user/User.java | 4 +- .../java/io/bitsquare/app/BitsquareApp.java | 15 +- .../java/io/bitsquare/gui/Navigation.java | 13 +- .../java/io/bitsquare/gui/main/MainView.java | 17 +- .../io/bitsquare/gui/main/MainViewModel.java | 68 ++- .../ArbitratorRegistrationViewModel.java | 2 +- .../createoffer/CreateOfferViewModel.java | 2 +- .../offer/offerbook/OfferBookViewModel.java | 2 +- .../openoffer/OpenOffersViewModel.java | 2 +- .../pendingtrades/PendingTradesViewModel.java | 2 +- .../settings/network/NetworkSettingsView.java | 3 + .../p2p/FirstPeerAuthenticatedListener.java | 4 + .../java/io/bitsquare/p2p/P2PService.java | 100 ++-- .../io/bitsquare/p2p/P2PServiceListener.java | 2 + .../io/bitsquare/p2p/SeedNodeP2PService.java | 30 +- .../p2p/network/LocalhostNetworkNode.java | 1 - .../p2p/peers/AuthenticationHandshake.java | 13 +- .../p2p/peers/PeerExchangeManager.java | 4 +- .../io/bitsquare/p2p/peers/PeerManager.java | 547 +++++++++++------- .../io/bitsquare/p2p/peers/ReportedPeer.java | 2 + .../p2p/peers/RequestDataManager.java | 226 ++++++-- .../p2p/peers/SeedNodePeerManager.java | 65 ++- .../p2p/peers/SeedNodeRequestDataManager.java | 7 +- .../bitsquare/p2p/storage/P2PDataStorage.java | 6 +- .../test/java/io/bitsquare/p2p/TestUtils.java | 10 +- .../p2p/routing/PeerManagerTest.java | 15 + .../p2p/storage/ProtectedDataStorageTest.java | 2 +- 36 files changed, 831 insertions(+), 416 deletions(-) diff --git a/common/src/main/java/io/bitsquare/app/Log.java b/common/src/main/java/io/bitsquare/app/Log.java index cf81d2cd04..e2e6283ebc 100644 --- a/common/src/main/java/io/bitsquare/app/Log.java +++ b/common/src/main/java/io/bitsquare/app/Log.java @@ -48,7 +48,7 @@ public class Log { rollingPolicy.start(); triggeringPolicy = new SizeBasedTriggeringPolicy(); - triggeringPolicy.setMaxFileSize(useDetailedLogging ? "50MB" : "1MB"); + triggeringPolicy.setMaxFileSize(useDetailedLogging ? "10MB" : "1MB"); triggeringPolicy.start(); PatternLayoutEncoder encoder = new PatternLayoutEncoder(); diff --git a/common/src/main/java/io/bitsquare/common/util/Utilities.java b/common/src/main/java/io/bitsquare/common/util/Utilities.java index 5129f19c1c..e02b75b352 100644 --- a/common/src/main/java/io/bitsquare/common/util/Utilities.java +++ b/common/src/main/java/io/bitsquare/common/util/Utilities.java @@ -92,7 +92,7 @@ public class Utilities { executor.allowCoreThreadTimeOut(true); executor.setMaximumPoolSize(maximumPoolSize); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - executor.setRejectedExecutionHandler((r, e) -> log.warn("RejectedExecutionHandler called")); + executor.setRejectedExecutionHandler((r, e) -> log.debug("RejectedExecutionHandler called")); return executor; } diff --git a/common/src/main/java/io/bitsquare/storage/FileManager.java b/common/src/main/java/io/bitsquare/storage/FileManager.java index d876ea9a18..01b8a4eb26 100644 --- a/common/src/main/java/io/bitsquare/storage/FileManager.java +++ b/common/src/main/java/io/bitsquare/storage/FileManager.java @@ -50,8 +50,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static com.google.common.base.Preconditions.checkNotNull; - /** * Borrowed from BitcoinJ WalletFiles * A class that handles atomic and optionally delayed writing of a file to disk. @@ -67,7 +65,6 @@ public class FileManager { private final ScheduledThreadPoolExecutor executor; private final AtomicBoolean savePending; private final long delay; - private final TimeUnit delayTimeUnit; private final Callable saveFileTask; private T serializable; @@ -76,7 +73,7 @@ public class FileManager { // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public FileManager(File dir, File storageFile, long delay, TimeUnit delayTimeUnit) { + public FileManager(File dir, File storageFile, long delay) { this.dir = dir; this.storageFile = storageFile; @@ -85,7 +82,6 @@ public class FileManager { // File must only be accessed from the auto-save executor from now on, to avoid simultaneous access. savePending = new AtomicBoolean(); this.delay = delay; - this.delayTimeUnit = checkNotNull(delayTimeUnit); saveFileTask = () -> { Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000)); @@ -126,11 +122,15 @@ public class FileManager { * Queues up a save in the background. Useful for not very important wallet changes. */ public void saveLater(T serializable) { + saveLater(serializable, delay); + } + + public void saveLater(T serializable, long delayInMilli) { this.serializable = serializable; if (savePending.getAndSet(true)) return; // Already pending. - executor.schedule(saveFileTask, delay, delayTimeUnit); + executor.schedule(saveFileTask, delayInMilli, TimeUnit.MILLISECONDS); } public synchronized T read(File file) { diff --git a/common/src/main/java/io/bitsquare/storage/Storage.java b/common/src/main/java/io/bitsquare/storage/Storage.java index 49f72d0bee..cd79725ab6 100644 --- a/common/src/main/java/io/bitsquare/storage/Storage.java +++ b/common/src/main/java/io/bitsquare/storage/Storage.java @@ -27,7 +27,6 @@ import javax.inject.Named; import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkNotNull; @@ -82,7 +81,7 @@ public class Storage { public T initAndGetPersisted(String fileName) { this.fileName = fileName; storageFile = new File(dir, fileName); - fileManager = new FileManager<>(dir, storageFile, 600, TimeUnit.MILLISECONDS); + fileManager = new FileManager<>(dir, storageFile, 600); return getPersisted(); } @@ -97,7 +96,7 @@ public class Storage { this.serializable = serializable; this.fileName = fileName; storageFile = new File(dir, fileName); - fileManager = new FileManager<>(dir, storageFile, 600, TimeUnit.MILLISECONDS); + fileManager = new FileManager<>(dir, storageFile, 600); return getPersisted(); } @@ -106,6 +105,10 @@ public class Storage { queueUpForSave(serializable); } + public void queueUpForSave(long delayInMilli) { + queueUpForSave(serializable, delayInMilli); + } + // Save delayed and on a background thread public void queueUpForSave(T serializable) { if (serializable != null) { @@ -118,6 +121,18 @@ public class Storage { } } + public void queueUpForSave(T serializable, long delayInMilli) { + if (serializable != null) { + log.trace("save " + fileName); + checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write."); + + fileManager.saveLater(serializable, delayInMilli); + } else { + log.trace("queueUpForSave called but no serializable set"); + } + } + + public void remove(String fileName) { fileManager.removeFile(fileName); } diff --git a/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java b/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java index 0b09ea114a..dc2529463b 100644 --- a/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java +++ b/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java @@ -121,7 +121,7 @@ public class ArbitratorManager { if (user.getRegisteredArbitrator() != null) { P2PService p2PService = arbitratorService.getP2PService(); - if (!p2PService.getFirstPeerAuthenticated()) { + if (!p2PService.isAuthenticated()) { firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() { @Override public void onFirstPeerAuthenticated() { diff --git a/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java b/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java index 30d5ef2bdf..cb1cf1a87b 100644 --- a/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java +++ b/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java @@ -106,12 +106,12 @@ public class DisputeManager { p2PService.addDecryptedMailListener((decryptedMessageWithPubKey, senderAddress) -> { decryptedMailMessageWithPubKeys.add(decryptedMessageWithPubKey); - if (p2PService.getFirstPeerAuthenticated()) + if (p2PService.isAuthenticated()) applyMessages(); }); p2PService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> { decryptedMailboxMessageWithPubKeys.add(decryptedMessageWithPubKey); - if (p2PService.getFirstPeerAuthenticated()) + if (p2PService.isAuthenticated()) applyMessages(); }); diff --git a/core/src/main/java/io/bitsquare/btc/WalletService.java b/core/src/main/java/io/bitsquare/btc/WalletService.java index de2210f116..67a575cc42 100644 --- a/core/src/main/java/io/bitsquare/btc/WalletService.java +++ b/core/src/main/java/io/bitsquare/btc/WalletService.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Service; -import io.bitsquare.app.Log; import io.bitsquare.btc.listeners.AddressConfidenceListener; import io.bitsquare.btc.listeners.BalanceListener; import io.bitsquare.btc.listeners.TxConfidenceListener; @@ -537,14 +536,12 @@ public class WalletService { @Override protected void progress(double percentage, int blocksLeft, Date date) { - Log.traceCall("percentage=" + percentage); super.progress(percentage, blocksLeft, date); UserThread.execute(() -> this.percentage.set(percentage / 100d)); } @Override protected void doneDownload() { - Log.traceCall(); super.doneDownload(); UserThread.execute(() -> this.percentage.set(1d)); } diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 8bc940e29e..9aa241adb5 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -125,9 +125,9 @@ public class OpenOfferManager { // setupAnStartRePublishThread will re-publish at method call // Before the TTL is reached we re-publish our offers - // If offer removal at shutdown fails we don't want to have long term dangling dead offers, so we set TTL quite short and use re-publish as - // strategy. Offerers need to be online anyway. - if (!p2PService.getFirstPeerAuthenticated()) { + // If offer removal at shutdown fails we don't want to have long term dangling dead offers, so we set + // TTL quite short and use re-publish as strategy. Offerers need to be online anyway. + if (!p2PService.isAuthenticated()) { firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() { @Override public void onFirstPeerAuthenticated() { diff --git a/core/src/main/java/io/bitsquare/user/Preferences.java b/core/src/main/java/io/bitsquare/user/Preferences.java index 1b88d86650..623b8b9b97 100644 --- a/core/src/main/java/io/bitsquare/user/Preferences.java +++ b/core/src/main/java/io/bitsquare/user/Preferences.java @@ -88,7 +88,6 @@ public class Preferences implements Serializable { private boolean useAnimations = true; private boolean useEffects = true; private boolean displaySecurityDepositInfo = true; - private boolean useUPnP = true; private ArrayList tradeCurrencies; private BlockChainExplorer blockChainExplorerMainNet; private BlockChainExplorer blockChainExplorerTestNet; @@ -123,7 +122,6 @@ public class Preferences implements Serializable { setBtcDenomination(persisted.btcDenomination); setUseAnimations(persisted.useAnimations); setUseEffects(persisted.useEffects); - setUseUPnP(persisted.useUPnP); setTradeCurrencies(persisted.tradeCurrencies); tradeCurrencies = new ArrayList<>(tradeCurrenciesAsObservable); displaySecurityDepositInfo = persisted.getDisplaySecurityDepositInfo(); @@ -172,15 +170,15 @@ public class Preferences implements Serializable { // Use that to guarantee update of the serializable field and to make a storage update in case of a change btcDenominationProperty.addListener((ov) -> { btcDenomination = btcDenominationProperty.get(); - storage.queueUpForSave(); + storage.queueUpForSave(2000); }); useAnimationsProperty.addListener((ov) -> { useAnimations = useAnimationsProperty.get(); - storage.queueUpForSave(); + storage.queueUpForSave(2000); }); useEffectsProperty.addListener((ov) -> { useEffects = useEffectsProperty.get(); - storage.queueUpForSave(); + storage.queueUpForSave(2000); }); tradeCurrenciesAsObservable.addListener((Observable ov) -> { tradeCurrencies.clear(); @@ -191,7 +189,7 @@ public class Preferences implements Serializable { public void dontShowAgain(String id) { showAgainMap.put(id, false); - storage.queueUpForSave(); + storage.queueUpForSave(2000); } @@ -213,12 +211,7 @@ public class Preferences implements Serializable { public void setDisplaySecurityDepositInfo(boolean displaySecurityDepositInfo) { this.displaySecurityDepositInfo = displaySecurityDepositInfo; - storage.queueUpForSave(); - } - - public void setUseUPnP(boolean useUPnP) { - this.useUPnP = useUPnP; - storage.queueUpForSave(); + storage.queueUpForSave(2000); } public void setBitcoinNetwork(BitcoinNetwork bitcoinNetwork) { @@ -235,12 +228,12 @@ public class Preferences implements Serializable { private void setBlockChainExplorerTestNet(BlockChainExplorer blockChainExplorerTestNet) { this.blockChainExplorerTestNet = blockChainExplorerTestNet; - storage.queueUpForSave(); + storage.queueUpForSave(2000); } private void setBlockChainExplorerMainNet(BlockChainExplorer blockChainExplorerMainNet) { this.blockChainExplorerMainNet = blockChainExplorerMainNet; - storage.queueUpForSave(); + storage.queueUpForSave(2000); } public void setBlockChainExplorer(BlockChainExplorer blockChainExplorer) { @@ -252,12 +245,12 @@ public class Preferences implements Serializable { public void setShowPlaceOfferConfirmation(boolean showPlaceOfferConfirmation) { this.showPlaceOfferConfirmation = showPlaceOfferConfirmation; - storage.queueUpForSave(); + storage.queueUpForSave(2000); } public void setShowTakeOfferConfirmation(boolean showTakeOfferConfirmation) { this.showTakeOfferConfirmation = showTakeOfferConfirmation; - storage.queueUpForSave(); + storage.queueUpForSave(2000); } public void setTacAccepted(boolean tacAccepted) { @@ -310,10 +303,6 @@ public class Preferences implements Serializable { return useEffectsProperty; } - public boolean getUseUPnP() { - return useUPnP; - } - public BitcoinNetwork getBitcoinNetwork() { return bitcoinNetwork; } @@ -379,7 +368,7 @@ public class Preferences implements Serializable { // if we add new and those are not in our stored map we display by default the new popup if (!getShowAgainMap().containsKey(key)) { showAgainMap.put(key, true); - storage.queueUpForSave(); + storage.queueUpForSave(2000); } return showAgainMap.get(key); diff --git a/core/src/main/java/io/bitsquare/user/User.java b/core/src/main/java/io/bitsquare/user/User.java index e65ff345c2..16eec24b01 100644 --- a/core/src/main/java/io/bitsquare/user/User.java +++ b/core/src/main/java/io/bitsquare/user/User.java @@ -318,7 +318,7 @@ public class User implements Serializable { public void setDevelopersAlert(Alert developersAlert) { this.developersAlert = developersAlert; - storage.queueUpForSave(); + storage.queueUpForSave(2000); } public Alert getDevelopersAlert() { @@ -327,7 +327,7 @@ public class User implements Serializable { public void setDisplayedAlert(Alert displayedAlert) { this.displayedAlert = displayedAlert; - storage.queueUpForSave(); + storage.queueUpForSave(2000); } public Alert getDisplayedAlert() { diff --git a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java index 907d06423b..51c19082b5 100644 --- a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java +++ b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java @@ -32,6 +32,7 @@ import io.bitsquare.gui.common.view.View; import io.bitsquare.gui.common.view.ViewLoader; import io.bitsquare.gui.common.view.guice.InjectorViewFactory; import io.bitsquare.gui.main.MainView; +import io.bitsquare.gui.main.MainViewModel; import io.bitsquare.gui.main.debug.DebugView; import io.bitsquare.gui.popups.EmptyWalletPopup; import io.bitsquare.gui.popups.Popup; @@ -304,15 +305,13 @@ public class BitsquareApp extends Application { log.debug("gracefulShutDown"); try { if (injector != null) { - ArbitratorManager arbitratorManager = injector.getInstance(ArbitratorManager.class); - arbitratorManager.shutDown(); - OpenOfferManager openOfferManager = injector.getInstance(OpenOfferManager.class); - openOfferManager.shutDown(() -> { - P2PService p2PService = injector.getInstance(P2PService.class); - p2PService.shutDown(() -> { - WalletService walletService = injector.getInstance(WalletService.class); - walletService.shutDownDone.addListener((observable, oldValue, newValue) -> { + injector.getInstance(ArbitratorManager.class).shutDown(); + injector.getInstance(MainViewModel.class).shutDown(); + injector.getInstance(OpenOfferManager.class).shutDown(() -> { + injector.getInstance(P2PService.class).shutDown(() -> { + injector.getInstance(WalletService.class).shutDownDone.addListener((ov, o, n) -> { bitsquareAppModule.close(injector); + log.info("Graceful shutdown completed"); resultHandler.handleResult(); }); injector.getInstance(WalletService.class).shutDown(); diff --git a/gui/src/main/java/io/bitsquare/gui/Navigation.java b/gui/src/main/java/io/bitsquare/gui/Navigation.java index d2bc355d2d..9023c178d1 100644 --- a/gui/src/main/java/io/bitsquare/gui/Navigation.java +++ b/gui/src/main/java/io/bitsquare/gui/Navigation.java @@ -46,7 +46,7 @@ public class Navigation implements Serializable { // New listeners can be added during iteration so we use CopyOnWriteArrayList to // prevent invalid array modification transient private final CopyOnWriteArraySet listeners = new CopyOnWriteArraySet<>(); - transient private final Storage remoteStorage; + transient private final Storage storage; transient private ViewPath currentPath; // Used for returning to the last important view. After setup is done we want to // return to the last opened view (e.g. sell/buy) @@ -57,14 +57,13 @@ public class Navigation implements Serializable { @Inject - public Navigation(Storage remoteStorage) { - this.remoteStorage = remoteStorage; + public Navigation(Storage storage) { + this.storage = storage; - Navigation persisted = remoteStorage.initAndGetPersisted(this); + Navigation persisted = storage.initAndGetPersisted(this); if (persisted != null) { previousPath = persisted.getPreviousPath(); - } - else + } else previousPath = DEFAULT_VIEW_PATH; // need to be null initially and not DEFAULT_VIEW_PATH to navigate through all items @@ -102,7 +101,7 @@ public class Navigation implements Serializable { currentPath = newPath; previousPath = currentPath; - remoteStorage.queueUpForSave(); + storage.queueUpForSave(2000); listeners.stream().forEach((e) -> e.onNavigationRequested(currentPath)); } diff --git a/gui/src/main/java/io/bitsquare/gui/main/MainView.java b/gui/src/main/java/io/bitsquare/gui/main/MainView.java index 2e625d2fbf..001551c0da 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/MainView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/MainView.java @@ -87,6 +87,7 @@ public class MainView extends InitializableView { private Label btcSplashInfo; private List persistedFilesCorrupted; private static BorderPane baseApplicationContainer; + private Popup p2PNetworkWarnMsgPopup; @Inject public MainView(MainViewModel model, CachingViewLoader viewLoader, Navigation navigation, Transitions transitions, @@ -271,8 +272,10 @@ public class MainView extends InitializableView { splashP2PNetworkIndicator.progressProperty().bind(model.splashP2PNetworkProgress); splashP2PNetworkErrorMsgListener = (ov, oldValue, newValue) -> { - splashP2PNetworkLabel.setId("splash-error-state-msg"); - splashP2PNetworkIndicator.setVisible(false); + if (newValue != null) { + splashP2PNetworkLabel.setId("splash-error-state-msg"); + splashP2PNetworkIndicator.setVisible(false); + } }; model.p2PNetworkWarnMsg.addListener(splashP2PNetworkErrorMsgListener); @@ -392,14 +395,12 @@ public class MainView extends InitializableView { setRightAnchor(p2PNetworkIcon, 10d); setBottomAnchor(p2PNetworkIcon, 7d); p2PNetworkIcon.idProperty().bind(model.p2PNetworkIconId); - + p2PNetworkLabel.idProperty().bind(model.p2PNetworkLabelId); model.p2PNetworkWarnMsg.addListener((ov, oldValue, newValue) -> { if (newValue != null) { - p2PNetworkLabel.setId("splash-error-state-msg"); - new Popup().warning(newValue + "\nPlease check your internet connection or try to restart the application.") - .show(); - } else { - p2PNetworkLabel.setId("footer-pane"); + p2PNetworkWarnMsgPopup = new Popup().warning(newValue).show(); + } else if (p2PNetworkWarnMsgPopup != null) { + p2PNetworkWarnMsgPopup.hide(); } }); diff --git a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java index 4433435c4b..5f7511d6ea 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java @@ -47,6 +47,7 @@ import io.bitsquare.trade.offer.OpenOfferManager; import io.bitsquare.user.Preferences; import io.bitsquare.user.User; import javafx.beans.property.*; +import javafx.beans.value.ChangeListener; import javafx.collections.ListChangeListener; import org.bitcoinj.core.*; import org.bitcoinj.store.BlockStoreException; @@ -65,7 +66,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; -class MainViewModel implements ViewModel { +public class MainViewModel implements ViewModel { private static final Logger log = LoggerFactory.getLogger(MainViewModel.class); private final WalletService walletService; @@ -105,12 +106,13 @@ class MainViewModel implements ViewModel { final BooleanProperty showOpenDisputesNotification = new SimpleBooleanProperty(); private final BooleanProperty isSplashScreenRemoved = new SimpleBooleanProperty(); private final String btcNetworkAsString; - + final StringProperty p2PNetworkLabelId = new SimpleStringProperty("footer-pane"); private MonadicBinding allServicesDone; private User user; private int numBTCPeers = 0; private Timer checkForBtcSyncStateTimer; + private ChangeListener numAuthenticatedPeersListener, btcNumPeersListener; /////////////////////////////////////////////////////////////////////////////////////////// @@ -170,13 +172,24 @@ class MainViewModel implements ViewModel { }); } + public void shutDown() { + if (numAuthenticatedPeersListener != null) + p2PService.getNumAuthenticatedPeers().removeListener(numAuthenticatedPeersListener); + + if (btcNumPeersListener != null) + walletService.numPeersProperty().removeListener(btcNumPeersListener); + + if (checkForBtcSyncStateTimer != null) + checkForBtcSyncStateTimer.stop(); + + } /////////////////////////////////////////////////////////////////////////////////////////// // Initialisation /////////////////////////////////////////////////////////////////////////////////////////// private BooleanProperty initP2PNetwork() { - final BooleanProperty p2pNetWorkReady = new SimpleBooleanProperty(); + final BooleanProperty p2pNetworkInitialized = new SimpleBooleanProperty(); p2PNetworkInfo.set("Connecting to Tor network..."); p2PService.start(new P2PServiceListener() { @Override @@ -197,14 +210,26 @@ class MainViewModel implements ViewModel { } else { updateP2pNetworkInfoWithPeersChanged(p2PService.getNumAuthenticatedPeers().get()); } - p2pNetWorkReady.set(true); + p2pNetworkInitialized.set(true); } @Override public void onNoSeedNodeAvailable() { - p2PNetworkWarnMsg.set("There are no seed nodes available."); - p2PNetworkInfo.set("No seed node available"); - p2pNetWorkReady.set(true); + if (p2PService.getNumAuthenticatedPeers().get() == 0) { + p2PNetworkInfo.set("No seed nodes available"); + } + p2pNetworkInitialized.set(true); + } + + @Override + public void onNoPeersAvailable() { + if (p2PService.getNumAuthenticatedPeers().get() == 0) { + p2PNetworkWarnMsg.set("There are no seed nodes or persisted peers available for requesting data.\n" + + "Please check your internet connection or try to restart the application."); + p2PNetworkInfo.set("No seed nodes and peers available"); + p2PNetworkLabelId.set("splash-error-state-msg"); + } + p2pNetworkInitialized.set(true); } @Override @@ -216,18 +241,22 @@ class MainViewModel implements ViewModel { @Override public void onSetupFailed(Throwable throwable) { - p2PNetworkWarnMsg.set("Connecting to the P2P network failed (reported error: " + throwable.getMessage() + ")."); + p2PNetworkWarnMsg.set("Connecting to the P2P network failed (reported error: " + + throwable.getMessage() + ").\n" + + "Please check your internet connection or try to restart the application."); splashP2PNetworkProgress.set(0); + if (p2PService.getNumAuthenticatedPeers().get() == 0) + p2PNetworkLabelId.set("splash-error-state-msg"); } }); - return p2pNetWorkReady; + return p2pNetworkInitialized; } private BooleanProperty initBitcoinWallet() { EasyBind.subscribe(walletService.downloadPercentageProperty(), newValue -> setBitcoinNetworkSyncProgress((double) newValue)); - walletService.numPeersProperty().addListener((observable, oldValue, newValue) -> { + btcNumPeersListener = (observable, oldValue, newValue) -> { if ((int) oldValue > 0 && (int) newValue == 0) walletServiceErrorMsg.set("You lost the connection to all bitcoin peers."); else if ((int) oldValue == 0 && (int) newValue > 0) @@ -235,7 +264,8 @@ class MainViewModel implements ViewModel { numBTCPeers = (int) newValue; setBitcoinNetworkSyncProgress(walletService.downloadPercentageProperty().get()); - }); + }; + walletService.numPeersProperty().addListener(btcNumPeersListener); final BooleanProperty walletInitialized = new SimpleBooleanProperty(); walletService.initialize(null, @@ -341,14 +371,19 @@ class MainViewModel implements ViewModel { .show(); // update nr of peers in footer - p2PService.getNumAuthenticatedPeers().addListener((observable, oldValue, newValue) -> { - if ((int) oldValue > 0 && (int) newValue == 0) - p2PNetworkWarnMsg.set("You lost the connection to all P2P network peers."); - else if ((int) oldValue == 0 && (int) newValue > 0) + numAuthenticatedPeersListener = (observable, oldValue, newValue) -> { + if ((int) oldValue > 0 && (int) newValue == 0) { + p2PNetworkWarnMsg.set("You lost the connection to all P2P network peers.\n" + + "Please check your internet connection or try to restart the application."); + p2PNetworkLabelId.set("splash-error-state-msg"); + } else if ((int) oldValue == 0 && (int) newValue > 0) { p2PNetworkWarnMsg.set(null); + p2PNetworkLabelId.set("footer-pane"); + } updateP2pNetworkInfoWithPeersChanged((int) newValue); - }); + }; + p2PService.getNumAuthenticatedPeers().addListener(numAuthenticatedPeersListener); // now show app showAppScreen.set(true); @@ -565,7 +600,6 @@ class MainViewModel implements ViewModel { } private void setBitcoinNetworkSyncProgress(double value) { - Log.traceCall("btcSyncProgress=" + value); btcSyncProgress.set(value); String numPeers = "Nr. of peers: " + numBTCPeers; if (value == 1) { diff --git a/gui/src/main/java/io/bitsquare/gui/main/account/arbitratorregistration/ArbitratorRegistrationViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/account/arbitratorregistration/ArbitratorRegistrationViewModel.java index 56b79606ed..4890571811 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/account/arbitratorregistration/ArbitratorRegistrationViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/account/arbitratorregistration/ArbitratorRegistrationViewModel.java @@ -185,6 +185,6 @@ class ArbitratorRegistrationViewModel extends ActivatableViewModel { } boolean isAuthenticated() { - return p2PService.getFirstPeerAuthenticated(); + return p2PService.isAuthenticated(); } } diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferViewModel.java index 8859e90c2f..99d3d6d3aa 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/createoffer/CreateOfferViewModel.java @@ -486,7 +486,7 @@ class CreateOfferViewModel extends ActivatableWithDataModel } boolean isAuthenticated() { - return p2PService.getFirstPeerAuthenticated(); + return p2PService.isAuthenticated(); } } diff --git a/gui/src/main/java/io/bitsquare/gui/main/portfolio/pendingtrades/PendingTradesViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/portfolio/pendingtrades/PendingTradesViewModel.java index a5f2653c37..f655eb829d 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/portfolio/pendingtrades/PendingTradesViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/portfolio/pendingtrades/PendingTradesViewModel.java @@ -220,7 +220,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel optionalEncryptionService; private final Optional optionalKeyRing; @@ -63,16 +64,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final Map mailboxMap = new HashMap<>(); private final Set
authenticatedPeerAddresses = new HashSet<>(); private final CopyOnWriteArraySet shutDownResultHandlers = new CopyOnWriteArraySet<>(); - private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty(); - protected final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty(); - private final BooleanProperty firstPeerAuthenticated = new SimpleBooleanProperty(); + protected final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty(); + private final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty(); + protected final BooleanProperty notAuthenticated = new SimpleBooleanProperty(true); private final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0); - protected Address connectedSeedNode; + private Address seedNodeOfInitialDataRequest; private volatile boolean shutDownInProgress; private boolean shutDownComplete; @SuppressWarnings("FieldCanBeLocal") - private MonadicBinding readyForAuthentication; + private MonadicBinding readyForAuthenticationBinding; private final Storage
dbStorage; private Address myOnionAddress; protected RequestDataManager requestDataManager; @@ -97,6 +98,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis this.port = port; this.torDir = torDir; this.useLocalhost = useLocalhost; + this.storageDir = storageDir; optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService); optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing); @@ -122,7 +124,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis networkNode.addMessageListener(this); // peer group - peerManager = createPeerManager(); + peerManager = getNewPeerManager(); peerManager.setSeedNodeAddresses(seedNodeAddresses); peerManager.addAuthenticationListener(this); @@ -130,44 +132,52 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis dataStorage = new P2PDataStorage(peerManager, networkNode, storageDir); dataStorage.addHashMapChangedListener(this); - // Request initial data manager - requestDataManager = createRequestDataManager(); - peerManager.addAuthenticationListener(requestDataManager); - - // Test multiple states to check when we are ready for authenticateSeedNode - readyForAuthentication = EasyBind.combine(hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated, - (hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated) - -> hiddenServicePublished && requestingDataCompleted && !firstPeerAuthenticated); - readyForAuthentication.subscribe((observable, oldValue, newValue) -> { - // we need to have both the initial data delivered and the hidden service published before we - // authenticate to a seed node. - if (newValue) - authenticateSeedNode(); - }); - } - - protected PeerManager createPeerManager() { - return new PeerManager(networkNode); - } - - protected RequestDataManager createRequestDataManager() { - return new RequestDataManager(networkNode, dataStorage, peerManager, getRequestDataManager()); - } - - protected RequestDataManager.Listener getRequestDataManager() { - return new RequestDataManager.Listener() { + // Request data manager + requestDataManager = getNewRequestDataManager(); + requestDataManager.setRequestDataManagerListener(new RequestDataManager.Listener() { @Override public void onNoSeedNodeAvailable() { p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable()); } @Override - public void onDataReceived(Address seedNode) { - connectedSeedNode = seedNode; - requestingDataCompleted.set(true); + public void onNoPeersAvailable() { + p2pServiceListeners.stream().forEach(e -> e.onNoPeersAvailable()); + } + + @Override + public void onDataReceived(Address address) { + if (!requestingDataCompleted.get()) { + seedNodeOfInitialDataRequest = address; + requestingDataCompleted.set(true); + } p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted()); } - }; + }); + peerManager.addAuthenticationListener(requestDataManager); + + // Test multiple states to check when we are ready for authenticateSeedNode + // We need to have both the initial data delivered and the hidden service published before we + // authenticate to a seed node. + readyForAuthenticationBinding = getNewReadyForAuthenticationBinding(); + readyForAuthenticationBinding.subscribe((observable, oldValue, newValue) -> { + if (newValue) + authenticateToSeedNode(); + }); + } + + protected MonadicBinding getNewReadyForAuthenticationBinding() { + return EasyBind.combine(hiddenServicePublished, requestingDataCompleted, notAuthenticated, + (hiddenServicePublished, requestingDataCompleted, notAuthenticated) + -> hiddenServicePublished && requestingDataCompleted && notAuthenticated); + } + + protected PeerManager getNewPeerManager() { + return new PeerManager(networkNode, storageDir); + } + + protected RequestDataManager getNewRequestDataManager() { + return new RequestDataManager(networkNode, dataStorage, peerManager); } @@ -175,7 +185,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // API /////////////////////////////////////////////////////////////////////////////////////////// - public void start(@Nullable P2PServiceListener listener) { Log.traceCall(); if (listener != null) @@ -238,7 +247,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onTorNodeReady() { Log.traceCall(); - requestDataManager.requestData(seedNodeAddresses); + requestDataManager.requestDataFromSeedNodes(seedNodeAddresses); p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()); } @@ -265,10 +274,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable)); } - private void authenticateSeedNode() { + protected void authenticateToSeedNode() { Log.traceCall(); - checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null"); - peerManager.authenticateToSeedNode(connectedSeedNode); + checkNotNull(seedNodeOfInitialDataRequest != null, "seedNodeOfInitialDataRequest must not be null"); + peerManager.authenticateToSeedNode(seedNodeOfInitialDataRequest); } @@ -301,8 +310,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis Log.traceCall(); authenticatedPeerAddresses.add(peerAddress); - if (!firstPeerAuthenticated.get()) { - firstPeerAuthenticated.set(true); + if (notAuthenticated.get()) { + notAuthenticated.set(false); p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated()); } @@ -658,8 +667,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // Getters /////////////////////////////////////////////////////////////////////////////////////////// - public boolean getFirstPeerAuthenticated() { - return firstPeerAuthenticated.get(); + public boolean isAuthenticated() { + return !notAuthenticated.get(); } public NetworkNode getNetworkNode() { @@ -678,6 +687,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis return authenticatedPeerAddresses; } + @NotNull public ReadOnlyIntegerProperty getNumAuthenticatedPeers() { return numAuthenticatedPeers; } diff --git a/network/src/main/java/io/bitsquare/p2p/P2PServiceListener.java b/network/src/main/java/io/bitsquare/p2p/P2PServiceListener.java index 57aab43964..bf5f9315cd 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PServiceListener.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PServiceListener.java @@ -9,5 +9,7 @@ public interface P2PServiceListener extends SetupListener { void onNoSeedNodeAvailable(); + void onNoPeersAvailable(); + void onFirstPeerAuthenticated(); } diff --git a/network/src/main/java/io/bitsquare/p2p/SeedNodeP2PService.java b/network/src/main/java/io/bitsquare/p2p/SeedNodeP2PService.java index eb46848d5b..b47dc7262f 100644 --- a/network/src/main/java/io/bitsquare/p2p/SeedNodeP2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/SeedNodeP2PService.java @@ -1,10 +1,13 @@ package io.bitsquare.p2p; +import io.bitsquare.app.Log; import io.bitsquare.p2p.peers.PeerManager; import io.bitsquare.p2p.peers.RequestDataManager; import io.bitsquare.p2p.peers.SeedNodePeerManager; import io.bitsquare.p2p.peers.SeedNodeRequestDataManager; import io.bitsquare.p2p.seed.SeedNodesRepository; +import org.fxmisc.easybind.EasyBind; +import org.fxmisc.easybind.monadic.MonadicBinding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,12 +29,31 @@ public class SeedNodeP2PService extends P2PService { } @Override - protected PeerManager createPeerManager() { - return new SeedNodePeerManager(networkNode); + protected PeerManager getNewPeerManager() { + return new SeedNodePeerManager(networkNode, storageDir); } @Override - protected RequestDataManager createRequestDataManager() { - return new SeedNodeRequestDataManager(networkNode, dataStorage, peerManager, getRequestDataManager()); + protected RequestDataManager getNewRequestDataManager() { + return new SeedNodeRequestDataManager(networkNode, dataStorage, peerManager); } + + @Override + protected MonadicBinding getNewReadyForAuthenticationBinding() { + return EasyBind.combine(hiddenServicePublished, notAuthenticated, + (hiddenServicePublished, notAuthenticated) -> hiddenServicePublished && notAuthenticated); + } + + @Override + public void onTorNodeReady() { + Log.traceCall(); + p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()); + } + + @Override + protected void authenticateToSeedNode() { + Log.traceCall(); + ((SeedNodePeerManager) peerManager).authenticateToSeedNode(); + } + } diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index d80d45b557..6c74d4d149 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -83,7 +83,6 @@ public class LocalhostNetworkNode extends NetworkNode { @Override @Nullable public Address getAddress() { - Log.traceCall(); return address; } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java index 9c7e04c82e..caebacd0d8 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java @@ -42,7 +42,7 @@ public class AuthenticationHandshake implements MessageListener { private long nonce = 0; private boolean stopped; private Optional> resultFutureOptional = Optional.empty(); - private Timer timeoutTimer; + private Timer timeoutTimer, shutDownTimer; /////////////////////////////////////////////////////////////////////////////////////////// @@ -239,7 +239,10 @@ public class AuthenticationHandshake implements MessageListener { "connection with his reported address to verify if his address is correct.", peerAddress); connection.shutDown(() -> { - UserThread.runAfter(() -> { + if (shutDownTimer != null) + shutDownTimer.cancel(); + + shutDownTimer = UserThread.runAfter(() -> { if (!stopped) { // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to // inconsistent state @@ -268,6 +271,9 @@ public class AuthenticationHandshake implements MessageListener { } }); + if (timeoutTimer != null) + timeoutTimer.cancel(); + timeoutTimer = UserThread.runAfter(() -> failed(new AuthenticationException("Authentication of peer " + peerAddress + " failed because of a timeout. " + @@ -343,6 +349,9 @@ public class AuthenticationHandshake implements MessageListener { if (timeoutTimer != null) timeoutTimer.cancel(); + if (shutDownTimer != null) + shutDownTimer.cancel(); + networkNode.removeMessageListener(this); } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java index 8c833a2e41..52cb3adcbd 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java @@ -116,7 +116,7 @@ public class PeerExchangeManager implements MessageListener { if (!connectedPeersList.isEmpty()) { Log.traceCall(); connectedPeersList.stream() - .forEach(e -> UserThread.runAfterRandomDelay(() -> { + .forEach(e -> { SettableFuture future = networkNode.sendMessage(e.connection, new GetPeersRequest(networkNode.getAddress(), new HashSet<>(authenticatedAndReportedPeersSupplier.get()))); Futures.addCallback(future, new FutureCallback() { @@ -131,7 +131,7 @@ public class PeerExchangeManager implements MessageListener { removePeerConsumer.accept(e.address); } }); - }, 3, 5, TimeUnit.SECONDS)); + }); } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 2eb2576fec..114cd05056 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -2,22 +2,27 @@ package io.bitsquare.p2p.peers; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.peers.messages.auth.AuthenticationRejection; import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest; import io.bitsquare.p2p.storage.messages.DataBroadcastMessage; +import io.bitsquare.storage.Storage; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -31,7 +36,7 @@ public class PeerManager implements MessageListener, ConnectionListener { // Static /////////////////////////////////////////////////////////////////////////////////////////// - private static int MAX_CONNECTIONS_LOW_PRIORITY; + protected static int MAX_CONNECTIONS_LOW_PRIORITY; private static int MAX_CONNECTIONS_NORMAL_PRIORITY; private static int MAX_CONNECTIONS_HIGH_PRIORITY; @@ -55,27 +60,28 @@ public class PeerManager implements MessageListener, ConnectionListener { private final NetworkNode networkNode; private final MaintenanceManager maintenanceManager; private final PeerExchangeManager peerExchangeManager; + protected final ScheduledThreadPoolExecutor checkSeedNodeConnectionExecutor; + private final Storage> dbStorage; private final CopyOnWriteArraySet authenticationListeners = new CopyOnWriteArraySet<>(); - private final Map authenticatedPeers = new HashMap<>(); - private final Set reportedPeers = new HashSet<>(); - private final Map authenticationHandshakes = new HashMap<>(); - private final List
remainingSeedNodes = new ArrayList<>(); - private Optional> seedNodeAddressesOptional = Optional.empty(); - private Timer connectToSeedNodeTimer; + protected final Map authenticatedPeers = new HashMap<>(); + private final HashSet reportedPeers = new HashSet<>(); + private final HashSet persistedPeers = new HashSet<>(); + protected final Map authenticationHandshakes = new HashMap<>(); + protected final List
remainingSeedNodes = new ArrayList<>(); + protected Optional> seedNodeAddressesOptional = Optional.empty(); + protected Timer authenticateToRemainingSeedNodeTimer, authenticateToRemainingReportedPeerTimer; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public PeerManager(NetworkNode networkNode) { + public PeerManager(NetworkNode networkNode, File storageDir) { Log.traceCall(); this.networkNode = networkNode; - - networkNode.addMessageListener(this); - networkNode.addConnectionListener(this); + dbStorage = new Storage<>(storageDir); maintenanceManager = new MaintenanceManager(networkNode, () -> getAuthenticatedPeers(), @@ -86,9 +92,21 @@ public class PeerManager implements MessageListener, ConnectionListener { address -> removePeer(address), (newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection)); - startConnectToSeedNodeTimer(); + checkSeedNodeConnectionExecutor = Utilities.getScheduledThreadPoolExecutor("checkSeedNodeConnection", 1, 10, 5); + init(); } + private void init() { + networkNode.addMessageListener(this); + networkNode.addConnectionListener(this); + + HashSet persistedPeers = dbStorage.initAndGetPersisted("persistedPeers"); + if (persistedPeers != null) { + log.info("We have persisted reported peers. " + + "\npersistedPeers=" + persistedPeers); + this.persistedPeers.addAll(persistedPeers); + } + } /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation @@ -107,13 +125,15 @@ public class PeerManager implements MessageListener, ConnectionListener { // if we are not in the authentication process // Connection shut down is an expected step in the authentication process. // If the disconnect happens on an authenticated peer we remove the peer. - if (authenticatedPeers.containsKey(peerAddress) || !authenticationHandshakes.containsKey(peerAddress)) { + if (authenticatedPeers.containsKey(peerAddress) || !authenticationHandshakes.containsKey(peerAddress)) removePeer(peerAddress); + if (!authenticationHandshakes.containsKey(peerAddress)) { log.info("We got a disconnect. " + "We will try again after a random pause to remaining reported peers."); - UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 10, 20, TimeUnit.SECONDS); + if (authenticateToRemainingReportedPeerTimer == null) + authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), + 10, 20, TimeUnit.SECONDS); } }); } @@ -144,31 +164,29 @@ public class PeerManager implements MessageListener, ConnectionListener { log.info("Broadcast message to {} peers. Message: {}", authenticatedPeers.values().size(), message); authenticatedPeers.values().stream() .filter(e -> !e.address.equals(sender)) - .forEach(peer -> UserThread.runAfterRandomDelay(() -> { - // as we use a delay we need to check again if our peer is still in the authenticated list - if (authenticatedPeers.containsValue(peer)) { - final Address address = peer.address; - log.trace("Broadcast message from " + getMyAddress() + " to " + address + "."); - SettableFuture future = networkNode.sendMessage(address, message); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded."); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Broadcast failed. " + throwable.getMessage()); - UserThread.execute(() -> removePeer(address)); - } - }); - } else { - log.debug("Peer is not in our authenticated list anymore. " + - "That can happen as we use a delay in the loop for the broadcast. " + - "Peer.address={}", peer.address); + .forEach(peer -> { + if (authenticatedPeers.containsValue(peer)) { + final Address address = peer.address; + log.trace("Broadcast message from " + getMyAddress() + " to " + address + "."); + SettableFuture future = networkNode.sendMessage(address, message); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded."); } - }, - 10, 100, TimeUnit.MILLISECONDS)); + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Broadcast failed. " + throwable.getMessage()); + UserThread.execute(() -> removePeer(address)); + } + }); + } else { + log.debug("Peer is not in our authenticated list anymore. " + + "That can happen as we use a stream loop for the broadcast. " + + "Peer.address={}", peer.address); + } + }); } else { log.info("Message not broadcasted because we have no authenticated peers yet. " + "message = {}", message); @@ -183,8 +201,13 @@ public class PeerManager implements MessageListener, ConnectionListener { networkNode.removeMessageListener(this); networkNode.removeConnectionListener(this); - if (connectToSeedNodeTimer != null) - connectToSeedNodeTimer.cancel(); + if (authenticateToRemainingReportedPeerTimer != null) + authenticateToRemainingReportedPeerTimer.cancel(); + + if (authenticateToRemainingSeedNodeTimer != null) + authenticateToRemainingSeedNodeTimer.cancel(); + + MoreExecutors.shutdownAndAwaitTermination(checkSeedNodeConnectionExecutor, 500, TimeUnit.MILLISECONDS); } public void addAuthenticationListener(AuthenticationListener listener) { @@ -209,8 +232,6 @@ public class PeerManager implements MessageListener, ConnectionListener { if (!authenticationHandshakes.containsKey(peerAddress)) { log.info("We got an incoming AuthenticationRequest for the peerAddress {}. " + "We create an AuthenticationHandshake.", peerAddress); - log.trace("message={}", message); - log.trace("connection={}", connection); // We protect that connection from getting closed by maintenance cleanup... connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST); authenticationHandshake = new AuthenticationHandshake(networkNode, @@ -225,7 +246,7 @@ public class PeerManager implements MessageListener, ConnectionListener { @Override public void onSuccess(Connection connection) { log.info("We got the peer ({}) who requested authentication authenticated.", peerAddress); - addAuthenticatedPeer(connection, peerAddress); + handleAuthenticationSuccess(connection, peerAddress); } @Override @@ -248,7 +269,8 @@ public class PeerManager implements MessageListener, ConnectionListener { } } else { log.info("We got an incoming AuthenticationRequest but we are already authenticated to peer {}.\n" + - "That should not happen. We reject the request.", peerAddress); + "That should not happen. " + + "We reject the request.", peerAddress); rejectAuthenticationRequest(peerAddress); if (authenticationHandshakes.containsKey(peerAddress)) { @@ -270,149 +292,181 @@ public class PeerManager implements MessageListener, ConnectionListener { public void setSeedNodeAddresses(Set
seedNodeAddresses) { seedNodeAddressesOptional = Optional.of(seedNodeAddresses); + checkArgument(!seedNodeAddressesOptional.get().isEmpty(), + "seedNodeAddresses must not be empty"); } public void authenticateToSeedNode(Address peerAddress) { Log.traceCall(); + checkArgument(seedNodeAddressesOptional.isPresent(), "seedNodeAddresses must be set before calling authenticateToSeedNode"); remainingSeedNodes.remove(peerAddress); remainingSeedNodes.addAll(seedNodeAddressesOptional.get()); authenticateToFirstSeedNode(peerAddress); + + startCheckSeedNodeConnectionTask(); } - private void authenticateToFirstSeedNode(Address peerAddress) { + protected void authenticateToFirstSeedNode(Address peerAddress) { Log.traceCall(); - if (!enoughConnectionsForAuthReached()) { - + if (!enoughConnections()) { if (!authenticationHandshakes.containsKey(peerAddress)) { log.info("We try to authenticate to seed node {}.", peerAddress); authenticate(peerAddress, new FutureCallback() { @Override public void onSuccess(Connection connection) { log.info("We got our first seed node authenticated. " + - "We try if there are reported peers available to authenticate."); - - addAuthenticatedPeer(connection, peerAddress); - authenticateToRemainingReportedPeer(); + "We try to authenticate to reported peers."); + handleAuthenticationSuccess(connection, peerAddress); + onFirstSeedNodeAuthenticated(); } @Override public void onFailure(@NotNull Throwable throwable) { log.info("Authentication to " + peerAddress + " failed at authenticateToFirstSeedNode." + - "\nThat is expected if seed nodes are offline." + + "\nThat is expected if seed node is offline." + "\nException:" + throwable.toString()); - handleAuthenticationFailure(peerAddress, throwable); - Optional
seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode(); if (seedNodeOptional.isPresent()) { - log.info("We try another random seed node for first authentication attempt."); + log.info("We try another random seed node for authenticateToFirstSeedNode."); authenticateToFirstSeedNode(seedNodeOptional.get()); } else { log.info("There are no seed nodes available for authentication. " + - "We try if there are reported peers available to authenticate."); + "We try to authenticate to reported peers."); authenticateToRemainingReportedPeer(); } } }); } else { - log.warn("We got the first seed node already in the authenticationHandshakes. " + - "That might happen when we received an AuthenticationRequest before we start authenticating. " + - "We will try after a random pause to authenticate to the reported peers."); - UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 20, 30, TimeUnit.SECONDS); + log.info("We have already an open authenticationHandshakes for the first seed node. " + + "That can happen when we received an AuthenticationRequest before we start authenticating. " + + "We will try to authenticate to another seed node."); + authenticateToRemainingSeedNode(); } } else { - log.info("We have already enough connections."); + log.info("We have already enough connections (at authenticateToFirstSeedNode). " + + "That is very unlikely to happen but can be a theoretical case."); } } - private void authenticateToRemainingSeedNode() { + protected void onFirstSeedNodeAuthenticated() { + authenticateToRemainingReportedPeer(); + } + + protected void authenticateToRemainingSeedNode() { Log.traceCall(); - if (!enoughConnectionsForAuthReached()) { + if (authenticateToRemainingSeedNodeTimer != null) { + authenticateToRemainingSeedNodeTimer.cancel(); + authenticateToRemainingSeedNodeTimer = null; + } + + if (!enoughConnections()) { Optional
seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode(); if (seedNodeOptional.isPresent()) { Address peerAddress = seedNodeOptional.get(); - log.info("We try to authenticate to seed node {}.", peerAddress); - authenticate(peerAddress, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.info("We got a seed node authenticated. " + - "We try if there are more seed nodes available to authenticate."); + if (!authenticationHandshakes.containsKey(peerAddress)) { + log.info("We try to authenticate to a randomly selected seed node {}.", peerAddress); + authenticate(peerAddress, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.info("We got a seed node authenticated. " + + "We try to authenticate to reported peers."); - addAuthenticatedPeer(connection, peerAddress); - authenticateToRemainingSeedNode(); + handleAuthenticationSuccess(connection, peerAddress); + onRemainingSeedNodeAuthenticated(); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Authentication to " + peerAddress + " failed at authenticateToRemainingSeedNode." + + "\nThat is expected if the seed node is offline." + + "\nException:" + throwable.toString()); + + handleAuthenticationFailure(peerAddress, throwable); + + log.info("We try authenticateToRemainingSeedNode again."); + authenticateToRemainingSeedNode(); + } } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Authentication to " + peerAddress + " failed at authenticateToRemainingSeedNode." + - "\nThat is expected if the seed node is offline." + - "\nException:" + throwable.toString()); - - handleAuthenticationFailure(peerAddress, throwable); - - log.info("We try another random seed node for authentication."); - authenticateToRemainingSeedNode(); - } - } - - ); - } else if (reportedPeersAvailable() && !(this instanceof SeedNodePeerManager)) { - authenticateToRemainingReportedPeer(); - } else { - log.info("We don't have seed nodes or reported peers available. " + - "We try again after a random pause with the seed nodes which failed or if " + - "none available with the reported peers."); - if (seedNodeAddressesOptional.isPresent()) { - resetRemainingSeedNodes(); - if (remainingSeedNodes.isEmpty() && !(this instanceof SeedNodePeerManager)) { - UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 10, 20, TimeUnit.SECONDS); - } - } else if (!(this instanceof SeedNodePeerManager)) { - UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 10, 20, TimeUnit.SECONDS); + ); } else { - UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(), - 30, 60, TimeUnit.SECONDS); + log.info("We have already an open authenticationHandshakes for the selected seed node. " + + "That can happen in race conditions when we received an AuthenticationRequest before " + + "we start authenticating. " + + "We will try to authenticate to another seed node."); + authenticateToRemainingSeedNode(); } + } else { + handleNoSeedNodesAvailableCase(); } } else { - log.info("We have already enough connections."); + log.info("We have already enough connections (at authenticateToRemainingSeedNode)."); } } - private void resetRemainingSeedNodes() { + protected void onRemainingSeedNodeAuthenticated() { + authenticateToRemainingReportedPeer(); + } + + protected void handleNoSeedNodesAvailableCase() { + Log.traceCall(); + if (reportedPeersAvailable()) { + authenticateToRemainingReportedPeer(); + } else { + log.info("We don't have seed nodes or reported peers available. " + + "We try again after a random pause with the seed nodes which failed or if " + + "none available with the reported peers."); + checkArgument(seedNodeAddressesOptional.isPresent(), "seedNodeAddresses must be present"); + resetRemainingSeedNodes(); + if (remainingSeedNodesAvailable()) { + if (authenticateToRemainingSeedNodeTimer == null) + authenticateToRemainingSeedNodeTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(), + 10, 20, TimeUnit.SECONDS); + } else { + if (authenticateToRemainingReportedPeerTimer == null) + authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), + 10, 20, TimeUnit.SECONDS); + } + } + } + + protected void resetRemainingSeedNodes() { + Log.traceCall(); if (seedNodeAddressesOptional.isPresent()) { remainingSeedNodes.clear(); seedNodeAddressesOptional.get().stream() .filter(e -> !authenticatedPeers.containsKey(e) && !authenticationHandshakes.containsKey(e)) .forEach(e -> remainingSeedNodes.add(e)); + } else { + log.error("seedNodeAddressesOptional must be present"); } } - // We want to stay connected to at least one seed node from time to time to avoid to get isolated with a group of peers - private void startConnectToSeedNodeTimer() { - Log.traceCall(); - if (connectToSeedNodeTimer != null) - connectToSeedNodeTimer.cancel(); - - connectToSeedNodeTimer = UserThread.runAfterRandomDelay(() -> { - connectToSeedNode(); - startConnectToSeedNodeTimer(); - }, 10, 12, TimeUnit.MINUTES); + protected void startCheckSeedNodeConnectionTask() { + checkSeedNodeConnectionExecutor.schedule(() -> UserThread.execute(() + -> checkSeedNodeConnections()), 2, TimeUnit.MINUTES); } - private void connectToSeedNode() { - // remove enough connections first - if (getMyAddress() != null) { - checkIfConnectedPeersExceeds(MAX_CONNECTIONS_NORMAL_PRIORITY - 3); - UserThread.runAfter(() -> { - resetRemainingSeedNodes(); - authenticateToRemainingSeedNode(); - }, 500, TimeUnit.MILLISECONDS); + // We want to stay connected to at least one seed node to avoid to get isolated with a group of peers + // Also needed for cases when all seed nodes get restarted, so peers will connect to the seed nodes again from time + // to time and so keep the network connected. + protected void checkSeedNodeConnections() { + Log.traceCall(); + resetRemainingSeedNodes(); + if (!remainingSeedNodes.isEmpty()) { + log.info("We have remaining not connected seed node(s) available. " + + "We will call authenticateToRemainingSeedNode."); + // remove enough connections to be sure the authentication will succeed. I t might be that in the meantime + // we get other connection attempts, so remove 2 more than needed to have a bit of headroom. + checkIfConnectedPeersExceeds(MAX_CONNECTIONS_LOW_PRIORITY - remainingSeedNodes.size() - 2); + + if (authenticateToRemainingSeedNodeTimer == null) + authenticateToRemainingSeedNodeTimer = UserThread.runAfter(() -> authenticateToRemainingSeedNode(), + 500, TimeUnit.MILLISECONDS); + } else { + log.debug("There are no remainingSeedNodes available."); } } @@ -421,14 +475,19 @@ public class PeerManager implements MessageListener, ConnectionListener { // Authentication to reported peers /////////////////////////////////////////////////////////////////////////////////////////// - private void authenticateToRemainingReportedPeer() { + protected void authenticateToRemainingReportedPeer() { Log.traceCall(); - if (!enoughConnectionsForAuthReached()) { + + if (authenticateToRemainingReportedPeerTimer != null) { + authenticateToRemainingReportedPeerTimer.cancel(); + authenticateToRemainingReportedPeerTimer = null; + } + + if (!enoughConnections()) { if (reportedPeersAvailable()) { - Optional andRemoveNotAuthenticatingReportedPeer = getAndRemoveNotAuthenticatingReportedPeer(); - if (andRemoveNotAuthenticatingReportedPeer.isPresent()) { - Address peerAddress = andRemoveNotAuthenticatingReportedPeer.get().address; - removeFromReportedPeers(peerAddress); + Optional reportedPeer = getAndRemoveNotAuthenticatingReportedPeer(); + if (reportedPeer.isPresent()) { + Address peerAddress = reportedPeer.get().address; if (!authenticationHandshakes.containsKey(peerAddress)) { log.info("We try to authenticate to peer {}.", peerAddress); authenticate(peerAddress, new FutureCallback() { @@ -437,7 +496,7 @@ public class PeerManager implements MessageListener, ConnectionListener { log.info("We got a peer authenticated. " + "We try if there are more reported peers available to authenticate."); - addAuthenticatedPeer(connection, peerAddress); + handleAuthenticationSuccess(connection, peerAddress); authenticateToRemainingReportedPeer(); } @@ -454,31 +513,46 @@ public class PeerManager implements MessageListener, ConnectionListener { } }); } else { - log.warn("We got the selected peer in the authenticationHandshakes. That should not happen. " + - "We will try again after a short random pause."); - UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 1, 2, TimeUnit.SECONDS); + log.warn("We got the selected peer in the authenticationHandshakes. " + + "That should not happen. We will try again after a random pause."); + if (authenticateToRemainingReportedPeerTimer == null) + authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), + 10, 20, TimeUnit.SECONDS); } } else { - log.info("We don't have a reported peers available (maybe one is authenticating already). " + - "We will try again after a random pause."); - UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 10, 20, TimeUnit.SECONDS); + log.info("We don't have a reported peers available. " + + "That should not happen. We will try again after a random pause."); + if (authenticateToRemainingReportedPeerTimer == null) + authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), + 10, 20, TimeUnit.SECONDS); } - } else if (!remainingSeedNodes.isEmpty()) { + } else if (remainingSeedNodesAvailable()) { authenticateToRemainingSeedNode(); - } else if (remainingSeedNodes.isEmpty()) { - UserThread.runAfterRandomDelay(() -> { - resetRemainingSeedNodes(); - authenticateToRemainingSeedNode(); - }, - 10, 20, TimeUnit.SECONDS); - - } else { + } else if (!persistedPeers.isEmpty()) { log.info("We don't have seed nodes or reported peers available. " + - "We will try again after a random pause."); - UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), - 30, 40, TimeUnit.SECONDS); + "We will add 5 peers from our persistedReportedPeers to our reportedPeers list and " + + "try authenticateToRemainingReportedPeer again."); + + List list = new ArrayList<>(persistedPeers); + authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e))); + authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e))); + if (!list.isEmpty()) { + int toRemove = Math.min(list.size(), 5); + for (int i = 0; i < toRemove; i++) { + ReportedPeer reportedPeer = list.get(i); + persistedPeers.remove(reportedPeer); + reportedPeers.add(reportedPeer); + } + authenticateToRemainingReportedPeer(); + } + } else { + log.info("We don't have seed nodes, reported peers nor persistedReportedPeers available. " + + "We will reset the seed nodes and try authenticateToRemainingSeedNode again after a random pause."); + resetRemainingSeedNodes(); + if (authenticateToRemainingSeedNodeTimer == null) + authenticateToRemainingSeedNodeTimer = UserThread.runAfterRandomDelay(() -> + authenticateToRemainingSeedNode(), + 10, 20, TimeUnit.SECONDS); } } else { log.info("We have already enough connections."); @@ -525,12 +599,13 @@ public class PeerManager implements MessageListener, ConnectionListener { } } else { log.info("We try to authenticate to peer {} for sending a private message.", peerAddress); + authenticate(peerAddress, new FutureCallback() { @Override public void onSuccess(Connection connection) { log.info("We got a new peer for sending a private message authenticated."); - addAuthenticatedPeer(connection, peerAddress); + handleAuthenticationSuccess(connection, peerAddress); if (completeHandler != null) completeHandler.run(); } @@ -555,30 +630,23 @@ public class PeerManager implements MessageListener, ConnectionListener { private void authenticate(Address peerAddress, FutureCallback futureCallback) { Log.traceCall(peerAddress.getFullAddress()); - if (!authenticationHandshakes.containsKey(peerAddress)) { - log.info("We create an AuthenticationHandshake to authenticate to peer {}.", peerAddress); - AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, - getMyAddress(), - peerAddress, - () -> getAuthenticatedAndReportedPeers(), - (newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection) - ); - authenticationHandshakes.put(peerAddress, authenticationHandshake); - SettableFuture authenticationFuture = authenticationHandshake.requestAuthentication(); - Futures.addCallback(authenticationFuture, futureCallback); - } else { - log.warn("An authentication handshake is already created for that peerAddress ({}). That should not happen", peerAddress); - } + checkArgument(!authenticationHandshakes.containsKey(peerAddress), + "An authentication handshake is already created for that peerAddress (" + peerAddress + ")"); + log.info("We create an AuthenticationHandshake to authenticate to peer {}.", peerAddress); + AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, + getMyAddress(), + peerAddress, + () -> getAuthenticatedAndReportedPeers(), + (newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection) + ); + authenticationHandshakes.put(peerAddress, authenticationHandshake); + SettableFuture authenticationFuture = authenticationHandshake.requestAuthentication(); + Futures.addCallback(authenticationFuture, futureCallback); } - private void addAuthenticatedPeer(Connection connection, Address peerAddress) { + private void handleAuthenticationSuccess(Connection connection, Address peerAddress) { Log.traceCall(peerAddress.getFullAddress()); - connection.setPeerAddress(peerAddress); - connection.setAuthenticated(); - - removeFromAuthenticationHandshakes(peerAddress); - log.info("\n\n############################################################\n" + "We are authenticated to:" + "\nconnection=" + connection.getUid() @@ -586,14 +654,17 @@ public class PeerManager implements MessageListener, ConnectionListener { + "\npeerAddress= " + peerAddress + "\n############################################################\n"); + removeFromAuthenticationHandshakes(peerAddress); + connection.setPeerAddress(peerAddress); + connection.setAuthenticated(); authenticatedPeers.put(peerAddress, new Peer(connection, peerAddress)); - removeFromReportedPeers(peerAddress); - - if (!checkIfConnectedPeersExceeds(MAX_CONNECTIONS_LOW_PRIORITY)) - printAuthenticatedPeers(); - authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection)); + + printAuthenticatedPeers(); + + // We give a bit headroom to avoid dangling disconnect/connect + checkIfConnectedPeersExceeds(MAX_CONNECTIONS_LOW_PRIORITY + 2); } void handleAuthenticationFailure(@Nullable Address peerAddress, Throwable throwable) { @@ -609,6 +680,7 @@ public class PeerManager implements MessageListener, ConnectionListener { removeFromAuthenticationHandshakes(peerAddress); removeFromReportedPeers(peerAddress); removeFromAuthenticatedPeers(peerAddress); + removeFromPersistedPeers(peerAddress); } } @@ -616,27 +688,44 @@ public class PeerManager implements MessageListener, ConnectionListener { reportedPeers.remove(new ReportedPeer(peerAddress)); } - private void removeFromAuthenticationHandshakes(@Nullable Address peerAddress) { + private void removeFromAuthenticationHandshakes(Address peerAddress) { if (authenticationHandshakes.containsKey(peerAddress)) authenticationHandshakes.remove(peerAddress); } - private void removeFromAuthenticatedPeers(@Nullable Address peerAddress) { + private void removeFromAuthenticatedPeers(Address peerAddress) { if (authenticatedPeers.containsKey(peerAddress)) authenticatedPeers.remove(peerAddress); printAuthenticatedPeers(); } - private boolean enoughConnectionsForAuthReached() { - // We reduce the limit to avoid dangling connect/disconnect - return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY - 2; + private void removeFromPersistedPeers(Address peerAddress) { + ReportedPeer reportedPeer = new ReportedPeer(peerAddress); + if (persistedPeers.contains(reportedPeer)) { + persistedPeers.remove(reportedPeer); + dbStorage.queueUpForSave(persistedPeers, 5000); + } } - private boolean reportedPeersAvailable() { - return !reportedPeers.isEmpty(); + private boolean enoughConnections() { + return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY; } - private boolean checkIfConnectedPeersExceeds(int limit) { + protected boolean reportedPeersAvailable() { + List list = new ArrayList<>(reportedPeers); + authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e))); + authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e))); + return !list.isEmpty(); + } + + private boolean remainingSeedNodesAvailable() { + List
list = new ArrayList<>(remainingSeedNodes); + authenticationHandshakes.keySet().stream().forEach(e -> list.remove(e)); + authenticatedPeers.keySet().stream().forEach(e -> list.remove(e)); + return !list.isEmpty(); + } + + protected boolean checkIfConnectedPeersExceeds(int limit) { Log.traceCall(); int size = authenticatedPeers.size(); if (size > limit) { @@ -651,11 +740,11 @@ public class PeerManager implements MessageListener, ConnectionListener { log.debug("networkNode.getAllConnections()={}", networkNode.getAllConnections()); }*/ - // If we are a seed node we don't remove other seed nodes to keep the core network well connected + // We don't remove seed nodes to keep the core network well connected List authenticatedConnections = allConnections.stream() .filter(e -> e.isAuthenticated()) .filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE) - .filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e)) + .filter(e -> !isSeedNode(e)) .collect(Collectors.toList()); if (authenticatedConnections.size() == 0) { @@ -665,7 +754,7 @@ public class PeerManager implements MessageListener, ConnectionListener { authenticatedConnections = allConnections.stream() .filter(e -> e.isAuthenticated()) .filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE || e.getConnectionPriority() == ConnectionPriority.ACTIVE) - .filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e)) + .filter(e -> !isSeedNode(e)) .collect(Collectors.toList()); if (authenticatedConnections.size() == 0) { @@ -675,7 +764,7 @@ public class PeerManager implements MessageListener, ConnectionListener { authenticatedConnections = allConnections.stream() .filter(e -> e.isAuthenticated()) .filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST) - .filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e)) + .filter(e -> !isSeedNode(e)) .collect(Collectors.toList()); } } @@ -688,7 +777,7 @@ public class PeerManager implements MessageListener, ConnectionListener { Connection connection = authenticatedConnections.remove(0); log.info("We are going to shut down the oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); - connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(limit), 10, 50, TimeUnit.MILLISECONDS)); + connection.shutDown(() -> checkIfConnectedPeersExceeds(limit)); return true; } else { log.debug("authenticatedConnections.size() == 0. That might happen in rare cases. (checkIfConnectedPeersExceeds)"); @@ -700,8 +789,10 @@ public class PeerManager implements MessageListener, ConnectionListener { } } - private boolean isAuthConnectionSeedNode(Connection e) { - return e.getPeerAddressOptional().isPresent() && seedNodeAddressesOptional.isPresent() && seedNodeAddressesOptional.get().contains(e.getPeerAddressOptional().get()); + private boolean isSeedNode(Connection connection) { + return connection.getPeerAddressOptional().isPresent() + && seedNodeAddressesOptional.isPresent() + && seedNodeAddressesOptional.get().contains(connection.getPeerAddressOptional().get()); } @@ -724,6 +815,10 @@ public class PeerManager implements MessageListener, ConnectionListener { return authenticatedPeers; } + public HashSet getPersistedPeers() { + return persistedPeers; + } + public boolean isInAuthenticationProcess(Address address) { return authenticationHandshakes.containsKey(address); } @@ -763,6 +858,25 @@ public class PeerManager implements MessageListener, ConnectionListener { reportedPeers.addAll(adjustedReportedPeers); purgeReportedPeersIfExceeds(); + + // We add all adjustedReportedPeers to persistedReportedPeers but only save the 500 peers with the most + // recent lastActivityDate. + // ReportedPeers is changing when peers authenticate (remove) so we don't use that but + // the persistedReportedPeers set. + persistedPeers.addAll(adjustedReportedPeers); + // We add also our authenticated and authenticating peers + authenticatedPeers.keySet().forEach(e -> persistedPeers.add(new ReportedPeer(e, new Date()))); + authenticationHandshakes.keySet().forEach(e -> persistedPeers.add(new ReportedPeer(e, new Date()))); + + int toRemove = persistedPeers.size() - 500; + if (toRemove > 0) { + List list = new ArrayList<>(persistedPeers); + list.sort((o1, o2) -> o1.lastActivityDate.compareTo(o2.lastActivityDate)); + for (int i = 0; i < toRemove; i++) { + persistedPeers.remove(list.get(i)); + } + } + dbStorage.queueUpForSave(persistedPeers); } printReportedPeers(); @@ -800,30 +914,31 @@ public class PeerManager implements MessageListener, ConnectionListener { } private Optional getAndRemoveNotAuthenticatingReportedPeer() { - Optional reportedPeer = Optional.empty(); List list = new ArrayList<>(reportedPeers); authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e))); authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e))); - if (!list.isEmpty()) - reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list)); - - return reportedPeer; + if (!list.isEmpty()) { + ReportedPeer reportedPeer = getAndRemoveRandomReportedPeer(list); + removeFromReportedPeers(reportedPeer.address); + return Optional.of(reportedPeer); + } else { + return Optional.empty(); + } } - private Address getAndRemoveRandomAddress(List
list) { + protected Address getAndRemoveRandomAddress(List
list) { checkArgument(!list.isEmpty(), "List must not be empty"); return list.remove(new Random().nextInt(list.size())); } private Optional
getAndRemoveNotAuthenticatingSeedNode() { - Optional
seedNode = Optional.empty(); authenticationHandshakes.keySet().stream().forEach(e -> remainingSeedNodes.remove(e)); authenticatedPeers.keySet().stream().forEach(e -> remainingSeedNodes.remove(e)); - if (!remainingSeedNodes.isEmpty()) - seedNode = Optional.of(getAndRemoveRandomAddress(remainingSeedNodes)); - - return seedNode; + if (remainingSeedNodesAvailable()) + return Optional.of(getAndRemoveRandomAddress(remainingSeedNodes)); + else + return Optional.empty(); } /////////////////////////////////////////////////////////////////////////////////////////// @@ -836,18 +951,22 @@ public class PeerManager implements MessageListener, ConnectionListener { } private void printAuthenticatedPeers() { - StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + - "Authenticated peers for node " + getMyAddress() + ":"); - authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address)); - result.append("\n------------------------------------------------------------\n"); - log.info(result.toString()); + if (!authenticatedPeers.isEmpty()) { + StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + + "Authenticated peers for node " + getMyAddress() + ":"); + authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address)); + result.append("\n------------------------------------------------------------\n"); + log.info(result.toString()); + } } private void printReportedPeers() { - StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + - "Reported peers for node " + getMyAddress() + ":"); - reportedPeers.stream().forEach(e -> result.append("\n").append(e)); - result.append("\n------------------------------------------------------------\n"); - log.info(result.toString()); + if (!reportedPeers.isEmpty()) { + StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + + "Reported peers for node " + getMyAddress() + ":"); + reportedPeers.stream().forEach(e -> result.append("\n").append(e)); + result.append("\n------------------------------------------------------------\n"); + log.info(result.toString()); + } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/ReportedPeer.java b/network/src/main/java/io/bitsquare/p2p/peers/ReportedPeer.java index e8d7cef25c..cc432e1e17 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/ReportedPeer.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/ReportedPeer.java @@ -22,6 +22,7 @@ public class ReportedPeer implements Serializable { this(address, null); } + // We don't use the lastActivityDate for identity @Override public boolean equals(Object o) { if (this == o) return true; @@ -33,6 +34,7 @@ public class ReportedPeer implements Serializable { } + // We don't use the lastActivityDate for identity @Override public int hashCode() { return address != null ? address.hashCode() : 0; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java index bc134a7c39..56c89c4ab5 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java @@ -22,8 +22,10 @@ import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; public class RequestDataManager implements MessageListener, AuthenticationListener { private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class); @@ -36,6 +38,8 @@ public class RequestDataManager implements MessageListener, AuthenticationListen public interface Listener { void onNoSeedNodeAvailable(); + void onNoPeersAvailable(); + void onDataReceived(Address seedNode); } @@ -43,21 +47,24 @@ public class RequestDataManager implements MessageListener, AuthenticationListen private final NetworkNode networkNode; protected final P2PDataStorage dataStorage; private final PeerManager peerManager; - private final Listener listener; - + private final HashSet persistedPeers = new HashSet<>(); + private final HashSet remainingPersistedPeers = new HashSet<>(); + private Listener listener; private Optional
optionalConnectedSeedNodeAddress = Optional.empty(); - private Optional> optionalSeedNodeAddresses = Optional.empty(); + private Collection
seedNodeAddresses; + protected Timer requestDataFromAuthenticatedSeedNodeTimer; + private Timer requestDataTimer, requestDataWithPersistedPeersTimer; + private boolean doNotifyNoSeedNodeAvailableListener = true; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, Listener listener) { + public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager) { this.networkNode = networkNode; this.dataStorage = dataStorage; this.peerManager = peerManager; - this.listener = listener; networkNode.addMessageListener(this); } @@ -66,6 +73,10 @@ public class RequestDataManager implements MessageListener, AuthenticationListen Log.traceCall(); networkNode.removeMessageListener(this); + + stopRequestDataTimer(); + stopRequestDataWithPersistedPeersTimer(); + stopRequestDataFromAuthenticatedSeedNodeTimer(); } @@ -73,54 +84,143 @@ public class RequestDataManager implements MessageListener, AuthenticationListen // API /////////////////////////////////////////////////////////////////////////////////////////// - public void requestData(Collection
seedNodeAddresses) { - if (!optionalSeedNodeAddresses.isPresent()) - optionalSeedNodeAddresses = Optional.of(seedNodeAddresses); + public void setRequestDataManagerListener(Listener listener) { + this.listener = listener; + } - Log.traceCall(seedNodeAddresses.toString()); - if (!seedNodeAddresses.isEmpty()) { - List
remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses); - Collections.shuffle(remainingSeedNodeAddresses); - Address candidate = remainingSeedNodeAddresses.get(0); - if (!peerManager.isInAuthenticationProcess(candidate)) { - // We only remove it if it is not in the process of authentication - remainingSeedNodeAddresses.remove(0); - log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate); + public void requestDataFromSeedNodes(Collection
seedNodeAddresses) { + checkNotNull(seedNodeAddresses, "requestDataFromSeedNodes: seedNodeAddresses must not be null."); + checkArgument(!seedNodeAddresses.isEmpty(), "requestDataFromSeedNodes: seedNodeAddresses must not be empty."); - SettableFuture future = networkNode.sendMessage(candidate, new DataRequest()); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.info("Send GetAllDataMessage to " + candidate + " succeeded."); - checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen."); - optionalConnectedSeedNodeAddress = Optional.of(candidate); + this.seedNodeAddresses = seedNodeAddresses; + requestData(seedNodeAddresses); + } + + private void requestData(Collection
addresses) { + Log.traceCall(addresses.toString()); + checkArgument(!addresses.isEmpty(), "requestData: addresses must not be empty."); + stopRequestDataTimer(); + List
remainingAddresses = new ArrayList<>(addresses); + Address candidate = remainingAddresses.get(new Random().nextInt(remainingAddresses.size())); + if (!peerManager.isInAuthenticationProcess(candidate)) { + // We only remove it if it is not in the process of authentication + remainingAddresses.remove(candidate); + log.info("We try to send a GetAllDataMessage request to node. " + candidate); + + SettableFuture future = networkNode.sendMessage(candidate, new DataRequest()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.info("Send GetAllDataMessage to " + candidate + " succeeded."); + checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen."); + optionalConnectedSeedNodeAddress = Optional.of(candidate); + + stopRequestDataTimer(); + stopRequestDataWithPersistedPeersTimer(); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Send GetAllDataMessage to " + candidate + " failed. " + + "That is expected if the node is offline. " + + "Exception:" + throwable.getMessage()); + + if (!remainingAddresses.isEmpty()) { + log.info("There are more seed nodes available for requesting data. " + + "We will try requestData again."); + + ReportedPeer reportedPeer = new ReportedPeer(candidate); + if (remainingPersistedPeers.contains(reportedPeer)) + remainingPersistedPeers.remove(reportedPeer); + + requestData(remainingAddresses); + } else { + log.info("There is no seed node available for requesting data. " + + "That is expected if no seed node is online.\n" + + "We will try again to request data from a seed node after a random pause."); + + requestDataWithPersistedPeers(candidate); + requestDataWithSeedNodeAddresses(); } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Send GetAllDataMessage to " + candidate + " failed. " + - "That is expected if the seed node is offline. " + - "Exception:" + throwable.getMessage()); - if (!remainingSeedNodeAddresses.isEmpty()) - log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses); - - requestData(remainingSeedNodeAddresses); - } - }); + } + }); + } else if (!remainingAddresses.isEmpty()) { + log.info("The node ({}) is in the process of authentication.\n" + + "We will try requestData again with the remaining addresses.", candidate); + remainingAddresses.remove(candidate); + if (!remainingAddresses.isEmpty()) { + requestData(remainingAddresses); } else { - log.info("The seed node ({}) is in the process of authentication.\n" + - "We will try again after a pause of 3-5 sec.", candidate); - listener.onNoSeedNodeAvailable(); - UserThread.runAfterRandomDelay(() -> requestData(remainingSeedNodeAddresses), - 3, 5, TimeUnit.SECONDS); + log.info("The node ({}) is in the process of authentication.\n" + + "There are no more remaining addresses available.\n" + + "We try requestData with the persistedPeers and after a pause with " + + "the seed nodes again.", candidate); + requestDataWithPersistedPeers(candidate); + requestDataWithSeedNodeAddresses(); } } else { - log.info("There is no seed node available for requesting data. " + - "That is expected if no seed node is online.\n" + - "We will try again after a pause of 10-20 sec."); + log.info("The node ({}) is in the process of authentication.\n" + + "There are no more remaining addresses available.\n" + + "We try requestData with the persistedPeers and after a pause with " + + "the seed nodes again.", candidate); + requestDataWithPersistedPeers(candidate); + requestDataWithSeedNodeAddresses(); + } + } + + private void requestDataWithSeedNodeAddresses() { + Log.traceCall(); + // We only want to notify the first time + if (doNotifyNoSeedNodeAvailableListener) { + doNotifyNoSeedNodeAvailableListener = false; listener.onNoSeedNodeAvailable(); - UserThread.runAfterRandomDelay(() -> requestData(optionalSeedNodeAddresses.get()), + } + if (requestDataTimer == null) + requestDataTimer = UserThread.runAfterRandomDelay(() -> requestData(seedNodeAddresses), 10, 20, TimeUnit.SECONDS); + } + + private void requestDataWithPersistedPeers(@Nullable Address failedPeer) { + Log.traceCall("failedPeer=" + failedPeer); + + stopRequestDataWithPersistedPeersTimer(); + + if (persistedPeers.isEmpty()) { + persistedPeers.addAll(peerManager.getPersistedPeers()); + log.info("persistedPeers = " + persistedPeers); + remainingPersistedPeers.addAll(persistedPeers); + } + + if (failedPeer != null) { + ReportedPeer reportedPeer = new ReportedPeer(failedPeer); + if (remainingPersistedPeers.contains(reportedPeer)) + remainingPersistedPeers.remove(reportedPeer); + } + + boolean persistedPeersAvailable = false; + if (!remainingPersistedPeers.isEmpty()) { + Set
persistedPeerAddresses = remainingPersistedPeers.stream().map(e -> e.address).collect(Collectors.toSet()); + if (!persistedPeerAddresses.isEmpty()) { + log.info("We try to use persisted peers for requestData."); + persistedPeersAvailable = true; + requestData(persistedPeerAddresses); + } + } + + if (!persistedPeersAvailable) { + log.warn("No seed nodes and no persisted peers are available for requesting data.\n" + + "We will try again after a random pause."); + doNotifyNoSeedNodeAvailableListener = false; + listener.onNoPeersAvailable(); + + // reset remainingPersistedPeers + remainingPersistedPeers.clear(); + remainingPersistedPeers.addAll(persistedPeers); + + if (!remainingPersistedPeers.isEmpty() && requestDataWithPersistedPeersTimer == null) + requestDataWithPersistedPeersTimer = UserThread.runAfterRandomDelay(() -> + requestDataWithPersistedPeers(null), + 30, 40, TimeUnit.SECONDS); } } @@ -159,8 +259,9 @@ public class RequestDataManager implements MessageListener, AuthenticationListen // We delay a bit to be sure that the authentication state is applied to all listeners if (connectedSeedNodeAddress.equals(peerAddress) && connection.getConnectionPriority() == ConnectionPriority.ACTIVE) { // We are the node (can be a seed node as well) which requested the authentication - UserThread.runAfter(() - -> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS); + if (requestDataFromAuthenticatedSeedNodeTimer == null) + requestDataFromAuthenticatedSeedNodeTimer = UserThread.runAfter(() + -> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS); } }); } @@ -168,6 +269,9 @@ public class RequestDataManager implements MessageListener, AuthenticationListen // 5. Step after authentication to first seed node we request again the data protected void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) { Log.traceCall(peerAddress.toString()); + + stopRequestDataFromAuthenticatedSeedNodeTimer(); + // We have to request the data again as we might have missed pushed data in the meantime SettableFuture future = networkNode.sendMessage(connection, new DataRequest()); Futures.addCallback(future, new FutureCallback() { @@ -183,9 +287,33 @@ public class RequestDataManager implements MessageListener, AuthenticationListen + "\nWe will try again to request data from any of our seed nodes."); // We will try again to request data from any of our seed nodes. - if (optionalSeedNodeAddresses.isPresent()) - requestData(optionalSeedNodeAddresses.get()); + if (seedNodeAddresses != null && !seedNodeAddresses.isEmpty()) + requestData(seedNodeAddresses); + else + log.error("seedNodeAddresses is null or empty. That must not happen. seedNodeAddresses=" + + seedNodeAddresses); } }); } + + private void stopRequestDataTimer() { + if (requestDataTimer != null) { + requestDataTimer.cancel(); + requestDataTimer = null; + } + } + + private void stopRequestDataWithPersistedPeersTimer() { + if (requestDataWithPersistedPeersTimer != null) { + requestDataWithPersistedPeersTimer.cancel(); + requestDataWithPersistedPeersTimer = null; + } + } + + private void stopRequestDataFromAuthenticatedSeedNodeTimer() { + if (requestDataFromAuthenticatedSeedNodeTimer != null) { + requestDataFromAuthenticatedSeedNodeTimer.cancel(); + requestDataFromAuthenticatedSeedNodeTimer = null; + } + } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/SeedNodePeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/SeedNodePeerManager.java index c832c9da02..a20a5c8b06 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/SeedNodePeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/SeedNodePeerManager.java @@ -1,13 +1,74 @@ package io.bitsquare.p2p.peers; +import io.bitsquare.app.Log; +import io.bitsquare.common.UserThread; +import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.NetworkNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkArgument; + public class SeedNodePeerManager extends PeerManager { private static final Logger log = LoggerFactory.getLogger(SeedNodePeerManager.class); - public SeedNodePeerManager(NetworkNode networkNode) { - super(networkNode); + public SeedNodePeerManager(NetworkNode networkNode, File storageDir) { + super(networkNode, storageDir); } + + public void authenticateToSeedNode() { + Log.traceCall(); + checkArgument(seedNodeAddressesOptional.isPresent(), + "seedNodeAddresses must be set before calling authenticateToSeedNode"); + checkArgument(!seedNodeAddressesOptional.get().isEmpty(), + "seedNodeAddresses must not be empty"); + remainingSeedNodes.addAll(seedNodeAddressesOptional.get()); + Address peerAddress = getAndRemoveRandomAddress(remainingSeedNodes); + authenticateToFirstSeedNode(peerAddress); + + startCheckSeedNodeConnectionTask(); + } + + + @Override + protected void onFirstSeedNodeAuthenticated() { + // If we are seed node we want to first connect to all other seed nodes before connecting to the reported peers. + authenticateToRemainingSeedNode(); + } + + @Override + protected void onRemainingSeedNodeAuthenticated() { + // If we are seed node we want to first connect to all other seed nodes before connecting to the reported peers. + authenticateToRemainingSeedNode(); + } + + @Override + protected void handleNoSeedNodesAvailableCase() { + Log.traceCall(); + log.info("We don't have more seed nodes available. " + + "We authenticate to reported peers and try again after a random pause with the seed nodes which failed or if " + + "none available with the reported peers."); + + boolean reportedPeersAvailableCalled = false; + if (reportedPeersAvailable()) { + authenticateToRemainingReportedPeer(); + reportedPeersAvailableCalled = true; + } + + resetRemainingSeedNodes(); + if (!remainingSeedNodes.isEmpty()) { + if (authenticateToRemainingSeedNodeTimer == null) + authenticateToRemainingSeedNodeTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(), + 10, 20, TimeUnit.SECONDS); + } else if (!reportedPeersAvailableCalled) { + if (authenticateToRemainingReportedPeerTimer == null) + authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), + 10, 20, TimeUnit.SECONDS); + } + } + + } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/SeedNodeRequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/SeedNodeRequestDataManager.java index 7624ac02b5..396e59f308 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/SeedNodeRequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/SeedNodeRequestDataManager.java @@ -13,15 +13,16 @@ import java.util.concurrent.TimeUnit; public class SeedNodeRequestDataManager extends RequestDataManager { private static final Logger log = LoggerFactory.getLogger(SeedNodeRequestDataManager.class); - public SeedNodeRequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, Listener listener) { - super(networkNode, dataStorage, peerManager, listener); + public SeedNodeRequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager) { + super(networkNode, dataStorage, peerManager); } @Override public void onPeerAuthenticated(Address peerAddress, Connection connection) { //TODO not clear which use case is handles here... if (dataStorage.getMap().isEmpty()) { - UserThread.runAfterRandomDelay(() + if (requestDataFromAuthenticatedSeedNodeTimer == null) + requestDataFromAuthenticatedSeedNodeTimer = UserThread.runAfterRandomDelay(() -> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 2, 5, TimeUnit.SECONDS); } super.onPeerAuthenticated(peerAddress, connection); diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index 01288b08e4..6357c4a464 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -173,7 +173,7 @@ public class P2PDataStorage implements MessageListener { rePublish = true; sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber); - storage.queueUpForSave(sequenceNumberMap); + storage.queueUpForSave(sequenceNumberMap, 5000); StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n"); sb.append("Data set after addProtectedExpirableData:"); @@ -210,7 +210,7 @@ public class P2PDataStorage implements MessageListener { broadcast(new RemoveDataMessage(protectedData), sender); sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber); - storage.queueUpForSave(sequenceNumberMap); + storage.queueUpForSave(sequenceNumberMap, 5000); } else { log.debug("remove failed"); } @@ -235,7 +235,7 @@ public class P2PDataStorage implements MessageListener { broadcast(new RemoveMailboxDataMessage(protectedMailboxData), sender); sequenceNumberMap.put(hashOfData, protectedMailboxData.sequenceNumber); - storage.queueUpForSave(sequenceNumberMap); + storage.queueUpForSave(sequenceNumberMap, 5000); } else { log.debug("removeMailboxData failed"); } diff --git a/network/src/test/java/io/bitsquare/p2p/TestUtils.java b/network/src/test/java/io/bitsquare/p2p/TestUtils.java index 04edf18fbe..07e5ad7d1a 100644 --- a/network/src/test/java/io/bitsquare/p2p/TestUtils.java +++ b/network/src/test/java/io/bitsquare/p2p/TestUtils.java @@ -90,6 +90,10 @@ public class TestUtils { public void onNoSeedNodeAvailable() { } + @Override + public void onNoPeersAvailable() { + } + @Override public void onFirstPeerAuthenticated() { } @@ -136,8 +140,11 @@ public class TestUtils { } @Override - public void onTorNodeReady() { + public void onNoPeersAvailable() { + } + @Override + public void onTorNodeReady() { } @Override @@ -147,7 +154,6 @@ public class TestUtils { @Override public void onHiddenServicePublished() { - } @Override diff --git a/network/src/test/java/io/bitsquare/p2p/routing/PeerManagerTest.java b/network/src/test/java/io/bitsquare/p2p/routing/PeerManagerTest.java index cfd34bf95d..0362cf29fb 100644 --- a/network/src/test/java/io/bitsquare/p2p/routing/PeerManagerTest.java +++ b/network/src/test/java/io/bitsquare/p2p/routing/PeerManagerTest.java @@ -95,6 +95,10 @@ public class PeerManagerTest { public void onNoSeedNodeAvailable() { } + @Override + public void onNoPeersAvailable() { + } + @Override public void onFirstPeerAuthenticated() { } @@ -140,7 +144,10 @@ public class PeerManagerTest { @Override public void onTorNodeReady() { + } + @Override + public void onNoPeersAvailable() { } @Override @@ -177,6 +184,10 @@ public class PeerManagerTest { public void onTorNodeReady() { } + @Override + public void onNoPeersAvailable() { + } + @Override public void onFirstPeerAuthenticated() { latch.countDown(); @@ -410,6 +421,10 @@ public class PeerManagerTest { public void onTorNodeReady() { } + @Override + public void onNoPeersAvailable() { + } + @Override public void onFirstPeerAuthenticated() { } diff --git a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java index bb21a89181..b3ecc00f71 100644 --- a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java +++ b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java @@ -65,7 +65,7 @@ public class ProtectedDataStorageTest { storageSignatureKeyPair1 = keyRing1.getSignatureKeyPair(); encryptionService1 = new EncryptionService(keyRing1); networkNode1 = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getSeedNodeP2PService().getNetworkNode(); - peerManager1 = new PeerManager(networkNode1); + peerManager1 = new PeerManager(networkNode1, new File("dummy")); dataStorage1 = new P2PDataStorage(peerManager1, networkNode1, new File("dummy")); // for mailbox