From 991a4350ac5c639a750779fa496e8bb3aefe71b8 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Tue, 26 Jan 2016 16:37:08 +0100 Subject: [PATCH] P2P network / UI improvements --- .../bitsquare/common/crypto/Encryption.java | 1 + .../btc/AddressBasedCoinSelector.java | 1 - .../java/io/bitsquare/app/BitsquareApp.java | 2 +- .../gui/components/NetworkSyncPane.java | 74 ----- .../io/bitsquare/gui/main/MainViewModel.java | 4 +- .../EnterPrivKeyPopup.java | 1 - .../funds/transactions/TransactionsView.java | 3 - .../main/funds/withdrawal/WithdrawalView.java | 6 +- .../main/offer/takeoffer/TakeOfferView.java | 7 - .../offer/takeoffer/TakeOfferViewModel.java | 31 +- .../io/bitsquare/gui/util/BSFormatter.java | 8 - ...dSignedMessage.java => DirectMessage.java} | 9 +- .../java/io/bitsquare/p2p/P2PService.java | 121 +++---- .../io/bitsquare/p2p/network/Connection.java | 203 +++++++----- .../bitsquare/p2p/network/IllegalRequest.java | 1 - .../p2p/network/InboundConnection.java | 9 + .../io/bitsquare/p2p/network/NetworkNode.java | 53 +-- .../p2p/network/OutboundConnection.java | 11 + .../java/io/bitsquare/p2p/network/Server.java | 2 +- .../network/messages/AnonymousMessage.java | 6 + .../messages/SendersNodeAddressMessage.java | 8 + .../io/bitsquare/p2p/peers/Broadcaster.java | 20 +- .../p2p/peers/PeerExchangeManager.java | 305 ++++++++---------- .../io/bitsquare/p2p/peers/PeerManager.java | 103 +++--- .../p2p/peers/RequestDataManager.java | 285 +++++++--------- .../p2p/peers/messages/data/DataRequest.java | 19 +- .../p2p/peers/messages/data/DataResponse.java | 14 +- .../messages/data/PreliminaryDataRequest.java | 26 ++ .../peers/messages/peers/GetPeersRequest.java | 13 +- .../storage/data/ExpirableMailboxPayload.java | 14 +- .../crypto/EncryptionServiceTests.java | 4 +- .../java/io/bitsquare/p2p/P2PServiceTest.java | 8 +- .../p2p/storage/ProtectedDataStorageTest.java | 8 +- 33 files changed, 651 insertions(+), 729 deletions(-) delete mode 100644 gui/src/main/java/io/bitsquare/gui/components/NetworkSyncPane.java rename network/src/main/java/io/bitsquare/crypto/{SealedAndSignedMessage.java => DirectMessage.java} (73%) create mode 100644 network/src/main/java/io/bitsquare/p2p/network/InboundConnection.java create mode 100644 network/src/main/java/io/bitsquare/p2p/network/OutboundConnection.java create mode 100644 network/src/main/java/io/bitsquare/p2p/network/messages/AnonymousMessage.java create mode 100644 network/src/main/java/io/bitsquare/p2p/network/messages/SendersNodeAddressMessage.java create mode 100644 network/src/main/java/io/bitsquare/p2p/peers/messages/data/PreliminaryDataRequest.java diff --git a/common/src/main/java/io/bitsquare/common/crypto/Encryption.java b/common/src/main/java/io/bitsquare/common/crypto/Encryption.java index 2f01359ec2..df4f81b759 100644 --- a/common/src/main/java/io/bitsquare/common/crypto/Encryption.java +++ b/common/src/main/java/io/bitsquare/common/crypto/Encryption.java @@ -34,6 +34,7 @@ import java.security.*; import java.util.Arrays; // TODO: which counter modes and paddings should we use? +// TODO is Hmac needed/make sense? // https://security.stackexchange.com/questions/52665/which-is-the-best-cipher-mode-and-padding-mode-for-aes-encryption public class Encryption { private static final Logger log = LoggerFactory.getLogger(Encryption.class); diff --git a/core/src/main/java/io/bitsquare/btc/AddressBasedCoinSelector.java b/core/src/main/java/io/bitsquare/btc/AddressBasedCoinSelector.java index 0612d25cb6..30f49ec79e 100644 --- a/core/src/main/java/io/bitsquare/btc/AddressBasedCoinSelector.java +++ b/core/src/main/java/io/bitsquare/btc/AddressBasedCoinSelector.java @@ -114,7 +114,6 @@ class AddressBasedCoinSelector implements CoinSelector { log.trace("value needed: " + targetAsLong); HashSet selected = new HashSet<>(); // Sort the inputs by age*value so we get the highest "coindays" spent. - // TODO: Consider changing the wallets internal format to track just outputs and keep them ordered. ArrayList sortedOutputs = new ArrayList<>(candidates); // When calculating the wallet balance, we may be asked to select all possible coins, if so, avoid sorting // them in order to improve performance. diff --git a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java index 84c7bd6d81..a935b6c023 100644 --- a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java +++ b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java @@ -251,7 +251,7 @@ public class BitsquareApp extends Application { } } - //TODO just temp. + // Used for debugging trade process private void showDebugWindow() { ViewLoader viewLoader = injector.getInstance(ViewLoader.class); View debugView = viewLoader.load(DebugView.class); diff --git a/gui/src/main/java/io/bitsquare/gui/components/NetworkSyncPane.java b/gui/src/main/java/io/bitsquare/gui/components/NetworkSyncPane.java deleted file mode 100644 index b71db4ba8d..0000000000 --- a/gui/src/main/java/io/bitsquare/gui/components/NetworkSyncPane.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * This file is part of Bitsquare. - * - * Bitsquare is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bitsquare is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bitsquare. If not, see . - */ - -package io.bitsquare.gui.components; - -import javafx.animation.FadeTransition; -import javafx.animation.Interpolator; -import javafx.scene.control.Label; -import javafx.scene.control.ProgressBar; -import javafx.scene.layout.HBox; -import javafx.scene.layout.Pane; -import javafx.util.Duration; - -// TODO replace with new notification component from lighthouse/bitcoinJ - -public class NetworkSyncPane extends HBox { - - private final ProgressBar networkSyncProgressBar; - - private final Label networkSyncInfoLabel; - - public NetworkSyncPane() { - networkSyncInfoLabel = new Label(); - networkSyncInfoLabel.setText("Synchronize with network..."); - networkSyncProgressBar = new ProgressBar(); - networkSyncProgressBar.setPrefWidth(200); - networkSyncProgressBar.setProgress(-1); - - getChildren().addAll(new HSpacer(5), networkSyncProgressBar, networkSyncInfoLabel); - } - - public void setProgress(double percent) { - networkSyncProgressBar.setProgress(percent / 100.0); - networkSyncInfoLabel.setText("Synchronize with network: " + (int) percent + "%"); - } - - public void downloadComplete() { - networkSyncInfoLabel.setText("Sync with network: Done"); - networkSyncProgressBar.setProgress(1); - - FadeTransition fade = new FadeTransition(Duration.millis(700), this); - fade.setToValue(0.0); - fade.setCycleCount(1); - fade.setInterpolator(Interpolator.EASE_BOTH); - fade.play(); - fade.setOnFinished(e -> getChildren().clear()); - } -} - -class HSpacer extends Pane { - public HSpacer(double width) { - setPrefWidth(width); - } - - @Override - protected double computePrefWidth(double width) { - return getPrefWidth(); - } -} - diff --git a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java index e3d56795ff..9e92f3404f 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java @@ -270,7 +270,7 @@ public class MainViewModel implements ViewModel { "Maybe you lost your internet connection or your computer was in hibernate/sleep mode."); else walletServiceErrorMsg.set(null); - }, 2); + }, 5); } else if ((int) oldValue == 0 && (int) newValue > 0) { walletServiceErrorMsg.set(null); } @@ -399,7 +399,7 @@ public class MainViewModel implements ViewModel { p2PNetworkWarnMsg.set(null); p2PNetworkLabelId.set("footer-pane"); } - }, 2); + }, 5); } else if ((int) oldValue == 0 && (int) newValue > 0) { p2PNetworkWarnMsg.set(null); p2PNetworkLabelId.set("footer-pane"); diff --git a/gui/src/main/java/io/bitsquare/gui/main/account/arbitratorregistration/EnterPrivKeyPopup.java b/gui/src/main/java/io/bitsquare/gui/main/account/arbitratorregistration/EnterPrivKeyPopup.java index 35d6c24319..e0b1a48cb1 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/account/arbitratorregistration/EnterPrivKeyPopup.java +++ b/gui/src/main/java/io/bitsquare/gui/main/account/arbitratorregistration/EnterPrivKeyPopup.java @@ -90,7 +90,6 @@ public class EnterPrivKeyPopup extends Popup { GridPane.setRowIndex(label, ++rowIndex); keyInputTextField = new InputTextField(); - //TODO change when testing is done if (BitsquareApp.DEV_MODE) keyInputTextField.setText("6ac43ea1df2a290c1c8391736aa42e4339c5cb4f110ff0257a13b63211977b7a"); GridPane.setMargin(keyInputTextField, new Insets(3, 0, 0, 0)); diff --git a/gui/src/main/java/io/bitsquare/gui/main/funds/transactions/TransactionsView.java b/gui/src/main/java/io/bitsquare/gui/main/funds/transactions/TransactionsView.java index 10757897d3..e29eabbdf6 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/funds/transactions/TransactionsView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/funds/transactions/TransactionsView.java @@ -128,9 +128,6 @@ public class TransactionsView extends ActivatableView { } private void openTxDetails(TransactionsListItem item) { - // TODO Open popup with details view - log.debug("openTxDetails " + item); - if (!item.isNotAnAddress()) { try { Utilities.openWebPage(preferences.getBlockChainExplorer().addressUrl + item.getAddressString()); diff --git a/gui/src/main/java/io/bitsquare/gui/main/funds/withdrawal/WithdrawalView.java b/gui/src/main/java/io/bitsquare/gui/main/funds/withdrawal/WithdrawalView.java index dc4f2b9969..6e7a9379d9 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/funds/withdrawal/WithdrawalView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/funds/withdrawal/WithdrawalView.java @@ -126,9 +126,6 @@ public class WithdrawalView extends ActivatableView { } private void openTxDetails(WithdrawalListItem item) { - // TODO Open popup with details view - log.debug("openTxDetails " + item); - try { Utilities.openWebPage(preferences.getBlockChainExplorer().addressUrl + item.getAddressString()); } catch (Exception e) { @@ -241,8 +238,7 @@ public class WithdrawalView extends ActivatableView { withdrawFromTextField.setPromptText("Select a source address from the table"); amountTextField.setText(""); amountTextField.setPromptText(""); - if (!BitsquareApp.DEV_MODE) - withdrawToTextField.setText(""); + withdrawToTextField.setText(""); withdrawToTextField.setPromptText(""); } diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java index 19ba3005fd..b441f077f5 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferView.java @@ -67,13 +67,9 @@ import static javafx.beans.binding.Bindings.createStringBinding; // priceAmountHBox is too large after redesign as to be used as layoutReference. @FxmlView public class TakeOfferView extends ActivatableViewAndModel { - - // TODO convert unneeded properties to static fields - private final Navigation navigation; private final BSFormatter formatter; private final OfferDetailsPopup offerDetailsPopup; - private ScrollPane scrollPane; private GridPane gridPane; private ImageView imageView; @@ -89,11 +85,8 @@ public class TakeOfferView extends ActivatableViewAndModel amountFocusedListener; - private int gridRow = 0; private ComboBox paymentAccountsComboBox; private Label paymentAccountsLabel; diff --git a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferViewModel.java index 0a25ea5775..f1bbf9cb6d 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/offer/takeoffer/TakeOfferViewModel.java @@ -26,6 +26,9 @@ import io.bitsquare.gui.util.validation.BtcValidator; import io.bitsquare.gui.util.validation.InputValidator; import io.bitsquare.locale.BSResources; import io.bitsquare.locale.TradeCurrency; +import io.bitsquare.p2p.P2PService; +import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.ConnectionListener; import io.bitsquare.payment.PaymentAccount; import io.bitsquare.payment.PaymentMethod; import io.bitsquare.trade.Trade; @@ -44,10 +47,9 @@ import static javafx.beans.binding.Bindings.createStringBinding; class TakeOfferViewModel extends ActivatableWithDataModel implements ViewModel { private final BtcValidator btcValidator; + private P2PService p2PService; private final BSFormatter formatter; - // static fields - private String amountRange; private String addressAsString; private String paymentLabel; @@ -58,8 +60,6 @@ class TakeOfferViewModel extends ActivatableWithDataModel im private String directionLabel; private String amountDescription; - // TODO convert unneeded properties to static fields - // dynamic fields final StringProperty amount = new SimpleStringProperty(); final StringProperty volume = new SimpleStringProperty(); final StringProperty volumeDescriptionLabel = new SimpleStringProperty(); @@ -87,6 +87,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel im private ChangeListener tradeErrorListener; private ChangeListener offerStateListener; private ChangeListener offerErrorListener; + private ConnectionListener connectionListener; /////////////////////////////////////////////////////////////////////////////////////////// @@ -94,11 +95,12 @@ class TakeOfferViewModel extends ActivatableWithDataModel im /////////////////////////////////////////////////////////////////////////////////////////// @Inject - public TakeOfferViewModel(TakeOfferDataModel dataModel, BtcValidator btcValidator, + public TakeOfferViewModel(TakeOfferDataModel dataModel, BtcValidator btcValidator, P2PService p2PService, BSFormatter formatter) { super(dataModel); this.btcValidator = btcValidator; + this.p2PService = p2PService; this.formatter = formatter; createListeners(); @@ -390,6 +392,22 @@ class TakeOfferViewModel extends ActivatableWithDataModel im tradeStateListener = (ov, oldValue, newValue) -> applyTradeState(newValue); tradeErrorListener = (ov, oldValue, newValue) -> applyTradeErrorMessage(newValue); offerStateListener = (ov, oldValue, newValue) -> applyOfferState(newValue); + connectionListener = new ConnectionListener() { + @Override + public void onDisconnect(Reason reason, Connection connection) { + if (connection.getPeersNodeAddressOptional().isPresent() && + connection.getPeersNodeAddressOptional().get().equals(offer.getOffererNodeAddress())) + offerWarning.set("You cannot take that offer because the offerer went offline."); + } + + @Override + public void onConnection(Connection connection) { + } + + @Override + public void onError(Throwable throwable) { + } + }; } private void addListeners() { @@ -401,7 +419,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel im dataModel.amountAsCoin.addListener(amountAsCoinListener); dataModel.isWalletFunded.addListener(isWalletFundedListener); - + p2PService.getNetworkNode().addConnectionListener(connectionListener); } private void removeListeners() { @@ -420,6 +438,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel im trade.stateProperty().removeListener(tradeStateListener); trade.errorMessageProperty().removeListener(tradeErrorListener); } + p2PService.getNetworkNode().removeConnectionListener(connectionListener); } diff --git a/gui/src/main/java/io/bitsquare/gui/util/BSFormatter.java b/gui/src/main/java/io/bitsquare/gui/util/BSFormatter.java index 84a199cab3..52296d8dc2 100644 --- a/gui/src/main/java/io/bitsquare/gui/util/BSFormatter.java +++ b/gui/src/main/java/io/bitsquare/gui/util/BSFormatter.java @@ -38,14 +38,6 @@ import java.util.List; import java.util.Locale; import java.util.stream.Collectors; -//TODO convert to non static - -/** - * Central point for formatting and input parsing. - *

- * Note that we never use for text input values any coin or currency symbol or code. - * BtcFormat does not support - */ public class BSFormatter { private static final Logger log = LoggerFactory.getLogger(BSFormatter.class); diff --git a/network/src/main/java/io/bitsquare/crypto/SealedAndSignedMessage.java b/network/src/main/java/io/bitsquare/crypto/DirectMessage.java similarity index 73% rename from network/src/main/java/io/bitsquare/crypto/SealedAndSignedMessage.java rename to network/src/main/java/io/bitsquare/crypto/DirectMessage.java index 6d86d73526..12f6ce875c 100644 --- a/network/src/main/java/io/bitsquare/crypto/SealedAndSignedMessage.java +++ b/network/src/main/java/io/bitsquare/crypto/DirectMessage.java @@ -4,25 +4,28 @@ import io.bitsquare.app.Version; import io.bitsquare.common.crypto.SealedAndSigned; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.messaging.MailboxMessage; +import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage; import java.util.Arrays; -public final class SealedAndSignedMessage implements MailboxMessage { +public final class DirectMessage implements MailboxMessage, SendersNodeAddressMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private final int networkId = Version.getNetworkId(); + private final NodeAddress senderNodeAddress; public final SealedAndSigned sealedAndSigned; public final byte[] addressPrefixHash; - public SealedAndSignedMessage(SealedAndSigned sealedAndSigned, byte[] addressPrefixHash) { + public DirectMessage(NodeAddress senderNodeAddress, SealedAndSigned sealedAndSigned, byte[] addressPrefixHash) { + this.senderNodeAddress = senderNodeAddress; this.sealedAndSigned = sealedAndSigned; this.addressPrefixHash = addressPrefixHash; } @Override public NodeAddress getSenderNodeAddress() { - return null; + return senderNodeAddress; } @Override diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 0932ecb3e9..884331deff 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -12,8 +12,8 @@ import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.CryptoException; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.crypto.PubKeyRing; +import io.bitsquare.crypto.DirectMessage; import io.bitsquare.crypto.EncryptionService; -import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.messaging.*; import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.peers.Broadcaster; @@ -74,7 +74,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private volatile boolean shutDownInProgress; private boolean shutDownComplete; - private ChangeListener stateChangeListener; + private ChangeListener connectionNodeAddressListener; private Subscription networkReadySubscription; @@ -99,19 +99,19 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService); optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing); - init(useLocalhost, networkId, storageDir); } protected void init(boolean useLocalhost, int networkId, File storageDir) { Log.traceCall(); - stateChangeListener = (observable, oldValue, newValue) -> { - Set nodeAddressesOfAllSucceededConnections = networkNode.getNodeAddressesOfSucceededConnections(); - Set allSucceededConnections = networkNode.getSucceededConnections(); - log.info("List of peers (duplicates possible of inbound/outbound with same node address)\n" + nodeAddressesOfAllSucceededConnections); - log.info("Nr of connections: {} / {}", nodeAddressesOfAllSucceededConnections.size(), allSucceededConnections.size()); - UserThread.execute(() -> numConnectedPeers.set(nodeAddressesOfAllSucceededConnections.size())); + connectionNodeAddressListener = (observable, oldValue, newValue) -> { + Set nodeAddressesOfConfirmedConnections = networkNode.getNodeAddressesOfConfirmedConnections(); + Set allConfirmedConnections = networkNode.getConfirmedConnections(); + log.info("nodeAddressesOfConfirmedConnections=" + nodeAddressesOfConfirmedConnections); + log.info("allConfirmedConnections=" + allConfirmedConnections); + log.info("Nr of connections: {} / {} (nodeAddressesOfConfirmedConnections / allConfirmedConnections)", nodeAddressesOfConfirmedConnections.size(), allConfirmedConnections.size()); + UserThread.execute(() -> numConnectedPeers.set(nodeAddressesOfConfirmedConnections.size())); }; networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir); @@ -126,8 +126,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis Set seedNodeAddresses = seedNodesRepository.getSeedNodeAddresses(useLocalhost, networkId); peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir); - requestDataManager = new RequestDataManager(networkNode, p2PDataStorage, peerManager, seedNodeAddresses); - requestDataManager.setRequestDataManagerListener(this); + requestDataManager = new RequestDataManager(networkNode, p2PDataStorage, peerManager, seedNodeAddresses, this); peerExchangeManager = new PeerExchangeManager(networkNode, peerManager, seedNodeAddresses); @@ -211,7 +210,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onTorNodeReady() { Log.traceCall(); - // 1 + requestDataManager.requestPreliminaryData(); p2pServiceListeners.stream().forEach(SetupListener::onTorNodeReady); } @@ -219,7 +218,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onHiddenServicePublished() { Log.traceCall(); - // 3 + checkArgument(networkNode.getNodeAddress() != null, "Address must be set when we have the hidden service ready"); hiddenServicePublished.set(true); @@ -239,14 +238,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis Log.traceCall(); networkReadySubscription.unsubscribe(); - Optional seedNodeOfPreliminaryDataRequest = requestDataManager.getSeedNodeOfPreliminaryDataRequest(); + Optional seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest(); checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present"); - // 4 - requestDataManager.updateDataFromConnectedSeedNode(); - // 5 - peerExchangeManager.getReportedPeersFromFirstSeedNode(seedNodeOfPreliminaryDataRequest.get()); + requestDataManager.requestUpdatesData(); } /////////////////////////////////////////////////////////////////////////////////////////// @@ -256,13 +252,17 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onPreliminaryDataReceived() { checkArgument(!preliminaryDataReceived.get(), "preliminaryDataReceived was already set before."); - // 2 + preliminaryDataReceived.set(true); } @Override - public void onDataUpdate() { - // Result from requestDataManager.updateDataFromConnectedSeedNode(); + public void onUpdatedDataReceived() { + Optional seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest(); + checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(), + "seedNodeOfPreliminaryDataRequest must be present"); + peerExchangeManager.requestReportedPeers(seedNodeOfPreliminaryDataRequest.get()); + p2pServiceListeners.stream().forEach(P2PServiceListener::onBootstrapped); } @@ -288,13 +288,24 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onConnection(Connection connection) { - connection.getStateProperty().addListener(stateChangeListener); + if (connection.getPeersNodeAddressOptional().isPresent()) { + connectionNodeAddressListener.changed(connection.getNodeAddressProperty(), null, + connection.getNodeAddressProperty().get()); + } else { + connection.getNodeAddressProperty().addListener(connectionNodeAddressListener); + } } @Override public void onDisconnect(Reason reason, Connection connection) { - connection.getStateProperty().removeListener(stateChangeListener); - UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getNodeAddressesOfSucceededConnections().size()), 1); + Log.traceCall(); + connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener); + // We removed the listener after a delay to be sure the connection has been removed + // from the networkNode already. + UserThread.runAfter(() -> + connectionNodeAddressListener.changed(connection.getNodeAddressProperty(), null, + connection.getNodeAddressProperty().get()) + , 1); } @Override @@ -308,19 +319,19 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis @Override public void onMessage(Message message, Connection connection) { - if (message instanceof SealedAndSignedMessage) { + if (message instanceof DirectMessage) { Log.traceCall(message.toString()); // Seed nodes don't have set the encryptionService if (optionalEncryptionService.isPresent()) { try { - SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message; - if (verifyAddressPrefixHash(sealedAndSignedMessage)) { - DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify( - sealedAndSignedMessage.sealedAndSigned); - + DirectMessage directMessage = (DirectMessage) message; + if (verifyAddressPrefixHash(directMessage)) { // We set connectionType to that connection to avoid that is get closed when // we get too many connection attempts. - connection.setPeerType(Connection.PeerType.PEER.DIRECT_MSG_PEER); + connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER); + + DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify( + directMessage.sealedAndSigned); log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" + "Decrypted SealedAndSignedMessage:\ndecryptedMsgWithPubKey={}" @@ -370,27 +381,24 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } } - private void doSendEncryptedMailMessage(NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, MailMessage message, + private void doSendEncryptedMailMessage(@NotNull NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, MailMessage message, SendMailMessageListener sendMailMessageListener) { Log.traceCall(); + checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at doSendEncryptedMailMessage"); checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen."); + checkNotNull(networkNode.getNodeAddress(), "networkNode.getNodeAddress() must not be null."); try { log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" + "Encrypt message:\nmessage={}" + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", message); - SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( - optionalEncryptionService.get().encryptAndSign(pubKeyRing, message), peersNodeAddress.getAddressPrefixHash()); - SettableFuture future = networkNode.sendMessage(peersNodeAddress, sealedAndSignedMessage); + DirectMessage directMessage = new DirectMessage(networkNode.getNodeAddress(), + optionalEncryptionService.get().encryptAndSign(pubKeyRing, message), + peersNodeAddress.getAddressPrefixHash()); + SettableFuture future = networkNode.sendMessage(peersNodeAddress, directMessage); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { sendMailMessageListener.onArrived(); - if (connection != null) { - if (!connection.getPeersNodeAddressOptional().isPresent() && peersNodeAddress != null) - connection.setPeersNodeAddress(peersNodeAddress); - - connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER); - } } @Override @@ -417,11 +425,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis ExpirablePayload expirablePayload = mailboxData.expirablePayload; if (expirablePayload instanceof ExpirableMailboxPayload) { ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) expirablePayload; - SealedAndSignedMessage sealedAndSignedMessage = expirableMailboxPayload.sealedAndSignedMessage; - if (verifyAddressPrefixHash(sealedAndSignedMessage)) { + DirectMessage directMessage = expirableMailboxPayload.directMessage; + if (verifyAddressPrefixHash(directMessage)) { try { DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify( - sealedAndSignedMessage.sealedAndSigned); + directMessage.sealedAndSigned); if (decryptedMsgWithPubKey.message instanceof MailboxMessage) { MailboxMessage mailboxMessage = (MailboxMessage) decryptedMsgWithPubKey.message; NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress(); @@ -465,27 +473,24 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private void trySendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing, MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) { Log.traceCall(); + checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at trySendEncryptedMailboxMessage"); checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen."); + checkNotNull(networkNode.getNodeAddress(), "networkNode.getNodeAddress() must not be null."); try { log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" + "Encrypt message:\nmessage={}" + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", message); - SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( - optionalEncryptionService.get().encryptAndSign(peersPubKeyRing, message), peersNodeAddress.getAddressPrefixHash()); - SettableFuture future = networkNode.sendMessage(peersNodeAddress, sealedAndSignedMessage); + DirectMessage directMessage = new DirectMessage( + networkNode.getNodeAddress(), + optionalEncryptionService.get().encryptAndSign(peersPubKeyRing, message), + peersNodeAddress.getAddressPrefixHash()); + SettableFuture future = networkNode.sendMessage(peersNodeAddress, directMessage); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { log.trace("SendEncryptedMailboxMessage onSuccess"); sendMailboxMessageListener.onArrived(); - - if (connection != null) { - if (!connection.getPeersNodeAddressOptional().isPresent() && peersNodeAddress != null) - connection.setPeersNodeAddress(peersNodeAddress); - - connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER); - } } @Override @@ -495,7 +500,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox."); log.trace("create MailboxEntry with peerAddress " + peersNodeAddress); PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey(); - addMailboxData(new ExpirableMailboxPayload(sealedAndSignedMessage, + addMailboxData(new ExpirableMailboxPayload(directMessage, optionalKeyRing.get().getSignatureKeyPair().getPublic(), receiverStoragePublicKey), receiverStoragePublicKey); @@ -664,7 +669,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } public Set getNodeAddressesOfConnectedPeers() { - return networkNode.getNodeAddressesOfSucceededConnections(); + return networkNode.getNodeAddressesOfConfirmedConnections(); } public ReadOnlyIntegerProperty getNumConnectedPeers() { @@ -680,11 +685,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // Private /////////////////////////////////////////////////////////////////////////////////////////// - private boolean verifyAddressPrefixHash(SealedAndSignedMessage sealedAndSignedMessage) { + private boolean verifyAddressPrefixHash(DirectMessage directMessage) { if (networkNode.getNodeAddress() != null) { byte[] blurredAddressHash = networkNode.getNodeAddress().getAddressPrefixHash(); return blurredAddressHash != null && - Arrays.equals(blurredAddressHash, sealedAndSignedMessage.addressPrefixHash); + Arrays.equals(blurredAddressHash, directMessage.addressPrefixHash); } else { log.debug("myOnionAddress is null at verifyAddressPrefixHash. That is expected at startup."); return false; diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 67d12dba59..5b26977da9 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -6,10 +6,12 @@ import io.bitsquare.app.Log; import io.bitsquare.app.Version; import io.bitsquare.common.ByteArrayUtils; import io.bitsquare.common.UserThread; +import io.bitsquare.crypto.DirectMessage; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.Utils; import io.bitsquare.p2p.network.messages.CloseConnectionMessage; +import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage; import javafx.beans.property.ObjectProperty; import javafx.beans.property.ReadOnlyObjectProperty; import javafx.beans.property.SimpleObjectProperty; @@ -26,12 +28,12 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.*; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; /** * Connection is created by the server thread or by sendMessage from NetworkNode. * All handlers are called on User thread. - * Shared data between InputHandler thread and that */ public class Connection implements MessageListener { private static final Logger log = LoggerFactory.getLogger(Connection.class); @@ -47,17 +49,6 @@ public class Connection implements MessageListener { // Enums /////////////////////////////////////////////////////////////////////////////////////////// - public enum Direction { - OUTBOUND, - INBOUND - } - - public enum State { - IDLE, - SUCCEEDED, - FAILED - } - public enum PeerType { SEED_NODE, PEER, @@ -72,12 +63,11 @@ public class Connection implements MessageListener { private final Socket socket; private final MessageListener messageListener; private final ConnectionListener connectionListener; - private final Direction direction; private final String portInfo; private final String uid = UUID.randomUUID().toString(); private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); // holder of state shared between InputHandler and Connection - private final SharedSpace sharedSpace; + private final SharedModel sharedModel; // set in init private InputHandler inputHandler; @@ -91,35 +81,32 @@ public class Connection implements MessageListener { // java.util.zip.DataFormatException: invalid literal/lengths set // use GZIPInputStream but problems with blocking private final boolean useCompression = false; - - - private final ObjectProperty stateProperty = new SimpleObjectProperty<>(); private PeerType peerType; + private ObjectProperty nodeAddressProperty = new SimpleObjectProperty<>(); /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, Direction direction) { + public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, + @Nullable NodeAddress peersNodeAddress) { + Log.traceCall(); this.socket = socket; this.messageListener = messageListener; this.connectionListener = connectionListener; - this.direction = direction; - stateProperty.set(State.IDLE); - sharedSpace = new SharedSpace(this, socket); + sharedModel = new SharedModel(this, socket); - Log.traceCall(); if (socket.getLocalPort() == 0) portInfo = "port=" + socket.getPort(); else portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort(); - init(); + init(peersNodeAddress); } - private void init() { + private void init(@Nullable NodeAddress peersNodeAddress) { Log.traceCall(); try { @@ -134,15 +121,22 @@ public class Connection implements MessageListener { // We create a thread for handling inputStream data - inputHandler = new InputHandler(sharedSpace, objectInputStream, portInfo, this, useCompression); + inputHandler = new InputHandler(sharedModel, objectInputStream, portInfo, this, useCompression); singleThreadExecutor.submit(inputHandler); } catch (IOException e) { - sharedSpace.handleConnectionException(e); + sharedModel.handleConnectionException(e); } - sharedSpace.updateLastActivityDate(); + sharedModel.updateLastActivityDate(); + + // Use Peer as default, in case of other types they will set it as soon as possible. + peerType = PeerType.PEER; + + if (peersNodeAddress != null) + setPeersNodeAddress(peersNodeAddress); log.trace("\nNew connection created " + this.toString()); + UserThread.execute(() -> connectionListener.onConnection(this)); } @@ -158,9 +152,20 @@ public class Connection implements MessageListener { if (!stopped) { try { String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null"; - log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" + - "Write object to outputStream to peer: {} (uid={})\nmessage={}" - + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", peersNodeAddress, uid, message); + if (message instanceof DirectMessage && peersNodeAddressOptional.isPresent()) { + setPeerType(Connection.PeerType.DIRECT_MSG_PEER); + + log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" + + "Sending direct message to peer" + + "Write object to outputStream to peer: {} (uid={})\nmessage={}" + + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", + peersNodeAddress, uid, message); + } else { + log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" + + "Write object to outputStream to peer: {} (uid={})\nmessage={}" + + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", + peersNodeAddress, uid, message); + } Object objectToWrite; if (useCompression) { @@ -178,11 +183,11 @@ public class Connection implements MessageListener { objectOutputStream.writeObject(objectToWrite); objectOutputStream.flush(); } - sharedSpace.updateLastActivityDate(); + sharedModel.updateLastActivityDate(); } } catch (IOException e) { // an exception lead to a shutdown - sharedSpace.handleConnectionException(e); + sharedModel.handleConnectionException(e); } } else { log.debug("called sendMessage but was already stopped"); @@ -191,7 +196,7 @@ public class Connection implements MessageListener { public void reportIllegalRequest(IllegalRequest illegalRequest) { Log.traceCall(); - sharedSpace.reportIllegalRequest(illegalRequest); + sharedModel.reportIllegalRequest(illegalRequest); } @@ -216,25 +221,21 @@ public class Connection implements MessageListener { this.peerType = peerType; } - public void setState(State state) { - Log.traceCall(state.toString()); + public synchronized void setPeersNodeAddress(NodeAddress peerNodeAddress) { + Log.traceCall(peerNodeAddress.toString()); + checkNotNull(peerNodeAddress, "peerAddress must not be null"); + peersNodeAddressOptional = Optional.of(peerNodeAddress); - if (state == State.SUCCEEDED) { - String peersNodeAddress = getPeersNodeAddressOptional().isPresent() ? getPeersNodeAddressOptional().get().getFullAddress() : ""; + String peersNodeAddress = getPeersNodeAddressOptional().isPresent() ? getPeersNodeAddressOptional().get().getFullAddress() : ""; + if (this instanceof InboundConnection) { log.info("\n\n############################################################\n" + - "We are successfully connected to:\n" + - "peerAddress= " + peersNodeAddress + + "We got the peers node address set.\n" + + "peersNodeAddress= " + peersNodeAddress + "\nuid=" + getUid() + "\n############################################################\n"); } - this.stateProperty.set(state); - } - - public synchronized void setPeersNodeAddress(NodeAddress peerNodeAddress) { - Log.traceCall(); - checkNotNull(peerNodeAddress, "peerAddress must not be null"); - peersNodeAddressOptional = Optional.of(peerNodeAddress); + nodeAddressProperty.set(peerNodeAddress); } @@ -247,13 +248,17 @@ public class Connection implements MessageListener { } public Date getLastActivityDate() { - return sharedSpace.getLastActivityDate(); + return sharedModel.getLastActivityDate(); } public String getUid() { return uid; } + public boolean hasPeersNodeAddress() { + return peersNodeAddressOptional.isPresent(); + } + public boolean isStopped() { return stopped; } @@ -262,16 +267,8 @@ public class Connection implements MessageListener { return peerType; } - public Direction getDirection() { - return direction; - } - - public ReadOnlyObjectProperty getStateProperty() { - return stateProperty; - } - - public State getState() { - return stateProperty.get(); + public ReadOnlyObjectProperty getNodeAddressProperty() { + return nodeAddressProperty; } @@ -298,8 +295,8 @@ public class Connection implements MessageListener { log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "ShutDown connection:" + "\npeersNodeAddress=" + peersNodeAddress - + "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort() - + "/" + sharedSpace.getSocket().getPort() + + "\nlocalPort/port=" + sharedModel.getSocket().getLocalPort() + + "/" + sharedModel.getSocket().getPort() + "\nuid=" + uid + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); @@ -330,14 +327,14 @@ public class Connection implements MessageListener { private void setStopFlags() { stopped = true; - sharedSpace.stop(); + sharedModel.stop(); if (inputHandler != null) inputHandler.stop(); } private void doShutDown(@Nullable Runnable shutDownCompleteHandler) { Log.traceCall(); - ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason(); + ConnectionListener.Reason shutDownReason = sharedModel.getShutDownReason(); if (shutDownReason == null) shutDownReason = ConnectionListener.Reason.SHUT_DOWN; final ConnectionListener.Reason finalShutDownReason = shutDownReason; @@ -345,7 +342,7 @@ public class Connection implements MessageListener { UserThread.execute(() -> connectionListener.onDisconnect(finalShutDownReason, this)); try { - sharedSpace.getSocket().close(); + sharedModel.getSocket().close(); } catch (SocketException e) { log.trace("SocketException at shutdown might be expected " + e.getMessage()); } catch (IOException e) { @@ -389,11 +386,17 @@ public class Connection implements MessageListener { return "Connection{" + "peerAddress=" + peersNodeAddressOptional + ", peerType=" + peerType + - ", direction=" + direction + - ", state=" + getState() + + ", uid='" + uid + '\'' + + '}'; + } + + public String printDetails() { + return "Connection{" + + "peerAddress=" + peersNodeAddressOptional + + ", peerType=" + peerType + ", portInfo=" + portInfo + ", uid='" + uid + '\'' + - ", sharedSpace=" + sharedSpace.toString() + + ", sharedSpace=" + sharedModel.toString() + ", stopped=" + stopped + ", useCompression=" + useCompression + '}'; @@ -408,8 +411,8 @@ public class Connection implements MessageListener { * Holds all shared data between Connection and InputHandler * Runs in same thread as Connection */ - private static class SharedSpace { - private static final Logger log = LoggerFactory.getLogger(SharedSpace.class); + private static class SharedModel { + private static final Logger log = LoggerFactory.getLogger(SharedModel.class); private final Connection connection; private final Socket socket; @@ -420,7 +423,7 @@ public class Connection implements MessageListener { private volatile boolean stopped; private ConnectionListener.Reason shutDownReason; - public SharedSpace(Connection connection, Socket socket) { + public SharedModel(Connection connection, Socket socket) { Log.traceCall(); this.connection = connection; this.socket = socket; @@ -531,7 +534,7 @@ public class Connection implements MessageListener { private static class InputHandler implements Runnable { private static final Logger log = LoggerFactory.getLogger(InputHandler.class); - private final SharedSpace sharedSpace; + private final SharedModel sharedModel; private final ObjectInputStream objectInputStream; private final String portInfo; private final MessageListener messageListener; @@ -539,10 +542,10 @@ public class Connection implements MessageListener { private volatile boolean stopped; - public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener, boolean useCompression) { - this.useCompression = useCompression; + public InputHandler(SharedModel sharedModel, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener, boolean useCompression) { Log.traceCall(); - this.sharedSpace = sharedSpace; + this.useCompression = useCompression; + this.sharedModel = sharedModel; this.objectInputStream = objectInputStream; this.portInfo = portInfo; this.messageListener = messageListener; @@ -560,9 +563,9 @@ public class Connection implements MessageListener { Thread.currentThread().setName("InputHandler-" + portInfo); while (!stopped && !Thread.currentThread().isInterrupted()) { try { - log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionInfo()); + log.trace("InputHandler waiting for incoming messages connection=" + sharedModel.getConnectionInfo()); Object rawInputObject = objectInputStream.readObject(); - log.trace("New data arrived at inputHandler.Connection=" + sharedSpace.getConnectionInfo()); + log.trace("New data arrived at inputHandler.Connection=" + sharedModel.getConnectionInfo()); log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" + "New data arrived at inputHandler.\nReceived object={}" @@ -570,7 +573,7 @@ public class Connection implements MessageListener { int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; if (size > getMaxMsgSize()) { - sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded); + sharedModel.reportIllegalRequest(IllegalRequest.MaxSizeExceeded); return; } @@ -582,57 +585,83 @@ public class Connection implements MessageListener { //log.trace("Read object compressed data size: " + size); serializable = Utils.decompress(compressedObjectAsBytes); } else { - sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); + sharedModel.reportIllegalRequest(IllegalRequest.InvalidDataType); } } else { if (rawInputObject instanceof Serializable) { serializable = (Serializable) rawInputObject; } else { - sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); + sharedModel.reportIllegalRequest(IllegalRequest.InvalidDataType); } } //log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length); // compressed size might be bigger theoretically so we check again after decompression if (size > getMaxMsgSize()) { - sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded); + sharedModel.reportIllegalRequest(IllegalRequest.MaxSizeExceeded); return; } if (!(serializable instanceof Message)) { - sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); + sharedModel.reportIllegalRequest(IllegalRequest.InvalidDataType); return; } Message message = (Message) serializable; if (message.networkId() != Version.getNetworkId()) { - sharedSpace.reportIllegalRequest(IllegalRequest.WrongNetworkId); + sharedModel.reportIllegalRequest(IllegalRequest.WrongNetworkId); return; } - sharedSpace.updateLastActivityDate(); + sharedModel.updateLastActivityDate(); + Connection connection = sharedModel.connection; if (message instanceof CloseConnectionMessage) { - log.info("CloseConnectionMessage received on connection {}", sharedSpace.connection); + log.info("CloseConnectionMessage received on connection {}", connection); stopped = true; - sharedSpace.shutDown(false); + sharedModel.shutDown(false); } else if (!stopped) { - messageListener.onMessage(message, null); + // First a seed node gets a message form a peer (PreliminaryDataRequest using + // AnonymousMessage interface) which does not has its hidden service + // published, so does not know its address. As the IncomingConnection does not has the + // peersNodeAddress set that connection cannot be used for outgoing messages until we + // get the address set. + // At the data update message (DataRequest using SendersNodeAddressMessage interface) + // after the HS is published we get the peers address set. + + // There are only those messages used for new connections to a peer: + // 1. PreliminaryDataRequest + // 2. DataRequest (implements SendersNodeAddressMessage) + // 3. GetPeersRequest (implements SendersNodeAddressMessage) + // 4. DirectMessage (implements SendersNodeAddressMessage) + if (message instanceof SendersNodeAddressMessage) { + NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) message).getSenderNodeAddress(); + Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); + if (peersNodeAddressOptional.isPresent()) + checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress), + "senderNodeAddress not matching connections peer address"); + else + connection.setPeersNodeAddress(senderNodeAddress); + } + if (message instanceof DirectMessage) + connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER); + + messageListener.onMessage(message, connection); } } catch (IOException | ClassNotFoundException | NoClassDefFoundError e) { stopped = true; - sharedSpace.handleConnectionException(e); + sharedModel.handleConnectionException(e); } } } catch (Throwable t) { t.printStackTrace(); stopped = true; - sharedSpace.handleConnectionException(new Exception(t)); + sharedModel.handleConnectionException(new Exception(t)); } } @Override public String toString() { return "InputHandler{" + - "sharedSpace=" + sharedSpace + + "sharedSpace=" + sharedModel + ", port=" + portInfo + ", stopped=" + stopped + '}'; diff --git a/network/src/main/java/io/bitsquare/p2p/network/IllegalRequest.java b/network/src/main/java/io/bitsquare/p2p/network/IllegalRequest.java index 5a1ef96718..2f5acade23 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/IllegalRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/network/IllegalRequest.java @@ -1,7 +1,6 @@ package io.bitsquare.p2p.network; public enum IllegalRequest { - // TODO check for needed allowed tolerance MaxSizeExceeded(1), InvalidDataType(0), WrongNetworkId(0); diff --git a/network/src/main/java/io/bitsquare/p2p/network/InboundConnection.java b/network/src/main/java/io/bitsquare/p2p/network/InboundConnection.java new file mode 100644 index 0000000000..c707739f00 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/network/InboundConnection.java @@ -0,0 +1,9 @@ +package io.bitsquare.p2p.network; + +import java.net.Socket; + +public class InboundConnection extends Connection { + public InboundConnection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { + super(socket, messageListener, connectionListener, null); + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 10ed6ee03d..b8c2435dda 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -93,64 +93,42 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener "We will create a new outbound connection.", peersNodeAddress); final SettableFuture resultFuture = SettableFuture.create(); - final boolean[] timeoutOccurred = new boolean[1]; - timeoutOccurred[0] = false; - ListenableFuture future = executorService.submit(() -> { Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress); - Connection newConnection = null; + OutboundConnection outboundConnection = null; try { // can take a while when using tor Socket socket = createSocket(peersNodeAddress); - if (timeoutOccurred[0]) - throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peersNodeAddress); - - newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this, Connection.Direction.OUTBOUND); - newConnection.setPeersNodeAddress(peersNodeAddress); - outBoundConnections.add(newConnection); + outboundConnection = new OutboundConnection(socket, NetworkNode.this, NetworkNode.this, peersNodeAddress); + outBoundConnections.add(outboundConnection); log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "NetworkNode created new outbound connection:" - + "\npeerAddress=" + peersNodeAddress - + "\nuid=" + newConnection.getUid() + + "\nmyNodeAddress=" + getNodeAddress() + + "\npeersNodeAddress=" + peersNodeAddress + + "\nuid=" + outboundConnection.getUid() + "\nmessage=" + message + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); // can take a while when using tor - newConnection.sendMessage(message); - return newConnection; + outboundConnection.sendMessage(message); + return outboundConnection; } catch (Throwable throwable) { if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) { throwable.printStackTrace(); log.error("Executing task failed. " + throwable.getMessage()); } - if (newConnection != null) - newConnection.setState(Connection.State.FAILED); throw throwable; } }); Futures.addCallback(future, new FutureCallback() { public void onSuccess(Connection connection) { - UserThread.execute(() -> { - //timer.cancel(); - connection.setState(Connection.State.SUCCEEDED); - resultFuture.set(connection); - }); + UserThread.execute(() -> resultFuture.set(connection)); } public void onFailure(@NotNull Throwable throwable) { - UserThread.execute(() -> { - //timer.cancel(); - - if (lookupInboundConnection(peersNodeAddress).isPresent()) { - lookupInboundConnection(peersNodeAddress).get().setState(Connection.State.FAILED); - } else if (lookupOutboundConnection(peersNodeAddress).isPresent()) { - lookupOutboundConnection(peersNodeAddress).get().setState(Connection.State.FAILED); - } - - resultFuture.setException(throwable); - }); + UserThread.execute(() -> resultFuture.setException(throwable)); } }); @@ -169,12 +147,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener final SettableFuture resultFuture = SettableFuture.create(); Futures.addCallback(future, new FutureCallback() { public void onSuccess(Connection connection) { - connection.setState(Connection.State.SUCCEEDED); UserThread.execute(() -> resultFuture.set(connection)); } public void onFailure(@NotNull Throwable throwable) { - connection.setState(Connection.State.FAILED); UserThread.execute(() -> resultFuture.setException(throwable)); } }); @@ -189,18 +165,17 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener return set; } - public Set getSucceededConnections() { + public Set getConfirmedConnections() { // Can contain inbound and outbound connections with the same peer node address, // as connection hashcode is using uid and port info return getAllConnections().stream() - .filter(e -> e.getPeersNodeAddressOptional().isPresent()) - .filter(e -> e.getState().equals(Connection.State.SUCCEEDED)) + .filter(Connection::hasPeersNodeAddress) .collect(Collectors.toSet()); } - public Set getNodeAddressesOfSucceededConnections() { + public Set getNodeAddressesOfConfirmedConnections() { // Does not contain inbound and outbound connection with the same peer node address - return getSucceededConnections().stream() + return getConfirmedConnections().stream() .map(e -> e.getPeersNodeAddressOptional().get()) .collect(Collectors.toSet()); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/OutboundConnection.java b/network/src/main/java/io/bitsquare/p2p/network/OutboundConnection.java new file mode 100644 index 0000000000..25973ddf72 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/network/OutboundConnection.java @@ -0,0 +1,11 @@ +package io.bitsquare.p2p.network; + +import io.bitsquare.p2p.NodeAddress; + +import java.net.Socket; + +public class OutboundConnection extends Connection { + public OutboundConnection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, NodeAddress peersNodeAddress) { + super(socket, messageListener, connectionListener, peersNodeAddress); + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/network/Server.java b/network/src/main/java/io/bitsquare/p2p/network/Server.java index a9802c709c..b5c43e9cc5 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Server.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Server.java @@ -43,7 +43,7 @@ class Server implements Runnable { final Socket socket = serverSocket.accept(); if (!stopped && !Thread.currentThread().isInterrupted()) { log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort()); - Connection connection = new Connection(socket, messageListener, connectionListener, Connection.Direction.INBOUND); + InboundConnection connection = new InboundConnection(socket, messageListener, connectionListener); log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "Server created new inbound connection:" diff --git a/network/src/main/java/io/bitsquare/p2p/network/messages/AnonymousMessage.java b/network/src/main/java/io/bitsquare/p2p/network/messages/AnonymousMessage.java new file mode 100644 index 0000000000..6b74d73d55 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/network/messages/AnonymousMessage.java @@ -0,0 +1,6 @@ +package io.bitsquare.p2p.network.messages; + +import io.bitsquare.p2p.Message; + +public interface AnonymousMessage extends Message { +} diff --git a/network/src/main/java/io/bitsquare/p2p/network/messages/SendersNodeAddressMessage.java b/network/src/main/java/io/bitsquare/p2p/network/messages/SendersNodeAddressMessage.java new file mode 100644 index 0000000000..844d7f58f8 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/network/messages/SendersNodeAddressMessage.java @@ -0,0 +1,8 @@ +package io.bitsquare.p2p.network.messages; + +import io.bitsquare.p2p.Message; +import io.bitsquare.p2p.NodeAddress; + +public interface SendersNodeAddressMessage extends Message { + NodeAddress getSenderNodeAddress(); +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java b/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java index 1f83ec4c1c..00a0314b0f 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java @@ -25,25 +25,19 @@ public class Broadcaster { public void broadcast(DataBroadcastMessage message, @Nullable NodeAddress sender) { Log.traceCall("Sender " + sender + ". Message " + message.toString()); - Set receivers = networkNode.getNodeAddressesOfSucceededConnections(); + Set receivers = networkNode.getConfirmedConnections(); if (!receivers.isEmpty()) { log.info("Broadcast message to {} peers. Message: {}", receivers.size(), message); receivers.stream() - .filter(e -> !e.equals(sender)) - .forEach(nodeAddress -> { - log.trace("Broadcast message from " + networkNode.getNodeAddress() + " to " + nodeAddress + "."); - SettableFuture future = networkNode.sendMessage(nodeAddress, message); + .filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender)) + .forEach(connection -> { + log.trace("Broadcast message from " + networkNode.getNodeAddress() + " to " + + connection.getPeersNodeAddressOptional().get() + "."); + SettableFuture future = networkNode.sendMessage(connection, message); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("Broadcast from " + networkNode.getNodeAddress() + " to " + nodeAddress + " succeeded."); - if (connection != null) { - if (!connection.getPeersNodeAddressOptional().isPresent()) - connection.setPeersNodeAddress(nodeAddress); - - if (connection.getPeerType() == null) - connection.setPeerType(Connection.PeerType.PEER); - } + log.trace("Broadcast from " + networkNode.getNodeAddress() + " to " + connection + " succeeded."); } @Override diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java index 65af9716a3..59ff6fd6d7 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java @@ -20,12 +20,14 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.util.*; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + public class PeerExchangeManager implements MessageListener, ConnectionListener { private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class); @@ -33,7 +35,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener private final PeerManager peerManager; private final Set seedNodeAddresses; private final ScheduledThreadPoolExecutor executor; - private Timer requestReportedPeersTimer, checkForSeedNodesTimer; + private Timer requestReportedPeersAfterDelayTimer, timeoutTimer, checkForSeedNodesTimer; /////////////////////////////////////////////////////////////////////////////////////////// @@ -45,23 +47,31 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener this.peerManager = peerManager; this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); - networkNode.addMessageListener(this); - executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5); - long delay = new Random().nextInt(60) + 60 * 6; // 6-7 min. - executor.scheduleAtFixedRate(() -> UserThread.execute(() -> { - sendGetPeersRequestToAllConnectedPeers(); - checkSeedNodes(); - }), delay, delay, TimeUnit.SECONDS); + long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min. + executor.scheduleAtFixedRate(() -> UserThread.execute(this::checkForSeedNode), + delay, delay, TimeUnit.SECONDS); + + networkNode.addMessageListener(this); } public void shutDown() { Log.traceCall(); networkNode.removeMessageListener(this); - MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS); - stopRequestReportedPeersTimer(); + stopRequestReportedPeersAfterDelayTimer(); stopCheckForSeedNodesTimer(); + stopTimeoutTimer(); + MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + + public void requestReportedPeers(NodeAddress nodeAddress) { + requestReportedPeers(nodeAddress, new ArrayList<>(seedNodeAddresses)); } @@ -75,8 +85,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener @Override public void onDisconnect(Reason reason, Connection connection) { + // We use a timer to throttle if we get a series of disconnects + // The more connections we have the more relaxed we are with a checkForSeedNode if (checkForSeedNodesTimer == null) - checkForSeedNodesTimer = UserThread.runAfter(this::checkSeedNodes, + checkForSeedNodesTimer = UserThread.runAfter(this::checkForSeedNode, networkNode.getAllConnections().size() * 10, TimeUnit.SECONDS); } @@ -94,64 +106,53 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener if (message instanceof PeerExchangeMessage) { Log.traceCall(message.toString()); if (message instanceof GetPeersRequest) { - GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message; - HashSet reportedPeers = getPeersRequestMessage.reportedPeers; - log.trace("Received peers: " + reportedPeers); - if (!connection.getPeersNodeAddressOptional().isPresent()) - connection.setPeersNodeAddress(getPeersRequestMessage.senderNodeAddress); - + HashSet reportedPeers = ((GetPeersRequest) message).reportedPeers; + log.trace("Received reported peers: " + reportedPeers); + checkArgument(connection.getPeersNodeAddressOptional().isPresent(), + "The peers address must have been already set at the moment"); SettableFuture future = networkNode.sendMessage(connection, - new GetPeersResponse(getReportedPeersHashSet(getPeersRequestMessage.senderNodeAddress))); + new GetPeersResponse(getReportedPeersHashSet(connection.getPeersNodeAddressOptional().get()))); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { log.trace("GetPeersResponse sent successfully"); - - handleOnSuccess(connection, null); } @Override public void onFailure(@NotNull Throwable throwable) { - log.info("GetPeersResponse sending failed " + throwable.getMessage()); + log.info("GetPeersResponse sending failed " + throwable.getMessage() + + " Maybe the peer went offline."); } }); peerManager.addToReportedPeers(reportedPeers, connection); } else if (message instanceof GetPeersResponse) { + stopTimeoutTimer(); HashSet reportedPeers = ((GetPeersResponse) message).reportedPeers; - log.trace("Received peers: " + reportedPeers); + log.trace("Received reported peers: " + reportedPeers); peerManager.addToReportedPeers(reportedPeers, connection); - if (!peerManager.hasSufficientConnections()) { - List remainingNodeAddresses = new ArrayList<>(peerManager.getNodeAddressesOfReportedPeers()); - networkNode.getNodeAddressesOfSucceededConnections().stream().forEach(remainingNodeAddresses::remove); - if (!remainingNodeAddresses.isEmpty()) { - NodeAddress nodeAddress = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); - remainingNodeAddresses.remove(nodeAddress); - requestPeersFromReportedPeers(nodeAddress, remainingNodeAddresses); - } - } + continueWithMorePeers(); } } } - /////////////////////////////////////////////////////////////////////////////////////////// - // API - /////////////////////////////////////////////////////////////////////////////////////////// - - public void getReportedPeersFromFirstSeedNode(NodeAddress nodeAddress) { - getReportedPeersFromSeedNode(nodeAddress, new ArrayList<>(seedNodeAddresses)); - } - - /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void getReportedPeersFromSeedNode(NodeAddress nodeAddress, List remainingNodeAddresses) { - Log.traceCall("nodeAddress=" + nodeAddress); - stopRequestReportedPeersTimer(); - stopCheckForSeedNodesTimer(); + private void requestReportedPeers(NodeAddress nodeAddress, List remainingNodeAddresses) { + Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); + checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at getReportedPeersFromSeedNode"); + + stopRequestReportedPeersAfterDelayTimer(); + stopTimeoutTimer(); + + timeoutTimer = UserThread.runAfter(() -> { + log.info("timeoutTimer called"); + handleError(nodeAddress, remainingNodeAddresses); + }, + 10, TimeUnit.SECONDS); SettableFuture future = networkNode.sendMessage(nodeAddress, new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress))); @@ -159,97 +160,99 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener @Override public void onSuccess(Connection connection) { log.trace("GetPeersRequest sent successfully"); - - handleOnSuccess(connection, nodeAddress); } @Override public void onFailure(@NotNull Throwable throwable) { - log.info("GetPeersRequest sending failed " + throwable.getMessage()); - - if (!remainingNodeAddresses.isEmpty()) { - log.info("There are more seed nodes available for requesting peers. " + - "We will try getReportedPeersFromFirstSeedNode again."); - - NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); - remainingNodeAddresses.remove(nextCandidate); - getReportedPeersFromSeedNode(nextCandidate, remainingNodeAddresses); - } else { - log.info("There is no seed node available for requesting peers. " + - "That is expected if no seed node is online.\n" + - "We will try again to request peers from a seed node after a random pause."); - requestReportedPeersTimer = UserThread.runAfterRandomDelay(() -> { - if (!seedNodeAddresses.isEmpty()) { - List nodeAddresses = new ArrayList<>(seedNodeAddresses); - NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size())); - nodeAddresses.remove(nextCandidate); - getReportedPeersFromSeedNode(nextCandidate, nodeAddresses); - } - }, - 30, 40, TimeUnit.SECONDS); - } + log.info("Sending GetPeersRequest to " + nodeAddress + " failed. " + + "That is expected if the peer is offline. " + + "Exception:" + throwable.getMessage()); + handleError(nodeAddress, remainingNodeAddresses); } }); } - private void handleOnSuccess(Connection connection, @Nullable NodeAddress nodeAddress) { - if (connection != null) { - if (!connection.getPeersNodeAddressOptional().isPresent() && nodeAddress != null) - connection.setPeersNodeAddress(nodeAddress); - - if (connection.getPeerType() == null) - connection.setPeerType(peerManager.isSeedNode(connection) ? Connection.PeerType.SEED_NODE : Connection.PeerType.PEER); + private void handleError(NodeAddress nodeAddress, List remainingNodeAddresses) { + Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); + stopTimeoutTimer(); + //peerManager.removePeer(nodeAddress); + if (!remainingNodeAddresses.isEmpty()) { + log.info("There are remaining nodes available for requesting peers. " + + "We will try getReportedPeers again."); + requestReportedPeersFromList(remainingNodeAddresses); + } else { + log.info("There is no remaining node available for requesting peers. " + + "That is expected if no other node is online.\n" + + "We will try to use reported peers (if no available we use persisted peers) " + + "and try again to request peers from our seed nodes after a random pause."); + requestReportedPeersAfterDelayTimer = UserThread.runAfter(() -> + continueWithMorePeers(), + 10, TimeUnit.SECONDS); } } - private void requestPeersFromReportedPeers(NodeAddress nodeAddress, List remainingNodeAddresses) { - Log.traceCall("nodeAddress=" + nodeAddress); - stopRequestReportedPeersTimer(); - - SettableFuture future = networkNode.sendMessage(nodeAddress, - new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress))); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("sendGetPeersRequest sent successfully"); - - handleOnSuccess(connection, nodeAddress); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("sendGetPeersRequest sending failed " + throwable.getMessage()); - - peerManager.removePeer(nodeAddress); - - if (!remainingNodeAddresses.isEmpty()) { - log.info("There are more reported peers available for requesting peers. " + - "We will try getReportedPeersFromFirstSeedNode again."); - - NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); - remainingNodeAddresses.remove(nextCandidate); - requestPeersFromReportedPeers(nextCandidate, remainingNodeAddresses); - } else { - log.info("There are no more reported peers available for requesting peers. " + - "We will try again to request peers from the reported peers again after a random pause."); - requestReportedPeersTimer = UserThread.runAfterRandomDelay(() -> { - List nodeAddresses = new ArrayList<>(peerManager.getNodeAddressesOfReportedPeers()); - if (!nodeAddresses.isEmpty()) { - NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size())); - nodeAddresses.remove(nextCandidate); - requestPeersFromReportedPeers(nextCandidate, nodeAddresses); - } - }, - 30, 40, TimeUnit.SECONDS); - } - } - }); + private void requestReportedPeersFromList(List remainingNodeAddresses) { + NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); + remainingNodeAddresses.remove(nextCandidate); + requestReportedPeers(nextCandidate, remainingNodeAddresses); } - private void stopRequestReportedPeersTimer() { - if (requestReportedPeersTimer != null) { - requestReportedPeersTimer.cancel(); - requestReportedPeersTimer = null; + private void continueWithMorePeers() { + Log.traceCall(); + if (!peerManager.hasSufficientConnections()) { + // We want to keep it sorted but avoid duplicates + List list = new ArrayList<>(peerManager.getNodeAddressesOfReportedPeers().stream() + .filter(e -> !networkNode.getNodeAddressesOfConfirmedConnections().contains(e)) + .collect(Collectors.toSet())); + list.addAll(peerManager.getNodeAddressesOfPersistedPeers().stream() + .filter(e -> !list.contains(e) && + !networkNode.getNodeAddressesOfConfirmedConnections().contains(e)) + .collect(Collectors.toSet())); + list.addAll(seedNodeAddresses.stream() + .filter(e -> !list.contains(e) && + !networkNode.getNodeAddressesOfConfirmedConnections().contains(e) && + !e.equals(networkNode.getNodeAddress())) + .collect(Collectors.toSet())); + if (!list.isEmpty()) { + NodeAddress nextCandidate = list.get(0); + list.remove(nextCandidate); + requestReportedPeers(nextCandidate, list); + } else { + log.info("No more peers are available for requestReportedPeers."); + } + } else { + log.info("We have already sufficient connections."); + } + } + + + // we check if we have at least one seed node connected + private void checkForSeedNode() { + Log.traceCall(); + stopCheckForSeedNodesTimer(); + Set allConnections = networkNode.getConfirmedConnections(); + List seedNodes = allConnections.stream() + .filter(peerManager::isSeedNode) + .collect(Collectors.toList()); + + if (seedNodes.size() == 0 && !seedNodeAddresses.isEmpty()) { + requestReportedPeersFromList(new ArrayList<>(seedNodeAddresses)); + } + } + + private HashSet getReportedPeersHashSet(NodeAddress receiverNodeAddress) { + return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream() + .filter(e -> !peerManager.isSeedNode(e) && + !e.nodeAddress.equals(networkNode.getNodeAddress()) && + !e.nodeAddress.equals(receiverNodeAddress) + ) + .collect(Collectors.toSet())); + } + + private void stopRequestReportedPeersAfterDelayTimer() { + if (requestReportedPeersAfterDelayTimer != null) { + requestReportedPeersAfterDelayTimer.cancel(); + requestReportedPeersAfterDelayTimer = null; } } @@ -260,58 +263,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener } } - - private void sendGetPeersRequestToAllConnectedPeers() { - // copy set to avoid issues with changes in set (we dont need to be perfectly in sync so no need to use a concurrent set) - Set connectedPeers = new HashSet<>(networkNode.getNodeAddressesOfSucceededConnections()); - if (!connectedPeers.isEmpty()) { - Log.traceCall("connectedPeers.size=" + connectedPeers.size()); - connectedPeers.stream().forEach(nodeAddress -> - UserThread.runAfterRandomDelay(() -> - sendGetPeersRequest(nodeAddress), 3, 6)); - } - } - - private void sendGetPeersRequest(NodeAddress nodeAddress) { - Log.traceCall("nodeAddress=" + nodeAddress); - SettableFuture future = networkNode.sendMessage(nodeAddress, - new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress))); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("sendGetPeersRequest sent successfully"); - handleOnSuccess(connection, nodeAddress); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("sendGetPeersRequest sending failed " + throwable.getMessage()); - peerManager.removePeer(nodeAddress); - } - }); - } - - private HashSet getReportedPeersHashSet(@Nullable NodeAddress receiverNodeAddress) { - return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream() - .filter(e -> !peerManager.isSeedNode(e) && - !e.nodeAddress.equals(networkNode.getNodeAddress()) && - !e.nodeAddress.equals(receiverNodeAddress) - ) - .collect(Collectors.toSet())); - } - - private void checkSeedNodes() { - Log.traceCall(); - Set allConnections = networkNode.getAllConnections(); - List seedNodes = allConnections.stream() - .filter(peerManager::isSeedNode) - .collect(Collectors.toList()); - - if (seedNodes.size() == 0 && !seedNodeAddresses.isEmpty()) { - List nodeAddresses = new ArrayList<>(seedNodeAddresses); - NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size())); - nodeAddresses.remove(nextCandidate); - getReportedPeersFromSeedNode(nextCandidate, nodeAddresses); + private void stopTimeoutTimer() { + if (timeoutTimer != null) { + timeoutTimer.cancel(); + timeoutTimer = null; } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 764bbd0387..5f8cdfde7f 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -2,10 +2,10 @@ package io.bitsquare.p2p.peers; import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; +import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; -import io.bitsquare.p2p.network.Connection; -import io.bitsquare.p2p.network.ConnectionListener; -import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.network.*; +import io.bitsquare.p2p.peers.messages.data.DataRequest; import io.bitsquare.storage.Storage; import javafx.beans.value.ChangeListener; import org.jetbrains.annotations.Nullable; @@ -16,11 +16,10 @@ import java.io.File; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkArgument; -public class PeerManager implements ConnectionListener { +public class PeerManager implements ConnectionListener, MessageListener { private static final Logger log = LoggerFactory.getLogger(PeerManager.class); /////////////////////////////////////////////////////////////////////////////////////////// @@ -55,7 +54,7 @@ public class PeerManager implements ConnectionListener { private final HashSet persistedPeers = new HashSet<>(); private final Set reportedPeers = new HashSet<>(); private Timer checkMaxConnectionsTimer; - private final ChangeListener stateChangeListener; + private final ChangeListener connectionNodeAddressListener; /////////////////////////////////////////////////////////////////////////////////////////// @@ -68,9 +67,9 @@ public class PeerManager implements ConnectionListener { networkNode.addConnectionListener(this); createDbStorage(storageDir); - stateChangeListener = (observable, oldValue, newValue) -> { + connectionNodeAddressListener = (observable, oldValue, newValue) -> { printConnectedPeers(); - if (checkMaxConnectionsTimer == null && newValue == Connection.State.SUCCEEDED) + if (checkMaxConnectionsTimer == null && newValue != null) checkMaxConnectionsTimer = UserThread.runAfter(() -> checkMaxConnections(MAX_CONNECTIONS), 3); }; } @@ -105,19 +104,46 @@ public class PeerManager implements ConnectionListener { @Override public void onConnection(Connection connection) { - connection.getStateProperty().addListener(stateChangeListener); + connection.getNodeAddressProperty().addListener(connectionNodeAddressListener); + Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); + // OutboundConnection always know their peers address + // A seed node get only InboundConnection from other seed nodes with getData or peer exchange, + // never direct messages, so we need to check for PeerType.SEED_NODE at onMessage + if (connection instanceof OutboundConnection && + peersNodeAddressOptional.isPresent() && + seedNodeAddresses.contains(peersNodeAddressOptional.get())) { + connection.setPeerType(Connection.PeerType.SEED_NODE); + } } @Override public void onDisconnect(Reason reason, Connection connection) { - connection.getStateProperty().removeListener(stateChangeListener); - connection.getPeersNodeAddressOptional().ifPresent(this::removePeer); + connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener); + //connection.getPeersNodeAddressOptional().ifPresent(this::removePeer); } @Override public void onError(Throwable throwable) { } + + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { + // In case a seed node connects to another seed node we get his address at the DataRequest triggered from + // RequestDataManager.updateDataFromConnectedSeedNode + if (message instanceof DataRequest) { + Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); + if (peersNodeAddressOptional.isPresent() && + seedNodeAddresses.contains(peersNodeAddressOptional.get())) + connection.setPeerType(Connection.PeerType.SEED_NODE); + } + } + + /////////////////////////////////////////////////////////////////////////////////////////// // Check seed node connections /////////////////////////////////////////////////////////////////////////////////////////// @@ -125,7 +151,6 @@ public class PeerManager implements ConnectionListener { protected boolean checkMaxConnections(int limit) { Log.traceCall(); stopCheckMaxConnectionsTimer(); - cleanupFailedConnections(); removeSuperfluousSeedNodes(); Set allConnections = networkNode.getAllConnections(); int size = allConnections.size(); @@ -133,10 +158,9 @@ public class PeerManager implements ConnectionListener { log.info("We have {} connections open. Our limit is {}", size, limit); log.info("Lets try to remove the inbound connections of type PEER."); - // Only outbound, SUCCEEDED, and PEER type connections + // Only InboundConnection, and PEER type connections List candidates = allConnections.stream() - .filter(e -> e.getState() == Connection.State.SUCCEEDED) - .filter(e -> e.getDirection() == Connection.Direction.INBOUND) + .filter(e -> e instanceof InboundConnection) .filter(e -> e.getPeerType() == Connection.PeerType.PEER) .collect(Collectors.toList()); @@ -145,10 +169,9 @@ public class PeerManager implements ConnectionListener { "MAX_CONNECTIONS_NORMAL_PRIORITY limit of {}", MAX_CONNECTIONS_EXTENDED_1); if (size > MAX_CONNECTIONS_EXTENDED_1) { log.info("Lets try to remove any connection of type PEER."); - // All expect SUCCEEDED and DIRECT_MSG_PEER type connections + // Only PEER type connections candidates = allConnections.stream() - .filter(e -> e.getState() == Connection.State.SUCCEEDED) - .filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER) + .filter(e -> e.getPeerType() == Connection.PeerType.PEER) .collect(Collectors.toList()); if (candidates.size() == 0) { log.info("No candidates found. We go to the next level and check if we exceed our " + @@ -182,16 +205,6 @@ public class PeerManager implements ConnectionListener { } } - protected void cleanupFailedConnections() { - // We close any failed but still open connection (check if that can happen at all) - Stream failedConnections = networkNode.getAllConnections().stream() - .filter(e -> e.getState() == Connection.State.FAILED); - failedConnections.findAny().ifPresent(e -> { - log.warn("There is a connection with a failed state. That should not happen. We close it."); - e.shutDown(this::cleanupFailedConnections); - }); - } - protected void removeSuperfluousSeedNodes() { Set allConnections = networkNode.getAllConnections(); if (allConnections.size() > MAX_CONNECTIONS_EXTENDED_1) { @@ -229,13 +242,16 @@ public class PeerManager implements ConnectionListener { } public Set getNodeAddressesOfReportedPeers() { - return reportedPeers.stream().map(e -> e.nodeAddress).collect(Collectors.toSet()); + return reportedPeers.stream().map(e -> e.nodeAddress) + .filter(e -> !isSeedNode(e) && + !e.equals(networkNode.getNodeAddress())) + .collect(Collectors.toSet()); } public void addToReportedPeers(HashSet reportedPeersToAdd, Connection connection) { Log.traceCall("reportedPeersToAdd = " + reportedPeersToAdd); // we disconnect misbehaving nodes trying to send too many peers - // reported peers include the authenticated peers which is normally max. 8 but we give some headroom + // reported peers include the connected peers which is normally max. 10 but we give some headroom // for safety if (reportedPeersToAdd.size() > (MAX_REPORTED_PEERS + PeerManager.MIN_CONNECTIONS * 3)) { connection.shutDown(); @@ -269,13 +285,7 @@ public class PeerManager implements ConnectionListener { purgeReportedPeersIfExceeds(); - // We add all adjustedReportedPeers to persistedReportedPeers but only save the 500 peers with the most - // recent lastActivityDate. - // ReportedPeers is changing when peers authenticate (remove) so we don't use that but - // the persistedReportedPeers set. persistedPeers.addAll(reportedPeersToAdd); - - // We add also our authenticated and authenticating peers persistedPeers.addAll(new HashSet<>(getConnectedPeers())); // We remove if we exceeds MAX_PERSISTED_PEERS limit @@ -329,13 +339,20 @@ public class PeerManager implements ConnectionListener { return persistedPeers; } + public Set getNodeAddressesOfPersistedPeers() { + return persistedPeers.stream().map(e -> e.nodeAddress) + .filter(e -> !isSeedNode(e) && + !e.equals(networkNode.getNodeAddress())) + .collect(Collectors.toSet()); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Misc /////////////////////////////////////////////////////////////////////////////////////////// public boolean hasSufficientConnections() { - return networkNode.getNodeAddressesOfSucceededConnections().size() >= MIN_CONNECTIONS; + return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS; } public void removePeer(NodeAddress nodeAddress) { @@ -350,9 +367,9 @@ public class PeerManager implements ConnectionListener { } public Set getConnectedPeers() { - // networkNode.getSucceededConnections includes: + // networkNode.getConfirmedConnections includes: // filter(connection -> connection.getPeersNodeAddressOptional().isPresent()) - return networkNode.getSucceededConnections().stream() + return networkNode.getConfirmedConnections().stream() .map(c -> new ReportedPeer(c.getPeersNodeAddressOptional().get(), c.getLastActivityDate())) .collect(Collectors.toSet()); } @@ -366,7 +383,7 @@ public class PeerManager implements ConnectionListener { } public boolean isSeedNode(Connection connection) { - return connection.getPeersNodeAddressOptional().isPresent() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get()); + return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get()); } @@ -407,10 +424,10 @@ public class PeerManager implements ConnectionListener { } private void printConnectedPeers() { - if (!networkNode.getNodeAddressesOfSucceededConnections().isEmpty()) { + if (!networkNode.getNodeAddressesOfConfirmedConnections().isEmpty()) { StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + "Connected peers for node " + networkNode.getNodeAddress() + ":"); - networkNode.getNodeAddressesOfSucceededConnections().stream().forEach(e -> result.append("\n").append(e)); + networkNode.getNodeAddressesOfConfirmedConnections().stream().forEach(e -> result.append("\n").append(e)); result.append("\n------------------------------------------------------------\n"); log.info(result.toString()); } @@ -425,6 +442,4 @@ public class PeerManager implements ConnectionListener { log.info(result.toString()); } } - - } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java index da463e6d37..820601f87b 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java @@ -12,8 +12,8 @@ import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.peers.messages.data.DataRequest; import io.bitsquare.p2p.peers.messages.data.DataResponse; +import io.bitsquare.p2p.peers.messages.data.PreliminaryDataRequest; import io.bitsquare.p2p.storage.P2PDataStorage; -import io.bitsquare.p2p.storage.data.ProtectedData; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -28,34 +28,35 @@ import static com.google.common.base.Preconditions.checkArgument; public class RequestDataManager implements MessageListener { private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class); - /////////////////////////////////////////////////////////////////////////////////////////// // Listener /////////////////////////////////////////////////////////////////////////////////////////// public interface Listener { - void onNoSeedNodeAvailable(); + void onPreliminaryDataReceived(); - void onNoPeersAvailable(); + void onUpdatedDataReceived(); void onDataReceived(); - void onPreliminaryDataReceived(); + void onNoPeersAvailable(); - void onDataUpdate(); + void onNoSeedNodeAvailable(); } + /////////////////////////////////////////////////////////////////////////////////////////// + // Class fields + /////////////////////////////////////////////////////////////////////////////////////////// + private final NetworkNode networkNode; - protected final P2PDataStorage dataStorage; + private final P2PDataStorage dataStorage; private final PeerManager peerManager; - private final HashSet persistedPeers = new HashSet<>(); - private final HashSet remainingPersistedPeers = new HashSet<>(); - private Listener listener; - private Optional seedNodeOfPreliminaryDataRequest = Optional.empty(); private final Collection seedNodeAddresses; - private Timer requestDataFromSeedNodesTimer, requestDataFromPersistedPeersTimer, dataRequestTimeoutTimer; - private boolean noSeedNodeAvailableListenerNotified; + private final Listener listener; + + private Optional nodeOfPreliminaryDataRequest = Optional.empty(); + private Timer requestDataAfterDelayTimer, timeoutTimer; private boolean dataUpdateRequested; @@ -63,11 +64,13 @@ public class RequestDataManager implements MessageListener { // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, Set seedNodeAddresses) { + public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, + Set seedNodeAddresses, Listener listener) { this.networkNode = networkNode; this.dataStorage = dataStorage; this.peerManager = peerManager; this.seedNodeAddresses = new HashSet<>(seedNodeAddresses); + this.listener = listener; networkNode.addMessageListener(this); } @@ -77,9 +80,8 @@ public class RequestDataManager implements MessageListener { networkNode.removeMessageListener(this); - stopRequestDataFromSeedNodesTimer(); - stopRequestDataFromPersistedPeersTimer(); - stopDataRequestTimeoutTimer(); + stopRequestDataTimer(); + stopTimeoutTimer(); } @@ -87,23 +89,24 @@ public class RequestDataManager implements MessageListener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void setRequestDataManagerListener(Listener listener) { - this.listener = listener; - } - public void requestPreliminaryData() { - requestDataFromPeers(seedNodeAddresses); - } - - public void updateDataFromConnectedSeedNode() { Log.traceCall(); - checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present"); - dataUpdateRequested = true; - requestDataFromPeer(seedNodeOfPreliminaryDataRequest.get(), seedNodeAddresses); + checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty."); + requestDataFromList(new ArrayList<>(seedNodeAddresses)); } - public Optional getSeedNodeOfPreliminaryDataRequest() { - return seedNodeOfPreliminaryDataRequest; + public void requestUpdatesData() { + Log.traceCall(); + checkArgument(nodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present"); + dataUpdateRequested = true; + List remainingNodeAddresses = new ArrayList<>(seedNodeAddresses); + NodeAddress candidate = nodeOfPreliminaryDataRequest.get(); + remainingNodeAddresses.remove(candidate); + requestData(candidate, remainingNodeAddresses); + } + + public Optional getNodeOfPreliminaryDataRequest() { + return nodeOfPreliminaryDataRequest; } @@ -113,39 +116,26 @@ public class RequestDataManager implements MessageListener { @Override public void onMessage(Message message, Connection connection) { - Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); - if (message instanceof DataRequest) { - // We are a seed node and receive that msg from a new node + if (message instanceof PreliminaryDataRequest || message instanceof DataRequest) { Log.traceCall(message.toString()); - DataRequest dataRequest = (DataRequest) message; - if (peersNodeAddressOptional.isPresent()) { - checkArgument(peersNodeAddressOptional.get().equals(dataRequest.senderNodeAddress), - "Sender address in message not matching the peers address in our connection."); - } else if (dataRequest.senderNodeAddress != null) { - // If first data request the peer does not has its address - // in case of requesting from first seed node after hidden service is published we did not knew the peers address - connection.setPeersNodeAddress(dataRequest.senderNodeAddress); - } networkNode.sendMessage(connection, new DataResponse(new HashSet<>(dataStorage.getMap().values()))); } else if (message instanceof DataResponse) { - // We are the new node which has requested the data Log.traceCall(message.toString()); - DataResponse dataResponse = (DataResponse) message; - HashSet set = dataResponse.set; - // we keep that connection open as the bootstrapping peer will use that later for a re-sync - // as the hidden service is not published yet the data adding will not be broadcasted to others - peersNodeAddressOptional.ifPresent(peersNodeAddress -> set.stream().forEach(e -> dataStorage.add(e, peersNodeAddress))); + stopTimeoutTimer(); + connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> { + ((DataResponse) message).dataSet.stream() + .forEach(e -> dataStorage.add(e, peersNodeAddress)); - stopDataRequestTimeoutTimer(); - connection.getPeersNodeAddressOptional().ifPresent(e -> { - if (!seedNodeOfPreliminaryDataRequest.isPresent()) { - seedNodeOfPreliminaryDataRequest = Optional.of(e); + // 1. We get a response from requestPreliminaryData + if (!nodeOfPreliminaryDataRequest.isPresent()) { + nodeOfPreliminaryDataRequest = Optional.of(peersNodeAddress); listener.onPreliminaryDataReceived(); } + // 2. Later we get a response from requestUpdatesData if (dataUpdateRequested) { dataUpdateRequested = false; - listener.onDataUpdate(); + listener.onUpdatedDataReceived(); } listener.onDataReceived(); @@ -158,149 +148,110 @@ public class RequestDataManager implements MessageListener { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void requestDataFromPeers(Collection nodeAddresses) { - Log.traceCall(nodeAddresses.toString()); - checkArgument(!nodeAddresses.isEmpty(), "requestDataFromPeers: nodeAddresses must not be empty."); - stopRequestDataFromSeedNodesTimer(); - List remainingNodeAddresses = new ArrayList<>(nodeAddresses); - NodeAddress candidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size())); - requestDataFromPeer(candidate, remainingNodeAddresses); + private void requestDataFromList(List nodeAddresses) { + Log.traceCall("remainingNodeAddresses=" + nodeAddresses); + NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size())); + nodeAddresses.remove(nextCandidate); + requestData(nextCandidate, nodeAddresses); } - private void requestDataFromPeer(NodeAddress nodeAddress, Collection remainingNodeAddresses) { - Log.traceCall(nodeAddress.toString()); - remainingNodeAddresses.remove(nodeAddress); - log.info("We try to send a DataRequest request to node. " + nodeAddress); + private void requestData(NodeAddress nodeAddress, List remainingNodeAddresses) { + Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); + log.info("We try to send a DataRequest request to peer. " + nodeAddress); - stopDataRequestTimeoutTimer(); - dataRequestTimeoutTimer = UserThread.runAfter(() -> { - log.info("firstDataRequestTimeoutTimer called"); - if (!remainingNodeAddresses.isEmpty()) { - requestDataFromPeers(remainingNodeAddresses); - } else { - requestDataFromPersistedPeersAfterDelay(nodeAddress); - requestDataFromSeedNodesAfterDelay(); - } + stopTimeoutTimer(); + stopRequestDataTimer(); + + timeoutTimer = UserThread.runAfter(() -> { + log.info("timeoutTimer called"); + handleError(nodeAddress, remainingNodeAddresses); }, 10, TimeUnit.SECONDS); - SettableFuture future = networkNode.sendMessage(nodeAddress, new DataRequest(networkNode.getNodeAddress())); + Message dataRequest; + if (networkNode.getNodeAddress() == null) + dataRequest = new PreliminaryDataRequest(); + else + dataRequest = new DataRequest(networkNode.getNodeAddress()); + + SettableFuture future = networkNode.sendMessage(nodeAddress, dataRequest); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { - log.info("Send DataRequest to " + nodeAddress + " succeeded."); - - if (connection != null) { - if (!connection.getPeersNodeAddressOptional().isPresent()) - connection.setPeersNodeAddress(nodeAddress); - - if (connection.getPeerType() == null) - connection.setPeerType(peerManager.isSeedNode(connection) ? Connection.PeerType.SEED_NODE : Connection.PeerType.PEER); - } + log.trace("Send DataRequest to " + nodeAddress + " succeeded."); } @Override public void onFailure(@NotNull Throwable throwable) { log.info("Send DataRequest to " + nodeAddress + " failed. " + - "That is expected if the node is offline. " + + "That is expected if the peer is offline. " + "Exception:" + throwable.getMessage()); - if (!remainingNodeAddresses.isEmpty()) { - log.info("There are more seed nodes available for requesting data. " + - "We will try requestData again."); - - ReportedPeer reportedPeer = new ReportedPeer(nodeAddress); - if (remainingPersistedPeers.contains(reportedPeer)) - remainingPersistedPeers.remove(reportedPeer); - - requestDataFromPeers(remainingNodeAddresses); - } else { - log.info("There is no seed node available for requesting data. " + - "That is expected if no seed node is online.\n" + - "We will try again to request data from a seed node after a random pause."); - - requestDataFromPersistedPeersAfterDelay(nodeAddress); - requestDataFromSeedNodesAfterDelay(); - } + handleError(nodeAddress, remainingNodeAddresses); } }); } - private void requestDataFromSeedNodesAfterDelay() { - Log.traceCall(); - // We only want to notify the first time - if (!noSeedNodeAvailableListenerNotified) { - noSeedNodeAvailableListenerNotified = true; - listener.onNoSeedNodeAvailable(); - } + private void handleError(NodeAddress nodeAddress, List remainingNodeAddresses) { + Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); + stopTimeoutTimer(); + //peerManager.removePeer(nodeAddress); - if (requestDataFromSeedNodesTimer == null) - requestDataFromSeedNodesTimer = UserThread.runAfterRandomDelay(() -> requestDataFromPeers(seedNodeAddresses), - 10, 20, TimeUnit.SECONDS); - } + if (!remainingNodeAddresses.isEmpty()) { + log.info("There are remaining nodes available for requesting data. " + + "We will try requestDataFromPeers again."); + requestDataFromList(remainingNodeAddresses); + } else { + log.info("There is no remaining node available for requesting data. " + + "That is expected if no other node is online.\n" + + "We will try to use reported peers (if no available we use persisted peers) " + + "and try again to request data from our seed nodes after a random pause."); - private void requestDataFromPersistedPeersAfterDelay(@Nullable NodeAddress failedPeer) { - Log.traceCall("failedPeer=" + failedPeer); + if (peerManager.isSeedNode(nodeAddress)) + listener.onNoSeedNodeAvailable(); + else + listener.onNoPeersAvailable(); - stopRequestDataFromPersistedPeersTimer(); - - if (persistedPeers.isEmpty()) { - persistedPeers.addAll(peerManager.getPersistedPeers()); - log.info("persistedPeers = " + persistedPeers); - remainingPersistedPeers.addAll(persistedPeers); - } - - if (failedPeer != null) { - ReportedPeer reportedPeer = new ReportedPeer(failedPeer); - if (remainingPersistedPeers.contains(reportedPeer)) - remainingPersistedPeers.remove(reportedPeer); - } - - boolean persistedPeersAvailable = false; - if (!remainingPersistedPeers.isEmpty()) { - Set persistedPeerNodeAddresses = remainingPersistedPeers.stream().map(e -> e.nodeAddress).collect(Collectors.toSet()); - if (!persistedPeerNodeAddresses.isEmpty()) { - log.info("We try to use persisted peers for requestData."); - persistedPeersAvailable = true; - requestDataFromPeers(persistedPeerNodeAddresses); - } - } - - if (!persistedPeersAvailable) { - log.warn("No seed nodes and no persisted peers are available for requesting data.\n" + - "We will try again after a random pause."); - noSeedNodeAvailableListenerNotified = true; - listener.onNoPeersAvailable(); - - // reset remainingPersistedPeers - remainingPersistedPeers.clear(); - remainingPersistedPeers.addAll(persistedPeers); - - if (!remainingPersistedPeers.isEmpty() && requestDataFromPersistedPeersTimer == null) - requestDataFromPersistedPeersTimer = UserThread.runAfterRandomDelay(() -> - requestDataFromPersistedPeersAfterDelay(null), - 30, 40, TimeUnit.SECONDS); + requestDataAfterDelayTimer = UserThread.runAfterRandomDelay(() -> { + log.trace("requestDataAfterDelayTimer called"); + if (!seedNodeAddresses.isEmpty()) { + Set nodeAddressesOfConfirmedConnections = networkNode.getNodeAddressesOfConfirmedConnections(); + // We want to keep it sorted but avoid duplicates + // We don't filter out already established connections for seed nodes as it might be that + // we got from the other seed node contacted but we still have not requested the initial + // data set + List list = new ArrayList<>(seedNodeAddresses); + list.addAll(peerManager.getNodeAddressesOfReportedPeers().stream() + .filter(e -> !list.contains(e)) + .collect(Collectors.toSet())); + list.addAll(peerManager.getNodeAddressesOfPersistedPeers().stream() + .filter(e -> !list.contains(e)) + .collect(Collectors.toSet())); + if (!list.isEmpty()) { + NodeAddress nextCandidate = list.get(0); + list.remove(nextCandidate); + requestData(nextCandidate, list); + } else { + log.info("Neither seed nodes, reported peers nor persisted peers are available. " + + "At least seed nodes should be always available."); + } + } + }, + 10, 15, TimeUnit.SECONDS); } } - private void stopRequestDataFromSeedNodesTimer() { - if (requestDataFromSeedNodesTimer != null) { - requestDataFromSeedNodesTimer.cancel(); - requestDataFromSeedNodesTimer = null; + private void stopRequestDataTimer() { + if (requestDataAfterDelayTimer != null) { + requestDataAfterDelayTimer.cancel(); + requestDataAfterDelayTimer = null; } } - private void stopRequestDataFromPersistedPeersTimer() { - if (requestDataFromPersistedPeersTimer != null) { - requestDataFromPersistedPeersTimer.cancel(); - requestDataFromPersistedPeersTimer = null; - } - } - - private void stopDataRequestTimeoutTimer() { - if (dataRequestTimeoutTimer != null) { - dataRequestTimeoutTimer.cancel(); - dataRequestTimeoutTimer = null; + private void stopTimeoutTimer() { + if (timeoutTimer != null) { + timeoutTimer.cancel(); + timeoutTimer = null; } } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java index ac574aa2ea..10fe81af53 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java @@ -1,23 +1,25 @@ package io.bitsquare.p2p.peers.messages.data; import io.bitsquare.app.Version; -import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage; -import javax.annotation.Nullable; - -public final class DataRequest implements Message { +public final class DataRequest implements SendersNodeAddressMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private final int networkId = Version.getNetworkId(); - @Nullable - public final NodeAddress senderNodeAddress; + private final NodeAddress senderNodeAddress; - public DataRequest(@Nullable NodeAddress senderNodeAddress) { + public DataRequest(NodeAddress senderNodeAddress) { this.senderNodeAddress = senderNodeAddress; } + @Override + public NodeAddress getSenderNodeAddress() { + return senderNodeAddress; + } + @Override public int networkId() { return networkId; @@ -25,9 +27,10 @@ public final class DataRequest implements Message { @Override public String toString() { - return "GetDataRequest{" + + return "DataRequest{" + "senderNodeAddress=" + senderNodeAddress + ", networkId=" + networkId + '}'; } + } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java index 37e572286a..5f04213cc1 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java @@ -11,10 +11,10 @@ public final class DataResponse implements Message { private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private final int networkId = Version.getNetworkId(); - public final HashSet set; + public final HashSet dataSet; - public DataResponse(HashSet set) { - this.set = set; + public DataResponse(HashSet dataSet) { + this.dataSet = dataSet; } @Override @@ -29,20 +29,20 @@ public final class DataResponse implements Message { DataResponse that = (DataResponse) o; - return !(set != null ? !set.equals(that.set) : that.set != null); + return !(dataSet != null ? !dataSet.equals(that.dataSet) : that.dataSet != null); } @Override public int hashCode() { - return set != null ? set.hashCode() : 0; + return dataSet != null ? dataSet.hashCode() : 0; } @Override public String toString() { - return "GetDataResponse{" + + return "DataResponse{" + "networkId=" + networkId + - ", set=" + set + + ", dataSet=" + dataSet + '}'; } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/data/PreliminaryDataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/PreliminaryDataRequest.java new file mode 100644 index 0000000000..abaa5f5106 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/PreliminaryDataRequest.java @@ -0,0 +1,26 @@ +package io.bitsquare.p2p.peers.messages.data; + +import io.bitsquare.app.Version; +import io.bitsquare.p2p.network.messages.AnonymousMessage; + +public final class PreliminaryDataRequest implements AnonymousMessage { + // That object is sent over the wire, so we need to take care of version compatibility. + private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; + + private final int networkId = Version.getNetworkId(); + + public PreliminaryDataRequest() { + } + + @Override + public int networkId() { + return networkId; + } + + @Override + public String toString() { + return "PreliminaryDataRequest{" + + "networkId=" + networkId + + '}'; + } +} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java index 717b137efc..cd8fe090d4 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java @@ -2,15 +2,16 @@ package io.bitsquare.p2p.peers.messages.peers; import io.bitsquare.app.Version; import io.bitsquare.p2p.NodeAddress; +import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage; import io.bitsquare.p2p.peers.ReportedPeer; import java.util.HashSet; -public final class GetPeersRequest extends PeerExchangeMessage { +public final class GetPeersRequest extends PeerExchangeMessage implements SendersNodeAddressMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - public final NodeAddress senderNodeAddress; + private NodeAddress senderNodeAddress; public final HashSet reportedPeers; public GetPeersRequest(NodeAddress senderNodeAddress, HashSet reportedPeers) { @@ -18,11 +19,17 @@ public final class GetPeersRequest extends PeerExchangeMessage { this.reportedPeers = reportedPeers; } + @Override + public NodeAddress getSenderNodeAddress() { + return senderNodeAddress; + } + @Override public String toString() { return "GetPeersRequest{" + - "senderAddress=" + senderNodeAddress + + "senderNodeAddress=" + senderNodeAddress + ", reportedPeers=" + reportedPeers + super.toString() + "} "; } + } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/data/ExpirableMailboxPayload.java b/network/src/main/java/io/bitsquare/p2p/storage/data/ExpirableMailboxPayload.java index e015cbcc4f..2c396c6f76 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/data/ExpirableMailboxPayload.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/data/ExpirableMailboxPayload.java @@ -1,7 +1,7 @@ package io.bitsquare.p2p.storage.data; import io.bitsquare.app.Version; -import io.bitsquare.crypto.SealedAndSignedMessage; +import io.bitsquare.crypto.DirectMessage; import java.security.PublicKey; @@ -11,12 +11,12 @@ public final class ExpirableMailboxPayload implements ExpirablePayload { private static final long TTL = 10 * 24 * 60 * 60 * 1000; // 10 days - public final SealedAndSignedMessage sealedAndSignedMessage; + public final DirectMessage directMessage; public final PublicKey senderStoragePublicKey; public final PublicKey receiverStoragePublicKey; - public ExpirableMailboxPayload(SealedAndSignedMessage sealedAndSignedMessage, PublicKey senderStoragePublicKey, PublicKey receiverStoragePublicKey) { - this.sealedAndSignedMessage = sealedAndSignedMessage; + public ExpirableMailboxPayload(DirectMessage directMessage, PublicKey senderStoragePublicKey, PublicKey receiverStoragePublicKey) { + this.directMessage = directMessage; this.senderStoragePublicKey = senderStoragePublicKey; this.receiverStoragePublicKey = receiverStoragePublicKey; } @@ -33,20 +33,20 @@ public final class ExpirableMailboxPayload implements ExpirablePayload { ExpirableMailboxPayload that = (ExpirableMailboxPayload) o; - return !(sealedAndSignedMessage != null ? !sealedAndSignedMessage.equals(that.sealedAndSignedMessage) : that.sealedAndSignedMessage != null); + return !(directMessage != null ? !directMessage.equals(that.directMessage) : that.directMessage != null); } @Override public int hashCode() { - return sealedAndSignedMessage != null ? sealedAndSignedMessage.hashCode() : 0; + return directMessage != null ? directMessage.hashCode() : 0; } @Override public String toString() { return "MailboxEntry{" + "hashCode=" + hashCode() + - ", sealedAndSignedMessage=" + sealedAndSignedMessage + + ", sealedAndSignedMessage=" + directMessage + '}'; } } diff --git a/network/src/test/java/io/bitsquare/crypto/EncryptionServiceTests.java b/network/src/test/java/io/bitsquare/crypto/EncryptionServiceTests.java index 52393e4887..cb79d4460d 100644 --- a/network/src/test/java/io/bitsquare/crypto/EncryptionServiceTests.java +++ b/network/src/test/java/io/bitsquare/crypto/EncryptionServiceTests.java @@ -72,7 +72,9 @@ public class EncryptionServiceTests { public void testDecryptAndVerifyMessage() throws CryptoException { EncryptionService encryptionService = new EncryptionService(keyRing); TestMessage data = new TestMessage("test"); - SealedAndSignedMessage encrypted = new SealedAndSignedMessage(encryptionService.encryptAndSign(pubKeyRing, data), Hash.getHash("aa")); + DirectMessage encrypted = new DirectMessage(null, + encryptionService.encryptAndSign(pubKeyRing, data), + Hash.getHash("aa")); DecryptedMsgWithPubKey decrypted = encryptionService.decryptAndVerify(encrypted.sealedAndSigned); assertEquals(data.data, ((TestMessage) decrypted.message).data); } diff --git a/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java b/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java index 5bff254670..f9b19543f3 100644 --- a/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java +++ b/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java @@ -1,8 +1,8 @@ package io.bitsquare.p2p; import io.bitsquare.common.crypto.*; +import io.bitsquare.crypto.DirectMessage; import io.bitsquare.crypto.EncryptionService; -import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.messaging.DecryptedMsgWithPubKey; import io.bitsquare.p2p.messaging.MailboxMessage; import io.bitsquare.p2p.messaging.SendMailboxMessageListener; @@ -287,10 +287,10 @@ public class P2PServiceTest { MockMailboxMessage mockMessage = new MockMailboxMessage("MockMailboxMessage", p2PService2.getAddress()); p2PService2.getNetworkNode().addMessageListener((message, connection) -> { log.trace("message " + message); - if (message instanceof SealedAndSignedMessage) { + if (message instanceof DirectMessage) { try { - SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message; - DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService2.decryptAndVerify(sealedAndSignedMessage.sealedAndSigned); + DirectMessage directMessage = (DirectMessage) message; + DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService2.decryptAndVerify(directMessage.sealedAndSigned); Assert.assertEquals(mockMessage, decryptedMsgWithPubKey.message); Assert.assertEquals(p2PService2.getAddress(), ((MailboxMessage) decryptedMsgWithPubKey.message).getSenderNodeAddress()); latch2.countDown(); diff --git a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java index 8b3c9fb7fa..0696734558 100644 --- a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java +++ b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java @@ -3,8 +3,8 @@ package io.bitsquare.p2p.storage; import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.*; import io.bitsquare.common.util.Utilities; +import io.bitsquare.crypto.DirectMessage; import io.bitsquare.crypto.EncryptionService; -import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.TestUtils; import io.bitsquare.p2p.mocks.MockMessage; @@ -222,8 +222,10 @@ public class ProtectedDataStorageTest { KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { // sender MockMessage mockMessage = new MockMessage("MockMessage"); - SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(encryptionService1.encryptAndSign(keyRing1.getPubKeyRing(), mockMessage), Hash.getHash("aa")); - ExpirableMailboxPayload expirableMailboxPayload = new ExpirableMailboxPayload(sealedAndSignedMessage, + DirectMessage directMessage = new DirectMessage(networkNode1.getNodeAddress(), + encryptionService1.encryptAndSign(keyRing1.getPubKeyRing(), mockMessage), + Hash.getHash("aa")); + ExpirableMailboxPayload expirableMailboxPayload = new ExpirableMailboxPayload(directMessage, keyRing1.getSignatureKeyPair().getPublic(), keyRing2.getSignatureKeyPair().getPublic());