P2P network / UI improvements

This commit is contained in:
Manfred Karrer 2016-01-26 16:37:08 +01:00
parent 2ad72938ec
commit 991a4350ac
33 changed files with 651 additions and 729 deletions

View File

@ -34,6 +34,7 @@ import java.security.*;
import java.util.Arrays;
// TODO: which counter modes and paddings should we use?
// TODO is Hmac needed/make sense?
// https://security.stackexchange.com/questions/52665/which-is-the-best-cipher-mode-and-padding-mode-for-aes-encryption
public class Encryption {
private static final Logger log = LoggerFactory.getLogger(Encryption.class);

View File

@ -114,7 +114,6 @@ class AddressBasedCoinSelector implements CoinSelector {
log.trace("value needed: " + targetAsLong);
HashSet<TransactionOutput> selected = new HashSet<>();
// Sort the inputs by age*value so we get the highest "coindays" spent.
// TODO: Consider changing the wallets internal format to track just outputs and keep them ordered.
ArrayList<TransactionOutput> sortedOutputs = new ArrayList<>(candidates);
// When calculating the wallet balance, we may be asked to select all possible coins, if so, avoid sorting
// them in order to improve performance.

View File

@ -251,7 +251,7 @@ public class BitsquareApp extends Application {
}
}
//TODO just temp.
// Used for debugging trade process
private void showDebugWindow() {
ViewLoader viewLoader = injector.getInstance(ViewLoader.class);
View debugView = viewLoader.load(DebugView.class);

View File

@ -1,74 +0,0 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.gui.components;
import javafx.animation.FadeTransition;
import javafx.animation.Interpolator;
import javafx.scene.control.Label;
import javafx.scene.control.ProgressBar;
import javafx.scene.layout.HBox;
import javafx.scene.layout.Pane;
import javafx.util.Duration;
// TODO replace with new notification component from lighthouse/bitcoinJ
public class NetworkSyncPane extends HBox {
private final ProgressBar networkSyncProgressBar;
private final Label networkSyncInfoLabel;
public NetworkSyncPane() {
networkSyncInfoLabel = new Label();
networkSyncInfoLabel.setText("Synchronize with network...");
networkSyncProgressBar = new ProgressBar();
networkSyncProgressBar.setPrefWidth(200);
networkSyncProgressBar.setProgress(-1);
getChildren().addAll(new HSpacer(5), networkSyncProgressBar, networkSyncInfoLabel);
}
public void setProgress(double percent) {
networkSyncProgressBar.setProgress(percent / 100.0);
networkSyncInfoLabel.setText("Synchronize with network: " + (int) percent + "%");
}
public void downloadComplete() {
networkSyncInfoLabel.setText("Sync with network: Done");
networkSyncProgressBar.setProgress(1);
FadeTransition fade = new FadeTransition(Duration.millis(700), this);
fade.setToValue(0.0);
fade.setCycleCount(1);
fade.setInterpolator(Interpolator.EASE_BOTH);
fade.play();
fade.setOnFinished(e -> getChildren().clear());
}
}
class HSpacer extends Pane {
public HSpacer(double width) {
setPrefWidth(width);
}
@Override
protected double computePrefWidth(double width) {
return getPrefWidth();
}
}

View File

@ -270,7 +270,7 @@ public class MainViewModel implements ViewModel {
"Maybe you lost your internet connection or your computer was in hibernate/sleep mode.");
else
walletServiceErrorMsg.set(null);
}, 2);
}, 5);
} else if ((int) oldValue == 0 && (int) newValue > 0) {
walletServiceErrorMsg.set(null);
}
@ -399,7 +399,7 @@ public class MainViewModel implements ViewModel {
p2PNetworkWarnMsg.set(null);
p2PNetworkLabelId.set("footer-pane");
}
}, 2);
}, 5);
} else if ((int) oldValue == 0 && (int) newValue > 0) {
p2PNetworkWarnMsg.set(null);
p2PNetworkLabelId.set("footer-pane");

View File

@ -90,7 +90,6 @@ public class EnterPrivKeyPopup extends Popup {
GridPane.setRowIndex(label, ++rowIndex);
keyInputTextField = new InputTextField();
//TODO change when testing is done
if (BitsquareApp.DEV_MODE)
keyInputTextField.setText("6ac43ea1df2a290c1c8391736aa42e4339c5cb4f110ff0257a13b63211977b7a");
GridPane.setMargin(keyInputTextField, new Insets(3, 0, 0, 0));

View File

@ -128,9 +128,6 @@ public class TransactionsView extends ActivatableView<VBox, Void> {
}
private void openTxDetails(TransactionsListItem item) {
// TODO Open popup with details view
log.debug("openTxDetails " + item);
if (!item.isNotAnAddress()) {
try {
Utilities.openWebPage(preferences.getBlockChainExplorer().addressUrl + item.getAddressString());

View File

@ -126,9 +126,6 @@ public class WithdrawalView extends ActivatableView<VBox, Void> {
}
private void openTxDetails(WithdrawalListItem item) {
// TODO Open popup with details view
log.debug("openTxDetails " + item);
try {
Utilities.openWebPage(preferences.getBlockChainExplorer().addressUrl + item.getAddressString());
} catch (Exception e) {
@ -241,8 +238,7 @@ public class WithdrawalView extends ActivatableView<VBox, Void> {
withdrawFromTextField.setPromptText("Select a source address from the table");
amountTextField.setText("");
amountTextField.setPromptText("");
if (!BitsquareApp.DEV_MODE)
withdrawToTextField.setText("");
withdrawToTextField.setText("");
withdrawToTextField.setPromptText("");
}

View File

@ -67,13 +67,9 @@ import static javafx.beans.binding.Bindings.createStringBinding;
// priceAmountHBox is too large after redesign as to be used as layoutReference.
@FxmlView
public class TakeOfferView extends ActivatableViewAndModel<AnchorPane, TakeOfferViewModel> {
// TODO convert unneeded properties to static fields
private final Navigation navigation;
private final BSFormatter formatter;
private final OfferDetailsPopup offerDetailsPopup;
private ScrollPane scrollPane;
private GridPane gridPane;
private ImageView imageView;
@ -89,11 +85,8 @@ public class TakeOfferView extends ActivatableViewAndModel<AnchorPane, TakeOffer
volumeCurrencyLabel, amountRangeBtcLabel, priceDescriptionLabel, volumeDescriptionLabel, takeOfferSpinnerInfoLabel;
private PopOver totalToPayInfoPopover;
private OfferView.CloseHandler closeHandler;
private ChangeListener<Boolean> amountFocusedListener;
private int gridRow = 0;
private ComboBox<PaymentAccount> paymentAccountsComboBox;
private Label paymentAccountsLabel;

View File

@ -26,6 +26,9 @@ import io.bitsquare.gui.util.validation.BtcValidator;
import io.bitsquare.gui.util.validation.InputValidator;
import io.bitsquare.locale.BSResources;
import io.bitsquare.locale.TradeCurrency;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.payment.PaymentAccount;
import io.bitsquare.payment.PaymentMethod;
import io.bitsquare.trade.Trade;
@ -44,10 +47,9 @@ import static javafx.beans.binding.Bindings.createStringBinding;
class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> implements ViewModel {
private final BtcValidator btcValidator;
private P2PService p2PService;
private final BSFormatter formatter;
// static fields
private String amountRange;
private String addressAsString;
private String paymentLabel;
@ -58,8 +60,6 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
private String directionLabel;
private String amountDescription;
// TODO convert unneeded properties to static fields
// dynamic fields
final StringProperty amount = new SimpleStringProperty();
final StringProperty volume = new SimpleStringProperty();
final StringProperty volumeDescriptionLabel = new SimpleStringProperty();
@ -87,6 +87,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
private ChangeListener<String> tradeErrorListener;
private ChangeListener<Offer.State> offerStateListener;
private ChangeListener<String> offerErrorListener;
private ConnectionListener connectionListener;
///////////////////////////////////////////////////////////////////////////////////////////
@ -94,11 +95,12 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
public TakeOfferViewModel(TakeOfferDataModel dataModel, BtcValidator btcValidator,
public TakeOfferViewModel(TakeOfferDataModel dataModel, BtcValidator btcValidator, P2PService p2PService,
BSFormatter formatter) {
super(dataModel);
this.btcValidator = btcValidator;
this.p2PService = p2PService;
this.formatter = formatter;
createListeners();
@ -390,6 +392,22 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
tradeStateListener = (ov, oldValue, newValue) -> applyTradeState(newValue);
tradeErrorListener = (ov, oldValue, newValue) -> applyTradeErrorMessage(newValue);
offerStateListener = (ov, oldValue, newValue) -> applyOfferState(newValue);
connectionListener = new ConnectionListener() {
@Override
public void onDisconnect(Reason reason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent() &&
connection.getPeersNodeAddressOptional().get().equals(offer.getOffererNodeAddress()))
offerWarning.set("You cannot take that offer because the offerer went offline.");
}
@Override
public void onConnection(Connection connection) {
}
@Override
public void onError(Throwable throwable) {
}
};
}
private void addListeners() {
@ -401,7 +419,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
dataModel.amountAsCoin.addListener(amountAsCoinListener);
dataModel.isWalletFunded.addListener(isWalletFundedListener);
p2PService.getNetworkNode().addConnectionListener(connectionListener);
}
private void removeListeners() {
@ -420,6 +438,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
trade.stateProperty().removeListener(tradeStateListener);
trade.errorMessageProperty().removeListener(tradeErrorListener);
}
p2PService.getNetworkNode().removeConnectionListener(connectionListener);
}

View File

@ -38,14 +38,6 @@ import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
//TODO convert to non static
/**
* Central point for formatting and input parsing.
* <p>
* Note that we never use for text input values any coin or currency symbol or code.
* BtcFormat does not support
*/
public class BSFormatter {
private static final Logger log = LoggerFactory.getLogger(BSFormatter.class);

View File

@ -4,25 +4,28 @@ import io.bitsquare.app.Version;
import io.bitsquare.common.crypto.SealedAndSigned;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.messaging.MailboxMessage;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
import java.util.Arrays;
public final class SealedAndSignedMessage implements MailboxMessage {
public final class DirectMessage implements MailboxMessage, SendersNodeAddressMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId();
private final NodeAddress senderNodeAddress;
public final SealedAndSigned sealedAndSigned;
public final byte[] addressPrefixHash;
public SealedAndSignedMessage(SealedAndSigned sealedAndSigned, byte[] addressPrefixHash) {
public DirectMessage(NodeAddress senderNodeAddress, SealedAndSigned sealedAndSigned, byte[] addressPrefixHash) {
this.senderNodeAddress = senderNodeAddress;
this.sealedAndSigned = sealedAndSigned;
this.addressPrefixHash = addressPrefixHash;
}
@Override
public NodeAddress getSenderNodeAddress() {
return null;
return senderNodeAddress;
}
@Override

View File

@ -12,8 +12,8 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.crypto.PubKeyRing;
import io.bitsquare.crypto.DirectMessage;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.crypto.SealedAndSignedMessage;
import io.bitsquare.p2p.messaging.*;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.Broadcaster;
@ -74,7 +74,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private volatile boolean shutDownInProgress;
private boolean shutDownComplete;
private ChangeListener<Connection.State> stateChangeListener;
private ChangeListener<NodeAddress> connectionNodeAddressListener;
private Subscription networkReadySubscription;
@ -99,19 +99,19 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService);
optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing);
init(useLocalhost, networkId, storageDir);
}
protected void init(boolean useLocalhost, int networkId, File storageDir) {
Log.traceCall();
stateChangeListener = (observable, oldValue, newValue) -> {
Set<NodeAddress> nodeAddressesOfAllSucceededConnections = networkNode.getNodeAddressesOfSucceededConnections();
Set<Connection> allSucceededConnections = networkNode.getSucceededConnections();
log.info("List of peers (duplicates possible of inbound/outbound with same node address)\n" + nodeAddressesOfAllSucceededConnections);
log.info("Nr of connections: {} / {}", nodeAddressesOfAllSucceededConnections.size(), allSucceededConnections.size());
UserThread.execute(() -> numConnectedPeers.set(nodeAddressesOfAllSucceededConnections.size()));
connectionNodeAddressListener = (observable, oldValue, newValue) -> {
Set<NodeAddress> nodeAddressesOfConfirmedConnections = networkNode.getNodeAddressesOfConfirmedConnections();
Set<Connection> allConfirmedConnections = networkNode.getConfirmedConnections();
log.info("nodeAddressesOfConfirmedConnections=" + nodeAddressesOfConfirmedConnections);
log.info("allConfirmedConnections=" + allConfirmedConnections);
log.info("Nr of connections: {} / {} (nodeAddressesOfConfirmedConnections / allConfirmedConnections)", nodeAddressesOfConfirmedConnections.size(), allConfirmedConnections.size());
UserThread.execute(() -> numConnectedPeers.set(nodeAddressesOfConfirmedConnections.size()));
};
networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir);
@ -126,8 +126,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Set<NodeAddress> seedNodeAddresses = seedNodesRepository.getSeedNodeAddresses(useLocalhost, networkId);
peerManager = new PeerManager(networkNode, seedNodeAddresses, storageDir);
requestDataManager = new RequestDataManager(networkNode, p2PDataStorage, peerManager, seedNodeAddresses);
requestDataManager.setRequestDataManagerListener(this);
requestDataManager = new RequestDataManager(networkNode, p2PDataStorage, peerManager, seedNodeAddresses, this);
peerExchangeManager = new PeerExchangeManager(networkNode, peerManager, seedNodeAddresses);
@ -211,7 +210,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onTorNodeReady() {
Log.traceCall();
// 1
requestDataManager.requestPreliminaryData();
p2pServiceListeners.stream().forEach(SetupListener::onTorNodeReady);
}
@ -219,7 +218,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onHiddenServicePublished() {
Log.traceCall();
// 3
checkArgument(networkNode.getNodeAddress() != null, "Address must be set when we have the hidden service ready");
hiddenServicePublished.set(true);
@ -239,14 +238,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Log.traceCall();
networkReadySubscription.unsubscribe();
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getSeedNodeOfPreliminaryDataRequest();
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest();
checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(),
"seedNodeOfPreliminaryDataRequest must be present");
// 4
requestDataManager.updateDataFromConnectedSeedNode();
// 5
peerExchangeManager.getReportedPeersFromFirstSeedNode(seedNodeOfPreliminaryDataRequest.get());
requestDataManager.requestUpdatesData();
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -256,13 +252,17 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onPreliminaryDataReceived() {
checkArgument(!preliminaryDataReceived.get(), "preliminaryDataReceived was already set before.");
// 2
preliminaryDataReceived.set(true);
}
@Override
public void onDataUpdate() {
// Result from requestDataManager.updateDataFromConnectedSeedNode();
public void onUpdatedDataReceived() {
Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = requestDataManager.getNodeOfPreliminaryDataRequest();
checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(),
"seedNodeOfPreliminaryDataRequest must be present");
peerExchangeManager.requestReportedPeers(seedNodeOfPreliminaryDataRequest.get());
p2pServiceListeners.stream().forEach(P2PServiceListener::onBootstrapped);
}
@ -288,13 +288,24 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onConnection(Connection connection) {
connection.getStateProperty().addListener(stateChangeListener);
if (connection.getPeersNodeAddressOptional().isPresent()) {
connectionNodeAddressListener.changed(connection.getNodeAddressProperty(), null,
connection.getNodeAddressProperty().get());
} else {
connection.getNodeAddressProperty().addListener(connectionNodeAddressListener);
}
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
connection.getStateProperty().removeListener(stateChangeListener);
UserThread.runAfter(() -> numConnectedPeers.set(networkNode.getNodeAddressesOfSucceededConnections().size()), 1);
Log.traceCall();
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
// We removed the listener after a delay to be sure the connection has been removed
// from the networkNode already.
UserThread.runAfter(() ->
connectionNodeAddressListener.changed(connection.getNodeAddressProperty(), null,
connection.getNodeAddressProperty().get())
, 1);
}
@Override
@ -308,19 +319,19 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onMessage(Message message, Connection connection) {
if (message instanceof SealedAndSignedMessage) {
if (message instanceof DirectMessage) {
Log.traceCall(message.toString());
// Seed nodes don't have set the encryptionService
if (optionalEncryptionService.isPresent()) {
try {
SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message;
if (verifyAddressPrefixHash(sealedAndSignedMessage)) {
DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
DirectMessage directMessage = (DirectMessage) message;
if (verifyAddressPrefixHash(directMessage)) {
// We set connectionType to that connection to avoid that is get closed when
// we get too many connection attempts.
connection.setPeerType(Connection.PeerType.PEER.DIRECT_MSG_PEER);
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify(
directMessage.sealedAndSigned);
log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"Decrypted SealedAndSignedMessage:\ndecryptedMsgWithPubKey={}"
@ -370,27 +381,24 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
private void doSendEncryptedMailMessage(NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, MailMessage message,
private void doSendEncryptedMailMessage(@NotNull NodeAddress peersNodeAddress, PubKeyRing pubKeyRing, MailMessage message,
SendMailMessageListener sendMailMessageListener) {
Log.traceCall();
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at doSendEncryptedMailMessage");
checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen.");
checkNotNull(networkNode.getNodeAddress(), "networkNode.getNodeAddress() must not be null.");
try {
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Encrypt message:\nmessage={}"
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", message);
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
optionalEncryptionService.get().encryptAndSign(pubKeyRing, message), peersNodeAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, sealedAndSignedMessage);
DirectMessage directMessage = new DirectMessage(networkNode.getNodeAddress(),
optionalEncryptionService.get().encryptAndSign(pubKeyRing, message),
peersNodeAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, directMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
sendMailMessageListener.onArrived();
if (connection != null) {
if (!connection.getPeersNodeAddressOptional().isPresent() && peersNodeAddress != null)
connection.setPeersNodeAddress(peersNodeAddress);
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
}
}
@Override
@ -417,11 +425,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
ExpirablePayload expirablePayload = mailboxData.expirablePayload;
if (expirablePayload instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) expirablePayload;
SealedAndSignedMessage sealedAndSignedMessage = expirableMailboxPayload.sealedAndSignedMessage;
if (verifyAddressPrefixHash(sealedAndSignedMessage)) {
DirectMessage directMessage = expirableMailboxPayload.directMessage;
if (verifyAddressPrefixHash(directMessage)) {
try {
DecryptedMsgWithPubKey decryptedMsgWithPubKey = optionalEncryptionService.get().decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
directMessage.sealedAndSigned);
if (decryptedMsgWithPubKey.message instanceof MailboxMessage) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMsgWithPubKey.message;
NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress();
@ -465,27 +473,24 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private void trySendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at trySendEncryptedMailboxMessage");
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen.");
checkNotNull(networkNode.getNodeAddress(), "networkNode.getNodeAddress() must not be null.");
try {
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Encrypt message:\nmessage={}"
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", message);
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
optionalEncryptionService.get().encryptAndSign(peersPubKeyRing, message), peersNodeAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, sealedAndSignedMessage);
DirectMessage directMessage = new DirectMessage(
networkNode.getNodeAddress(),
optionalEncryptionService.get().encryptAndSign(peersPubKeyRing, message),
peersNodeAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, directMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("SendEncryptedMailboxMessage onSuccess");
sendMailboxMessageListener.onArrived();
if (connection != null) {
if (!connection.getPeersNodeAddressOptional().isPresent() && peersNodeAddress != null)
connection.setPeersNodeAddress(peersNodeAddress);
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
}
}
@Override
@ -495,7 +500,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox.");
log.trace("create MailboxEntry with peerAddress " + peersNodeAddress);
PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey();
addMailboxData(new ExpirableMailboxPayload(sealedAndSignedMessage,
addMailboxData(new ExpirableMailboxPayload(directMessage,
optionalKeyRing.get().getSignatureKeyPair().getPublic(),
receiverStoragePublicKey),
receiverStoragePublicKey);
@ -664,7 +669,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
public Set<NodeAddress> getNodeAddressesOfConnectedPeers() {
return networkNode.getNodeAddressesOfSucceededConnections();
return networkNode.getNodeAddressesOfConfirmedConnections();
}
public ReadOnlyIntegerProperty getNumConnectedPeers() {
@ -680,11 +685,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private boolean verifyAddressPrefixHash(SealedAndSignedMessage sealedAndSignedMessage) {
private boolean verifyAddressPrefixHash(DirectMessage directMessage) {
if (networkNode.getNodeAddress() != null) {
byte[] blurredAddressHash = networkNode.getNodeAddress().getAddressPrefixHash();
return blurredAddressHash != null &&
Arrays.equals(blurredAddressHash, sealedAndSignedMessage.addressPrefixHash);
Arrays.equals(blurredAddressHash, directMessage.addressPrefixHash);
} else {
log.debug("myOnionAddress is null at verifyAddressPrefixHash. That is expected at startup.");
return false;

View File

@ -6,10 +6,12 @@ import io.bitsquare.app.Log;
import io.bitsquare.app.Version;
import io.bitsquare.common.ByteArrayUtils;
import io.bitsquare.common.UserThread;
import io.bitsquare.crypto.DirectMessage;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.Utils;
import io.bitsquare.p2p.network.messages.CloseConnectionMessage;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
@ -26,12 +28,12 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Connection is created by the server thread or by sendMessage from NetworkNode.
* All handlers are called on User thread.
* Shared data between InputHandler thread and that
*/
public class Connection implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(Connection.class);
@ -47,17 +49,6 @@ public class Connection implements MessageListener {
// Enums
///////////////////////////////////////////////////////////////////////////////////////////
public enum Direction {
OUTBOUND,
INBOUND
}
public enum State {
IDLE,
SUCCEEDED,
FAILED
}
public enum PeerType {
SEED_NODE,
PEER,
@ -72,12 +63,11 @@ public class Connection implements MessageListener {
private final Socket socket;
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
private final Direction direction;
private final String portInfo;
private final String uid = UUID.randomUUID().toString();
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// holder of state shared between InputHandler and Connection
private final SharedSpace sharedSpace;
private final SharedModel sharedModel;
// set in init
private InputHandler inputHandler;
@ -91,35 +81,32 @@ public class Connection implements MessageListener {
// java.util.zip.DataFormatException: invalid literal/lengths set
// use GZIPInputStream but problems with blocking
private final boolean useCompression = false;
private final ObjectProperty<State> stateProperty = new SimpleObjectProperty<>();
private PeerType peerType;
private ObjectProperty<NodeAddress> nodeAddressProperty = new SimpleObjectProperty<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, Direction direction) {
public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener,
@Nullable NodeAddress peersNodeAddress) {
Log.traceCall();
this.socket = socket;
this.messageListener = messageListener;
this.connectionListener = connectionListener;
this.direction = direction;
stateProperty.set(State.IDLE);
sharedSpace = new SharedSpace(this, socket);
sharedModel = new SharedModel(this, socket);
Log.traceCall();
if (socket.getLocalPort() == 0)
portInfo = "port=" + socket.getPort();
else
portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort();
init();
init(peersNodeAddress);
}
private void init() {
private void init(@Nullable NodeAddress peersNodeAddress) {
Log.traceCall();
try {
@ -134,15 +121,22 @@ public class Connection implements MessageListener {
// We create a thread for handling inputStream data
inputHandler = new InputHandler(sharedSpace, objectInputStream, portInfo, this, useCompression);
inputHandler = new InputHandler(sharedModel, objectInputStream, portInfo, this, useCompression);
singleThreadExecutor.submit(inputHandler);
} catch (IOException e) {
sharedSpace.handleConnectionException(e);
sharedModel.handleConnectionException(e);
}
sharedSpace.updateLastActivityDate();
sharedModel.updateLastActivityDate();
// Use Peer as default, in case of other types they will set it as soon as possible.
peerType = PeerType.PEER;
if (peersNodeAddress != null)
setPeersNodeAddress(peersNodeAddress);
log.trace("\nNew connection created " + this.toString());
UserThread.execute(() -> connectionListener.onConnection(this));
}
@ -158,9 +152,20 @@ public class Connection implements MessageListener {
if (!stopped) {
try {
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} (uid={})\nmessage={}"
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", peersNodeAddress, uid, message);
if (message instanceof DirectMessage && peersNodeAddressOptional.isPresent()) {
setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\nmessage={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, message);
} else {
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} (uid={})\nmessage={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, message);
}
Object objectToWrite;
if (useCompression) {
@ -178,11 +183,11 @@ public class Connection implements MessageListener {
objectOutputStream.writeObject(objectToWrite);
objectOutputStream.flush();
}
sharedSpace.updateLastActivityDate();
sharedModel.updateLastActivityDate();
}
} catch (IOException e) {
// an exception lead to a shutdown
sharedSpace.handleConnectionException(e);
sharedModel.handleConnectionException(e);
}
} else {
log.debug("called sendMessage but was already stopped");
@ -191,7 +196,7 @@ public class Connection implements MessageListener {
public void reportIllegalRequest(IllegalRequest illegalRequest) {
Log.traceCall();
sharedSpace.reportIllegalRequest(illegalRequest);
sharedModel.reportIllegalRequest(illegalRequest);
}
@ -216,25 +221,21 @@ public class Connection implements MessageListener {
this.peerType = peerType;
}
public void setState(State state) {
Log.traceCall(state.toString());
public synchronized void setPeersNodeAddress(NodeAddress peerNodeAddress) {
Log.traceCall(peerNodeAddress.toString());
checkNotNull(peerNodeAddress, "peerAddress must not be null");
peersNodeAddressOptional = Optional.of(peerNodeAddress);
if (state == State.SUCCEEDED) {
String peersNodeAddress = getPeersNodeAddressOptional().isPresent() ? getPeersNodeAddressOptional().get().getFullAddress() : "";
String peersNodeAddress = getPeersNodeAddressOptional().isPresent() ? getPeersNodeAddressOptional().get().getFullAddress() : "";
if (this instanceof InboundConnection) {
log.info("\n\n############################################################\n" +
"We are successfully connected to:\n" +
"peerAddress= " + peersNodeAddress +
"We got the peers node address set.\n" +
"peersNodeAddress= " + peersNodeAddress +
"\nuid=" + getUid() +
"\n############################################################\n");
}
this.stateProperty.set(state);
}
public synchronized void setPeersNodeAddress(NodeAddress peerNodeAddress) {
Log.traceCall();
checkNotNull(peerNodeAddress, "peerAddress must not be null");
peersNodeAddressOptional = Optional.of(peerNodeAddress);
nodeAddressProperty.set(peerNodeAddress);
}
@ -247,13 +248,17 @@ public class Connection implements MessageListener {
}
public Date getLastActivityDate() {
return sharedSpace.getLastActivityDate();
return sharedModel.getLastActivityDate();
}
public String getUid() {
return uid;
}
public boolean hasPeersNodeAddress() {
return peersNodeAddressOptional.isPresent();
}
public boolean isStopped() {
return stopped;
}
@ -262,16 +267,8 @@ public class Connection implements MessageListener {
return peerType;
}
public Direction getDirection() {
return direction;
}
public ReadOnlyObjectProperty<State> getStateProperty() {
return stateProperty;
}
public State getState() {
return stateProperty.get();
public ReadOnlyObjectProperty<NodeAddress> getNodeAddressProperty() {
return nodeAddressProperty;
}
@ -298,8 +295,8 @@ public class Connection implements MessageListener {
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"ShutDown connection:"
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort()
+ "/" + sharedSpace.getSocket().getPort()
+ "\nlocalPort/port=" + sharedModel.getSocket().getLocalPort()
+ "/" + sharedModel.getSocket().getPort()
+ "\nuid=" + uid
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
@ -330,14 +327,14 @@ public class Connection implements MessageListener {
private void setStopFlags() {
stopped = true;
sharedSpace.stop();
sharedModel.stop();
if (inputHandler != null)
inputHandler.stop();
}
private void doShutDown(@Nullable Runnable shutDownCompleteHandler) {
Log.traceCall();
ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason();
ConnectionListener.Reason shutDownReason = sharedModel.getShutDownReason();
if (shutDownReason == null)
shutDownReason = ConnectionListener.Reason.SHUT_DOWN;
final ConnectionListener.Reason finalShutDownReason = shutDownReason;
@ -345,7 +342,7 @@ public class Connection implements MessageListener {
UserThread.execute(() -> connectionListener.onDisconnect(finalShutDownReason, this));
try {
sharedSpace.getSocket().close();
sharedModel.getSocket().close();
} catch (SocketException e) {
log.trace("SocketException at shutdown might be expected " + e.getMessage());
} catch (IOException e) {
@ -389,11 +386,17 @@ public class Connection implements MessageListener {
return "Connection{" +
"peerAddress=" + peersNodeAddressOptional +
", peerType=" + peerType +
", direction=" + direction +
", state=" + getState() +
", uid='" + uid + '\'' +
'}';
}
public String printDetails() {
return "Connection{" +
"peerAddress=" + peersNodeAddressOptional +
", peerType=" + peerType +
", portInfo=" + portInfo +
", uid='" + uid + '\'' +
", sharedSpace=" + sharedSpace.toString() +
", sharedSpace=" + sharedModel.toString() +
", stopped=" + stopped +
", useCompression=" + useCompression +
'}';
@ -408,8 +411,8 @@ public class Connection implements MessageListener {
* Holds all shared data between Connection and InputHandler
* Runs in same thread as Connection
*/
private static class SharedSpace {
private static final Logger log = LoggerFactory.getLogger(SharedSpace.class);
private static class SharedModel {
private static final Logger log = LoggerFactory.getLogger(SharedModel.class);
private final Connection connection;
private final Socket socket;
@ -420,7 +423,7 @@ public class Connection implements MessageListener {
private volatile boolean stopped;
private ConnectionListener.Reason shutDownReason;
public SharedSpace(Connection connection, Socket socket) {
public SharedModel(Connection connection, Socket socket) {
Log.traceCall();
this.connection = connection;
this.socket = socket;
@ -531,7 +534,7 @@ public class Connection implements MessageListener {
private static class InputHandler implements Runnable {
private static final Logger log = LoggerFactory.getLogger(InputHandler.class);
private final SharedSpace sharedSpace;
private final SharedModel sharedModel;
private final ObjectInputStream objectInputStream;
private final String portInfo;
private final MessageListener messageListener;
@ -539,10 +542,10 @@ public class Connection implements MessageListener {
private volatile boolean stopped;
public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener, boolean useCompression) {
this.useCompression = useCompression;
public InputHandler(SharedModel sharedModel, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener, boolean useCompression) {
Log.traceCall();
this.sharedSpace = sharedSpace;
this.useCompression = useCompression;
this.sharedModel = sharedModel;
this.objectInputStream = objectInputStream;
this.portInfo = portInfo;
this.messageListener = messageListener;
@ -560,9 +563,9 @@ public class Connection implements MessageListener {
Thread.currentThread().setName("InputHandler-" + portInfo);
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionInfo());
log.trace("InputHandler waiting for incoming messages connection=" + sharedModel.getConnectionInfo());
Object rawInputObject = objectInputStream.readObject();
log.trace("New data arrived at inputHandler.Connection=" + sharedSpace.getConnectionInfo());
log.trace("New data arrived at inputHandler.Connection=" + sharedModel.getConnectionInfo());
log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler.\nReceived object={}"
@ -570,7 +573,7 @@ public class Connection implements MessageListener {
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size > getMaxMsgSize()) {
sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
sharedModel.reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
return;
}
@ -582,57 +585,83 @@ public class Connection implements MessageListener {
//log.trace("Read object compressed data size: " + size);
serializable = Utils.decompress(compressedObjectAsBytes);
} else {
sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);
sharedModel.reportIllegalRequest(IllegalRequest.InvalidDataType);
}
} else {
if (rawInputObject instanceof Serializable) {
serializable = (Serializable) rawInputObject;
} else {
sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);
sharedModel.reportIllegalRequest(IllegalRequest.InvalidDataType);
}
}
//log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length);
// compressed size might be bigger theoretically so we check again after decompression
if (size > getMaxMsgSize()) {
sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
sharedModel.reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
return;
}
if (!(serializable instanceof Message)) {
sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);
sharedModel.reportIllegalRequest(IllegalRequest.InvalidDataType);
return;
}
Message message = (Message) serializable;
if (message.networkId() != Version.getNetworkId()) {
sharedSpace.reportIllegalRequest(IllegalRequest.WrongNetworkId);
sharedModel.reportIllegalRequest(IllegalRequest.WrongNetworkId);
return;
}
sharedSpace.updateLastActivityDate();
sharedModel.updateLastActivityDate();
Connection connection = sharedModel.connection;
if (message instanceof CloseConnectionMessage) {
log.info("CloseConnectionMessage received on connection {}", sharedSpace.connection);
log.info("CloseConnectionMessage received on connection {}", connection);
stopped = true;
sharedSpace.shutDown(false);
sharedModel.shutDown(false);
} else if (!stopped) {
messageListener.onMessage(message, null);
// First a seed node gets a message form a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not has its hidden service
// published, so does not know its address. As the IncomingConnection does not has the
// peersNodeAddress set that connection cannot be used for outgoing messages until we
// get the address set.
// At the data update message (DataRequest using SendersNodeAddressMessage interface)
// after the HS is published we get the peers address set.
// There are only those messages used for new connections to a peer:
// 1. PreliminaryDataRequest
// 2. DataRequest (implements SendersNodeAddressMessage)
// 3. GetPeersRequest (implements SendersNodeAddressMessage)
// 4. DirectMessage (implements SendersNodeAddressMessage)
if (message instanceof SendersNodeAddressMessage) {
NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) message).getSenderNodeAddress();
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent())
checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress),
"senderNodeAddress not matching connections peer address");
else
connection.setPeersNodeAddress(senderNodeAddress);
}
if (message instanceof DirectMessage)
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
messageListener.onMessage(message, connection);
}
} catch (IOException | ClassNotFoundException | NoClassDefFoundError e) {
stopped = true;
sharedSpace.handleConnectionException(e);
sharedModel.handleConnectionException(e);
}
}
} catch (Throwable t) {
t.printStackTrace();
stopped = true;
sharedSpace.handleConnectionException(new Exception(t));
sharedModel.handleConnectionException(new Exception(t));
}
}
@Override
public String toString() {
return "InputHandler{" +
"sharedSpace=" + sharedSpace +
"sharedSpace=" + sharedModel +
", port=" + portInfo +
", stopped=" + stopped +
'}';

View File

@ -1,7 +1,6 @@
package io.bitsquare.p2p.network;
public enum IllegalRequest {
// TODO check for needed allowed tolerance
MaxSizeExceeded(1),
InvalidDataType(0),
WrongNetworkId(0);

View File

@ -0,0 +1,9 @@
package io.bitsquare.p2p.network;
import java.net.Socket;
public class InboundConnection extends Connection {
public InboundConnection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) {
super(socket, messageListener, connectionListener, null);
}
}

View File

@ -93,64 +93,42 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
"We will create a new outbound connection.", peersNodeAddress);
final SettableFuture<Connection> resultFuture = SettableFuture.create();
final boolean[] timeoutOccurred = new boolean[1];
timeoutOccurred[0] = false;
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress);
Connection newConnection = null;
OutboundConnection outboundConnection = null;
try {
// can take a while when using tor
Socket socket = createSocket(peersNodeAddress);
if (timeoutOccurred[0])
throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peersNodeAddress);
newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this, Connection.Direction.OUTBOUND);
newConnection.setPeersNodeAddress(peersNodeAddress);
outBoundConnections.add(newConnection);
outboundConnection = new OutboundConnection(socket, NetworkNode.this, NetworkNode.this, peersNodeAddress);
outBoundConnections.add(outboundConnection);
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:"
+ "\npeerAddress=" + peersNodeAddress
+ "\nuid=" + newConnection.getUid()
+ "\nmyNodeAddress=" + getNodeAddress()
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\nuid=" + outboundConnection.getUid()
+ "\nmessage=" + message
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
// can take a while when using tor
newConnection.sendMessage(message);
return newConnection;
outboundConnection.sendMessage(message);
return outboundConnection;
} catch (Throwable throwable) {
if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) {
throwable.printStackTrace();
log.error("Executing task failed. " + throwable.getMessage());
}
if (newConnection != null)
newConnection.setState(Connection.State.FAILED);
throw throwable;
}
});
Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) {
UserThread.execute(() -> {
//timer.cancel();
connection.setState(Connection.State.SUCCEEDED);
resultFuture.set(connection);
});
UserThread.execute(() -> resultFuture.set(connection));
}
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> {
//timer.cancel();
if (lookupInboundConnection(peersNodeAddress).isPresent()) {
lookupInboundConnection(peersNodeAddress).get().setState(Connection.State.FAILED);
} else if (lookupOutboundConnection(peersNodeAddress).isPresent()) {
lookupOutboundConnection(peersNodeAddress).get().setState(Connection.State.FAILED);
}
resultFuture.setException(throwable);
});
UserThread.execute(() -> resultFuture.setException(throwable));
}
});
@ -169,12 +147,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
final SettableFuture<Connection> resultFuture = SettableFuture.create();
Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) {
connection.setState(Connection.State.SUCCEEDED);
UserThread.execute(() -> resultFuture.set(connection));
}
public void onFailure(@NotNull Throwable throwable) {
connection.setState(Connection.State.FAILED);
UserThread.execute(() -> resultFuture.setException(throwable));
}
});
@ -189,18 +165,17 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
return set;
}
public Set<Connection> getSucceededConnections() {
public Set<Connection> getConfirmedConnections() {
// Can contain inbound and outbound connections with the same peer node address,
// as connection hashcode is using uid and port info
return getAllConnections().stream()
.filter(e -> e.getPeersNodeAddressOptional().isPresent())
.filter(e -> e.getState().equals(Connection.State.SUCCEEDED))
.filter(Connection::hasPeersNodeAddress)
.collect(Collectors.toSet());
}
public Set<NodeAddress> getNodeAddressesOfSucceededConnections() {
public Set<NodeAddress> getNodeAddressesOfConfirmedConnections() {
// Does not contain inbound and outbound connection with the same peer node address
return getSucceededConnections().stream()
return getConfirmedConnections().stream()
.map(e -> e.getPeersNodeAddressOptional().get())
.collect(Collectors.toSet());
}

View File

@ -0,0 +1,11 @@
package io.bitsquare.p2p.network;
import io.bitsquare.p2p.NodeAddress;
import java.net.Socket;
public class OutboundConnection extends Connection {
public OutboundConnection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, NodeAddress peersNodeAddress) {
super(socket, messageListener, connectionListener, peersNodeAddress);
}
}

View File

@ -43,7 +43,7 @@ class Server implements Runnable {
final Socket socket = serverSocket.accept();
if (!stopped && !Thread.currentThread().isInterrupted()) {
log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort());
Connection connection = new Connection(socket, messageListener, connectionListener, Connection.Direction.INBOUND);
InboundConnection connection = new InboundConnection(socket, messageListener, connectionListener);
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"Server created new inbound connection:"

View File

@ -0,0 +1,6 @@
package io.bitsquare.p2p.network.messages;
import io.bitsquare.p2p.Message;
public interface AnonymousMessage extends Message {
}

View File

@ -0,0 +1,8 @@
package io.bitsquare.p2p.network.messages;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
public interface SendersNodeAddressMessage extends Message {
NodeAddress getSenderNodeAddress();
}

View File

@ -25,25 +25,19 @@ public class Broadcaster {
public void broadcast(DataBroadcastMessage message, @Nullable NodeAddress sender) {
Log.traceCall("Sender " + sender + ". Message " + message.toString());
Set<NodeAddress> receivers = networkNode.getNodeAddressesOfSucceededConnections();
Set<Connection> receivers = networkNode.getConfirmedConnections();
if (!receivers.isEmpty()) {
log.info("Broadcast message to {} peers. Message: {}", receivers.size(), message);
receivers.stream()
.filter(e -> !e.equals(sender))
.forEach(nodeAddress -> {
log.trace("Broadcast message from " + networkNode.getNodeAddress() + " to " + nodeAddress + ".");
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, message);
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.forEach(connection -> {
log.trace("Broadcast message from " + networkNode.getNodeAddress() + " to " +
connection.getPeersNodeAddressOptional().get() + ".");
SettableFuture<Connection> future = networkNode.sendMessage(connection, message);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Broadcast from " + networkNode.getNodeAddress() + " to " + nodeAddress + " succeeded.");
if (connection != null) {
if (!connection.getPeersNodeAddressOptional().isPresent())
connection.setPeersNodeAddress(nodeAddress);
if (connection.getPeerType() == null)
connection.setPeerType(Connection.PeerType.PEER);
}
log.trace("Broadcast from " + networkNode.getNodeAddress() + " to " + connection + " succeeded.");
}
@Override

View File

@ -20,12 +20,14 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class PeerExchangeManager implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(PeerExchangeManager.class);
@ -33,7 +35,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private final PeerManager peerManager;
private final Set<NodeAddress> seedNodeAddresses;
private final ScheduledThreadPoolExecutor executor;
private Timer requestReportedPeersTimer, checkForSeedNodesTimer;
private Timer requestReportedPeersAfterDelayTimer, timeoutTimer, checkForSeedNodesTimer;
///////////////////////////////////////////////////////////////////////////////////////////
@ -45,23 +47,31 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
this.peerManager = peerManager;
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
networkNode.addMessageListener(this);
executor = Utilities.getScheduledThreadPoolExecutor("PeerExchangeManager", 1, 10, 5);
long delay = new Random().nextInt(60) + 60 * 6; // 6-7 min.
executor.scheduleAtFixedRate(() -> UserThread.execute(() -> {
sendGetPeersRequestToAllConnectedPeers();
checkSeedNodes();
}), delay, delay, TimeUnit.SECONDS);
long delay = new Random().nextInt(60) + 60 * 3; // 3-4 min.
executor.scheduleAtFixedRate(() -> UserThread.execute(this::checkForSeedNode),
delay, delay, TimeUnit.SECONDS);
networkNode.addMessageListener(this);
}
public void shutDown() {
Log.traceCall();
networkNode.removeMessageListener(this);
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
stopRequestReportedPeersTimer();
stopRequestReportedPeersAfterDelayTimer();
stopCheckForSeedNodesTimer();
stopTimeoutTimer();
MoreExecutors.shutdownAndAwaitTermination(executor, 500, TimeUnit.MILLISECONDS);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestReportedPeers(NodeAddress nodeAddress) {
requestReportedPeers(nodeAddress, new ArrayList<>(seedNodeAddresses));
}
@ -75,8 +85,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
@Override
public void onDisconnect(Reason reason, Connection connection) {
// We use a timer to throttle if we get a series of disconnects
// The more connections we have the more relaxed we are with a checkForSeedNode
if (checkForSeedNodesTimer == null)
checkForSeedNodesTimer = UserThread.runAfter(this::checkSeedNodes,
checkForSeedNodesTimer = UserThread.runAfter(this::checkForSeedNode,
networkNode.getAllConnections().size() * 10, TimeUnit.SECONDS);
}
@ -94,64 +106,53 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
if (message instanceof PeerExchangeMessage) {
Log.traceCall(message.toString());
if (message instanceof GetPeersRequest) {
GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message;
HashSet<ReportedPeer> reportedPeers = getPeersRequestMessage.reportedPeers;
log.trace("Received peers: " + reportedPeers);
if (!connection.getPeersNodeAddressOptional().isPresent())
connection.setPeersNodeAddress(getPeersRequestMessage.senderNodeAddress);
HashSet<ReportedPeer> reportedPeers = ((GetPeersRequest) message).reportedPeers;
log.trace("Received reported peers: " + reportedPeers);
checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"The peers address must have been already set at the moment");
SettableFuture<Connection> future = networkNode.sendMessage(connection,
new GetPeersResponse(getReportedPeersHashSet(getPeersRequestMessage.senderNodeAddress)));
new GetPeersResponse(getReportedPeersHashSet(connection.getPeersNodeAddressOptional().get())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersResponse sent successfully");
handleOnSuccess(connection, null);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersResponse sending failed " + throwable.getMessage());
log.info("GetPeersResponse sending failed " + throwable.getMessage() +
" Maybe the peer went offline.");
}
});
peerManager.addToReportedPeers(reportedPeers, connection);
} else if (message instanceof GetPeersResponse) {
stopTimeoutTimer();
HashSet<ReportedPeer> reportedPeers = ((GetPeersResponse) message).reportedPeers;
log.trace("Received peers: " + reportedPeers);
log.trace("Received reported peers: " + reportedPeers);
peerManager.addToReportedPeers(reportedPeers, connection);
if (!peerManager.hasSufficientConnections()) {
List<NodeAddress> remainingNodeAddresses = new ArrayList<>(peerManager.getNodeAddressesOfReportedPeers());
networkNode.getNodeAddressesOfSucceededConnections().stream().forEach(remainingNodeAddresses::remove);
if (!remainingNodeAddresses.isEmpty()) {
NodeAddress nodeAddress = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nodeAddress);
requestPeersFromReportedPeers(nodeAddress, remainingNodeAddresses);
}
}
continueWithMorePeers();
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void getReportedPeersFromFirstSeedNode(NodeAddress nodeAddress) {
getReportedPeersFromSeedNode(nodeAddress, new ArrayList<>(seedNodeAddresses));
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void getReportedPeersFromSeedNode(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress);
stopRequestReportedPeersTimer();
stopCheckForSeedNodesTimer();
private void requestReportedPeers(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at getReportedPeersFromSeedNode");
stopRequestReportedPeersAfterDelayTimer();
stopTimeoutTimer();
timeoutTimer = UserThread.runAfter(() -> {
log.info("timeoutTimer called");
handleError(nodeAddress, remainingNodeAddresses);
},
10, TimeUnit.SECONDS);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress,
new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress)));
@ -159,97 +160,99 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersRequest sent successfully");
handleOnSuccess(connection, nodeAddress);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersRequest sending failed " + throwable.getMessage());
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are more seed nodes available for requesting peers. " +
"We will try getReportedPeersFromFirstSeedNode again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nextCandidate);
getReportedPeersFromSeedNode(nextCandidate, remainingNodeAddresses);
} else {
log.info("There is no seed node available for requesting peers. " +
"That is expected if no seed node is online.\n" +
"We will try again to request peers from a seed node after a random pause.");
requestReportedPeersTimer = UserThread.runAfterRandomDelay(() -> {
if (!seedNodeAddresses.isEmpty()) {
List<NodeAddress> nodeAddresses = new ArrayList<>(seedNodeAddresses);
NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size()));
nodeAddresses.remove(nextCandidate);
getReportedPeersFromSeedNode(nextCandidate, nodeAddresses);
}
},
30, 40, TimeUnit.SECONDS);
}
log.info("Sending GetPeersRequest to " + nodeAddress + " failed. " +
"That is expected if the peer is offline. " +
"Exception:" + throwable.getMessage());
handleError(nodeAddress, remainingNodeAddresses);
}
});
}
private void handleOnSuccess(Connection connection, @Nullable NodeAddress nodeAddress) {
if (connection != null) {
if (!connection.getPeersNodeAddressOptional().isPresent() && nodeAddress != null)
connection.setPeersNodeAddress(nodeAddress);
if (connection.getPeerType() == null)
connection.setPeerType(peerManager.isSeedNode(connection) ? Connection.PeerType.SEED_NODE : Connection.PeerType.PEER);
private void handleError(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
stopTimeoutTimer();
//peerManager.removePeer(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again.");
requestReportedPeersFromList(remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting peers. " +
"That is expected if no other node is online.\n" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request peers from our seed nodes after a random pause.");
requestReportedPeersAfterDelayTimer = UserThread.runAfter(() ->
continueWithMorePeers(),
10, TimeUnit.SECONDS);
}
}
private void requestPeersFromReportedPeers(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress);
stopRequestReportedPeersTimer();
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress,
new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress)));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("sendGetPeersRequest sent successfully");
handleOnSuccess(connection, nodeAddress);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("sendGetPeersRequest sending failed " + throwable.getMessage());
peerManager.removePeer(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are more reported peers available for requesting peers. " +
"We will try getReportedPeersFromFirstSeedNode again.");
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nextCandidate);
requestPeersFromReportedPeers(nextCandidate, remainingNodeAddresses);
} else {
log.info("There are no more reported peers available for requesting peers. " +
"We will try again to request peers from the reported peers again after a random pause.");
requestReportedPeersTimer = UserThread.runAfterRandomDelay(() -> {
List<NodeAddress> nodeAddresses = new ArrayList<>(peerManager.getNodeAddressesOfReportedPeers());
if (!nodeAddresses.isEmpty()) {
NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size()));
nodeAddresses.remove(nextCandidate);
requestPeersFromReportedPeers(nextCandidate, nodeAddresses);
}
},
30, 40, TimeUnit.SECONDS);
}
}
});
private void requestReportedPeersFromList(List<NodeAddress> remainingNodeAddresses) {
NodeAddress nextCandidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
remainingNodeAddresses.remove(nextCandidate);
requestReportedPeers(nextCandidate, remainingNodeAddresses);
}
private void stopRequestReportedPeersTimer() {
if (requestReportedPeersTimer != null) {
requestReportedPeersTimer.cancel();
requestReportedPeersTimer = null;
private void continueWithMorePeers() {
Log.traceCall();
if (!peerManager.hasSufficientConnections()) {
// We want to keep it sorted but avoid duplicates
List<NodeAddress> list = new ArrayList<>(peerManager.getNodeAddressesOfReportedPeers().stream()
.filter(e -> !networkNode.getNodeAddressesOfConfirmedConnections().contains(e))
.collect(Collectors.toSet()));
list.addAll(peerManager.getNodeAddressesOfPersistedPeers().stream()
.filter(e -> !list.contains(e) &&
!networkNode.getNodeAddressesOfConfirmedConnections().contains(e))
.collect(Collectors.toSet()));
list.addAll(seedNodeAddresses.stream()
.filter(e -> !list.contains(e) &&
!networkNode.getNodeAddressesOfConfirmedConnections().contains(e) &&
!e.equals(networkNode.getNodeAddress()))
.collect(Collectors.toSet()));
if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate);
requestReportedPeers(nextCandidate, list);
} else {
log.info("No more peers are available for requestReportedPeers.");
}
} else {
log.info("We have already sufficient connections.");
}
}
// we check if we have at least one seed node connected
private void checkForSeedNode() {
Log.traceCall();
stopCheckForSeedNodesTimer();
Set<Connection> allConnections = networkNode.getConfirmedConnections();
List<Connection> seedNodes = allConnections.stream()
.filter(peerManager::isSeedNode)
.collect(Collectors.toList());
if (seedNodes.size() == 0 && !seedNodeAddresses.isEmpty()) {
requestReportedPeersFromList(new ArrayList<>(seedNodeAddresses));
}
}
private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) {
return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream()
.filter(e -> !peerManager.isSeedNode(e) &&
!e.nodeAddress.equals(networkNode.getNodeAddress()) &&
!e.nodeAddress.equals(receiverNodeAddress)
)
.collect(Collectors.toSet()));
}
private void stopRequestReportedPeersAfterDelayTimer() {
if (requestReportedPeersAfterDelayTimer != null) {
requestReportedPeersAfterDelayTimer.cancel();
requestReportedPeersAfterDelayTimer = null;
}
}
@ -260,58 +263,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
}
}
private void sendGetPeersRequestToAllConnectedPeers() {
// copy set to avoid issues with changes in set (we dont need to be perfectly in sync so no need to use a concurrent set)
Set<NodeAddress> connectedPeers = new HashSet<>(networkNode.getNodeAddressesOfSucceededConnections());
if (!connectedPeers.isEmpty()) {
Log.traceCall("connectedPeers.size=" + connectedPeers.size());
connectedPeers.stream().forEach(nodeAddress ->
UserThread.runAfterRandomDelay(() ->
sendGetPeersRequest(nodeAddress), 3, 6));
}
}
private void sendGetPeersRequest(NodeAddress nodeAddress) {
Log.traceCall("nodeAddress=" + nodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress,
new GetPeersRequest(networkNode.getNodeAddress(), getReportedPeersHashSet(nodeAddress)));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("sendGetPeersRequest sent successfully");
handleOnSuccess(connection, nodeAddress);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("sendGetPeersRequest sending failed " + throwable.getMessage());
peerManager.removePeer(nodeAddress);
}
});
}
private HashSet<ReportedPeer> getReportedPeersHashSet(@Nullable NodeAddress receiverNodeAddress) {
return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream()
.filter(e -> !peerManager.isSeedNode(e) &&
!e.nodeAddress.equals(networkNode.getNodeAddress()) &&
!e.nodeAddress.equals(receiverNodeAddress)
)
.collect(Collectors.toSet()));
}
private void checkSeedNodes() {
Log.traceCall();
Set<Connection> allConnections = networkNode.getAllConnections();
List<Connection> seedNodes = allConnections.stream()
.filter(peerManager::isSeedNode)
.collect(Collectors.toList());
if (seedNodes.size() == 0 && !seedNodeAddresses.isEmpty()) {
List<NodeAddress> nodeAddresses = new ArrayList<>(seedNodeAddresses);
NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size()));
nodeAddresses.remove(nextCandidate);
getReportedPeersFromSeedNode(nextCandidate, nodeAddresses);
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.cancel();
timeoutTimer = null;
}
}
}

View File

@ -2,10 +2,10 @@ package io.bitsquare.p2p.peers;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.messages.data.DataRequest;
import io.bitsquare.storage.Storage;
import javafx.beans.value.ChangeListener;
import org.jetbrains.annotations.Nullable;
@ -16,11 +16,10 @@ import java.io.File;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.google.common.base.Preconditions.checkArgument;
public class PeerManager implements ConnectionListener {
public class PeerManager implements ConnectionListener, MessageListener {
private static final Logger log = LoggerFactory.getLogger(PeerManager.class);
///////////////////////////////////////////////////////////////////////////////////////////
@ -55,7 +54,7 @@ public class PeerManager implements ConnectionListener {
private final HashSet<ReportedPeer> persistedPeers = new HashSet<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
private Timer checkMaxConnectionsTimer;
private final ChangeListener<Connection.State> stateChangeListener;
private final ChangeListener<NodeAddress> connectionNodeAddressListener;
///////////////////////////////////////////////////////////////////////////////////////////
@ -68,9 +67,9 @@ public class PeerManager implements ConnectionListener {
networkNode.addConnectionListener(this);
createDbStorage(storageDir);
stateChangeListener = (observable, oldValue, newValue) -> {
connectionNodeAddressListener = (observable, oldValue, newValue) -> {
printConnectedPeers();
if (checkMaxConnectionsTimer == null && newValue == Connection.State.SUCCEEDED)
if (checkMaxConnectionsTimer == null && newValue != null)
checkMaxConnectionsTimer = UserThread.runAfter(() -> checkMaxConnections(MAX_CONNECTIONS), 3);
};
}
@ -105,19 +104,46 @@ public class PeerManager implements ConnectionListener {
@Override
public void onConnection(Connection connection) {
connection.getStateProperty().addListener(stateChangeListener);
connection.getNodeAddressProperty().addListener(connectionNodeAddressListener);
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
// OutboundConnection always know their peers address
// A seed node get only InboundConnection from other seed nodes with getData or peer exchange,
// never direct messages, so we need to check for PeerType.SEED_NODE at onMessage
if (connection instanceof OutboundConnection &&
peersNodeAddressOptional.isPresent() &&
seedNodeAddresses.contains(peersNodeAddressOptional.get())) {
connection.setPeerType(Connection.PeerType.SEED_NODE);
}
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
connection.getStateProperty().removeListener(stateChangeListener);
connection.getPeersNodeAddressOptional().ifPresent(this::removePeer);
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
//connection.getPeersNodeAddressOptional().ifPresent(this::removePeer);
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onMessage(Message message, Connection connection) {
// In case a seed node connects to another seed node we get his address at the DataRequest triggered from
// RequestDataManager.updateDataFromConnectedSeedNode
if (message instanceof DataRequest) {
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent() &&
seedNodeAddresses.contains(peersNodeAddressOptional.get()))
connection.setPeerType(Connection.PeerType.SEED_NODE);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Check seed node connections
///////////////////////////////////////////////////////////////////////////////////////////
@ -125,7 +151,6 @@ public class PeerManager implements ConnectionListener {
protected boolean checkMaxConnections(int limit) {
Log.traceCall();
stopCheckMaxConnectionsTimer();
cleanupFailedConnections();
removeSuperfluousSeedNodes();
Set<Connection> allConnections = networkNode.getAllConnections();
int size = allConnections.size();
@ -133,10 +158,9 @@ public class PeerManager implements ConnectionListener {
log.info("We have {} connections open. Our limit is {}", size, limit);
log.info("Lets try to remove the inbound connections of type PEER.");
// Only outbound, SUCCEEDED, and PEER type connections
// Only InboundConnection, and PEER type connections
List<Connection> candidates = allConnections.stream()
.filter(e -> e.getState() == Connection.State.SUCCEEDED)
.filter(e -> e.getDirection() == Connection.Direction.INBOUND)
.filter(e -> e instanceof InboundConnection)
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());
@ -145,10 +169,9 @@ public class PeerManager implements ConnectionListener {
"MAX_CONNECTIONS_NORMAL_PRIORITY limit of {}", MAX_CONNECTIONS_EXTENDED_1);
if (size > MAX_CONNECTIONS_EXTENDED_1) {
log.info("Lets try to remove any connection of type PEER.");
// All expect SUCCEEDED and DIRECT_MSG_PEER type connections
// Only PEER type connections
candidates = allConnections.stream()
.filter(e -> e.getState() == Connection.State.SUCCEEDED)
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());
if (candidates.size() == 0) {
log.info("No candidates found. We go to the next level and check if we exceed our " +
@ -182,16 +205,6 @@ public class PeerManager implements ConnectionListener {
}
}
protected void cleanupFailedConnections() {
// We close any failed but still open connection (check if that can happen at all)
Stream<Connection> failedConnections = networkNode.getAllConnections().stream()
.filter(e -> e.getState() == Connection.State.FAILED);
failedConnections.findAny().ifPresent(e -> {
log.warn("There is a connection with a failed state. That should not happen. We close it.");
e.shutDown(this::cleanupFailedConnections);
});
}
protected void removeSuperfluousSeedNodes() {
Set<Connection> allConnections = networkNode.getAllConnections();
if (allConnections.size() > MAX_CONNECTIONS_EXTENDED_1) {
@ -229,13 +242,16 @@ public class PeerManager implements ConnectionListener {
}
public Set<NodeAddress> getNodeAddressesOfReportedPeers() {
return reportedPeers.stream().map(e -> e.nodeAddress).collect(Collectors.toSet());
return reportedPeers.stream().map(e -> e.nodeAddress)
.filter(e -> !isSeedNode(e) &&
!e.equals(networkNode.getNodeAddress()))
.collect(Collectors.toSet());
}
public void addToReportedPeers(HashSet<ReportedPeer> reportedPeersToAdd, Connection connection) {
Log.traceCall("reportedPeersToAdd = " + reportedPeersToAdd);
// we disconnect misbehaving nodes trying to send too many peers
// reported peers include the authenticated peers which is normally max. 8 but we give some headroom
// reported peers include the connected peers which is normally max. 10 but we give some headroom
// for safety
if (reportedPeersToAdd.size() > (MAX_REPORTED_PEERS + PeerManager.MIN_CONNECTIONS * 3)) {
connection.shutDown();
@ -269,13 +285,7 @@ public class PeerManager implements ConnectionListener {
purgeReportedPeersIfExceeds();
// We add all adjustedReportedPeers to persistedReportedPeers but only save the 500 peers with the most
// recent lastActivityDate.
// ReportedPeers is changing when peers authenticate (remove) so we don't use that but
// the persistedReportedPeers set.
persistedPeers.addAll(reportedPeersToAdd);
// We add also our authenticated and authenticating peers
persistedPeers.addAll(new HashSet<>(getConnectedPeers()));
// We remove if we exceeds MAX_PERSISTED_PEERS limit
@ -329,13 +339,20 @@ public class PeerManager implements ConnectionListener {
return persistedPeers;
}
public Set<NodeAddress> getNodeAddressesOfPersistedPeers() {
return persistedPeers.stream().map(e -> e.nodeAddress)
.filter(e -> !isSeedNode(e) &&
!e.equals(networkNode.getNodeAddress()))
.collect(Collectors.toSet());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Misc
///////////////////////////////////////////////////////////////////////////////////////////
public boolean hasSufficientConnections() {
return networkNode.getNodeAddressesOfSucceededConnections().size() >= MIN_CONNECTIONS;
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS;
}
public void removePeer(NodeAddress nodeAddress) {
@ -350,9 +367,9 @@ public class PeerManager implements ConnectionListener {
}
public Set<ReportedPeer> getConnectedPeers() {
// networkNode.getSucceededConnections includes:
// networkNode.getConfirmedConnections includes:
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
return networkNode.getSucceededConnections().stream()
return networkNode.getConfirmedConnections().stream()
.map(c -> new ReportedPeer(c.getPeersNodeAddressOptional().get(), c.getLastActivityDate()))
.collect(Collectors.toSet());
}
@ -366,7 +383,7 @@ public class PeerManager implements ConnectionListener {
}
public boolean isSeedNode(Connection connection) {
return connection.getPeersNodeAddressOptional().isPresent() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get());
return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get());
}
@ -407,10 +424,10 @@ public class PeerManager implements ConnectionListener {
}
private void printConnectedPeers() {
if (!networkNode.getNodeAddressesOfSucceededConnections().isEmpty()) {
if (!networkNode.getNodeAddressesOfConfirmedConnections().isEmpty()) {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Connected peers for node " + networkNode.getNodeAddress() + ":");
networkNode.getNodeAddressesOfSucceededConnections().stream().forEach(e -> result.append("\n").append(e));
networkNode.getNodeAddressesOfConfirmedConnections().stream().forEach(e -> result.append("\n").append(e));
result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
@ -425,6 +442,4 @@ public class PeerManager implements ConnectionListener {
log.info(result.toString());
}
}
}

View File

@ -12,8 +12,8 @@ import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.data.DataRequest;
import io.bitsquare.p2p.peers.messages.data.DataResponse;
import io.bitsquare.p2p.peers.messages.data.PreliminaryDataRequest;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.data.ProtectedData;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@ -28,34 +28,35 @@ import static com.google.common.base.Preconditions.checkArgument;
public class RequestDataManager implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class);
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onNoSeedNodeAvailable();
void onPreliminaryDataReceived();
void onNoPeersAvailable();
void onUpdatedDataReceived();
void onDataReceived();
void onPreliminaryDataReceived();
void onNoPeersAvailable();
void onDataUpdate();
void onNoSeedNodeAvailable();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
private final NetworkNode networkNode;
protected final P2PDataStorage dataStorage;
private final P2PDataStorage dataStorage;
private final PeerManager peerManager;
private final HashSet<ReportedPeer> persistedPeers = new HashSet<>();
private final HashSet<ReportedPeer> remainingPersistedPeers = new HashSet<>();
private Listener listener;
private Optional<NodeAddress> seedNodeOfPreliminaryDataRequest = Optional.empty();
private final Collection<NodeAddress> seedNodeAddresses;
private Timer requestDataFromSeedNodesTimer, requestDataFromPersistedPeersTimer, dataRequestTimeoutTimer;
private boolean noSeedNodeAvailableListenerNotified;
private final Listener listener;
private Optional<NodeAddress> nodeOfPreliminaryDataRequest = Optional.empty();
private Timer requestDataAfterDelayTimer, timeoutTimer;
private boolean dataUpdateRequested;
@ -63,11 +64,13 @@ public class RequestDataManager implements MessageListener {
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, Set<NodeAddress> seedNodeAddresses) {
public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager,
Set<NodeAddress> seedNodeAddresses, Listener listener) {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
this.peerManager = peerManager;
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
this.listener = listener;
networkNode.addMessageListener(this);
}
@ -77,9 +80,8 @@ public class RequestDataManager implements MessageListener {
networkNode.removeMessageListener(this);
stopRequestDataFromSeedNodesTimer();
stopRequestDataFromPersistedPeersTimer();
stopDataRequestTimeoutTimer();
stopRequestDataTimer();
stopTimeoutTimer();
}
@ -87,23 +89,24 @@ public class RequestDataManager implements MessageListener {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void setRequestDataManagerListener(Listener listener) {
this.listener = listener;
}
public void requestPreliminaryData() {
requestDataFromPeers(seedNodeAddresses);
}
public void updateDataFromConnectedSeedNode() {
Log.traceCall();
checkArgument(seedNodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present");
dataUpdateRequested = true;
requestDataFromPeer(seedNodeOfPreliminaryDataRequest.get(), seedNodeAddresses);
checkArgument(!seedNodeAddresses.isEmpty(), "seedNodeAddresses must not be empty.");
requestDataFromList(new ArrayList<>(seedNodeAddresses));
}
public Optional<NodeAddress> getSeedNodeOfPreliminaryDataRequest() {
return seedNodeOfPreliminaryDataRequest;
public void requestUpdatesData() {
Log.traceCall();
checkArgument(nodeOfPreliminaryDataRequest.isPresent(), "seedNodeOfPreliminaryDataRequest must be present");
dataUpdateRequested = true;
List<NodeAddress> remainingNodeAddresses = new ArrayList<>(seedNodeAddresses);
NodeAddress candidate = nodeOfPreliminaryDataRequest.get();
remainingNodeAddresses.remove(candidate);
requestData(candidate, remainingNodeAddresses);
}
public Optional<NodeAddress> getNodeOfPreliminaryDataRequest() {
return nodeOfPreliminaryDataRequest;
}
@ -113,39 +116,26 @@ public class RequestDataManager implements MessageListener {
@Override
public void onMessage(Message message, Connection connection) {
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (message instanceof DataRequest) {
// We are a seed node and receive that msg from a new node
if (message instanceof PreliminaryDataRequest || message instanceof DataRequest) {
Log.traceCall(message.toString());
DataRequest dataRequest = (DataRequest) message;
if (peersNodeAddressOptional.isPresent()) {
checkArgument(peersNodeAddressOptional.get().equals(dataRequest.senderNodeAddress),
"Sender address in message not matching the peers address in our connection.");
} else if (dataRequest.senderNodeAddress != null) {
// If first data request the peer does not has its address
// in case of requesting from first seed node after hidden service is published we did not knew the peers address
connection.setPeersNodeAddress(dataRequest.senderNodeAddress);
}
networkNode.sendMessage(connection, new DataResponse(new HashSet<>(dataStorage.getMap().values())));
} else if (message instanceof DataResponse) {
// We are the new node which has requested the data
Log.traceCall(message.toString());
DataResponse dataResponse = (DataResponse) message;
HashSet<ProtectedData> set = dataResponse.set;
// we keep that connection open as the bootstrapping peer will use that later for a re-sync
// as the hidden service is not published yet the data adding will not be broadcasted to others
peersNodeAddressOptional.ifPresent(peersNodeAddress -> set.stream().forEach(e -> dataStorage.add(e, peersNodeAddress)));
stopTimeoutTimer();
connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> {
((DataResponse) message).dataSet.stream()
.forEach(e -> dataStorage.add(e, peersNodeAddress));
stopDataRequestTimeoutTimer();
connection.getPeersNodeAddressOptional().ifPresent(e -> {
if (!seedNodeOfPreliminaryDataRequest.isPresent()) {
seedNodeOfPreliminaryDataRequest = Optional.of(e);
// 1. We get a response from requestPreliminaryData
if (!nodeOfPreliminaryDataRequest.isPresent()) {
nodeOfPreliminaryDataRequest = Optional.of(peersNodeAddress);
listener.onPreliminaryDataReceived();
}
// 2. Later we get a response from requestUpdatesData
if (dataUpdateRequested) {
dataUpdateRequested = false;
listener.onDataUpdate();
listener.onUpdatedDataReceived();
}
listener.onDataReceived();
@ -158,149 +148,110 @@ public class RequestDataManager implements MessageListener {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void requestDataFromPeers(Collection<NodeAddress> nodeAddresses) {
Log.traceCall(nodeAddresses.toString());
checkArgument(!nodeAddresses.isEmpty(), "requestDataFromPeers: nodeAddresses must not be empty.");
stopRequestDataFromSeedNodesTimer();
List<NodeAddress> remainingNodeAddresses = new ArrayList<>(nodeAddresses);
NodeAddress candidate = remainingNodeAddresses.get(new Random().nextInt(remainingNodeAddresses.size()));
requestDataFromPeer(candidate, remainingNodeAddresses);
private void requestDataFromList(List<NodeAddress> nodeAddresses) {
Log.traceCall("remainingNodeAddresses=" + nodeAddresses);
NodeAddress nextCandidate = nodeAddresses.get(new Random().nextInt(nodeAddresses.size()));
nodeAddresses.remove(nextCandidate);
requestData(nextCandidate, nodeAddresses);
}
private void requestDataFromPeer(NodeAddress nodeAddress, Collection<NodeAddress> remainingNodeAddresses) {
Log.traceCall(nodeAddress.toString());
remainingNodeAddresses.remove(nodeAddress);
log.info("We try to send a DataRequest request to node. " + nodeAddress);
private void requestData(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
log.info("We try to send a DataRequest request to peer. " + nodeAddress);
stopDataRequestTimeoutTimer();
dataRequestTimeoutTimer = UserThread.runAfter(() -> {
log.info("firstDataRequestTimeoutTimer called");
if (!remainingNodeAddresses.isEmpty()) {
requestDataFromPeers(remainingNodeAddresses);
} else {
requestDataFromPersistedPeersAfterDelay(nodeAddress);
requestDataFromSeedNodesAfterDelay();
}
stopTimeoutTimer();
stopRequestDataTimer();
timeoutTimer = UserThread.runAfter(() -> {
log.info("timeoutTimer called");
handleError(nodeAddress, remainingNodeAddresses);
},
10, TimeUnit.SECONDS);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, new DataRequest(networkNode.getNodeAddress()));
Message dataRequest;
if (networkNode.getNodeAddress() == null)
dataRequest = new PreliminaryDataRequest();
else
dataRequest = new DataRequest(networkNode.getNodeAddress());
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, dataRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("Send DataRequest to " + nodeAddress + " succeeded.");
if (connection != null) {
if (!connection.getPeersNodeAddressOptional().isPresent())
connection.setPeersNodeAddress(nodeAddress);
if (connection.getPeerType() == null)
connection.setPeerType(peerManager.isSeedNode(connection) ? Connection.PeerType.SEED_NODE : Connection.PeerType.PEER);
}
log.trace("Send DataRequest to " + nodeAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send DataRequest to " + nodeAddress + " failed. " +
"That is expected if the node is offline. " +
"That is expected if the peer is offline. " +
"Exception:" + throwable.getMessage());
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are more seed nodes available for requesting data. " +
"We will try requestData again.");
ReportedPeer reportedPeer = new ReportedPeer(nodeAddress);
if (remainingPersistedPeers.contains(reportedPeer))
remainingPersistedPeers.remove(reportedPeer);
requestDataFromPeers(remainingNodeAddresses);
} else {
log.info("There is no seed node available for requesting data. " +
"That is expected if no seed node is online.\n" +
"We will try again to request data from a seed node after a random pause.");
requestDataFromPersistedPeersAfterDelay(nodeAddress);
requestDataFromSeedNodesAfterDelay();
}
handleError(nodeAddress, remainingNodeAddresses);
}
});
}
private void requestDataFromSeedNodesAfterDelay() {
Log.traceCall();
// We only want to notify the first time
if (!noSeedNodeAvailableListenerNotified) {
noSeedNodeAvailableListenerNotified = true;
listener.onNoSeedNodeAvailable();
}
private void handleError(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
stopTimeoutTimer();
//peerManager.removePeer(nodeAddress);
if (requestDataFromSeedNodesTimer == null)
requestDataFromSeedNodesTimer = UserThread.runAfterRandomDelay(() -> requestDataFromPeers(seedNodeAddresses),
10, 20, TimeUnit.SECONDS);
}
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");
requestDataFromList(remainingNodeAddresses);
} else {
log.info("There is no remaining node available for requesting data. " +
"That is expected if no other node is online.\n" +
"We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request data from our seed nodes after a random pause.");
private void requestDataFromPersistedPeersAfterDelay(@Nullable NodeAddress failedPeer) {
Log.traceCall("failedPeer=" + failedPeer);
if (peerManager.isSeedNode(nodeAddress))
listener.onNoSeedNodeAvailable();
else
listener.onNoPeersAvailable();
stopRequestDataFromPersistedPeersTimer();
if (persistedPeers.isEmpty()) {
persistedPeers.addAll(peerManager.getPersistedPeers());
log.info("persistedPeers = " + persistedPeers);
remainingPersistedPeers.addAll(persistedPeers);
}
if (failedPeer != null) {
ReportedPeer reportedPeer = new ReportedPeer(failedPeer);
if (remainingPersistedPeers.contains(reportedPeer))
remainingPersistedPeers.remove(reportedPeer);
}
boolean persistedPeersAvailable = false;
if (!remainingPersistedPeers.isEmpty()) {
Set<NodeAddress> persistedPeerNodeAddresses = remainingPersistedPeers.stream().map(e -> e.nodeAddress).collect(Collectors.toSet());
if (!persistedPeerNodeAddresses.isEmpty()) {
log.info("We try to use persisted peers for requestData.");
persistedPeersAvailable = true;
requestDataFromPeers(persistedPeerNodeAddresses);
}
}
if (!persistedPeersAvailable) {
log.warn("No seed nodes and no persisted peers are available for requesting data.\n" +
"We will try again after a random pause.");
noSeedNodeAvailableListenerNotified = true;
listener.onNoPeersAvailable();
// reset remainingPersistedPeers
remainingPersistedPeers.clear();
remainingPersistedPeers.addAll(persistedPeers);
if (!remainingPersistedPeers.isEmpty() && requestDataFromPersistedPeersTimer == null)
requestDataFromPersistedPeersTimer = UserThread.runAfterRandomDelay(() ->
requestDataFromPersistedPeersAfterDelay(null),
30, 40, TimeUnit.SECONDS);
requestDataAfterDelayTimer = UserThread.runAfterRandomDelay(() -> {
log.trace("requestDataAfterDelayTimer called");
if (!seedNodeAddresses.isEmpty()) {
Set<NodeAddress> nodeAddressesOfConfirmedConnections = networkNode.getNodeAddressesOfConfirmedConnections();
// We want to keep it sorted but avoid duplicates
// We don't filter out already established connections for seed nodes as it might be that
// we got from the other seed node contacted but we still have not requested the initial
// data set
List<NodeAddress> list = new ArrayList<>(seedNodeAddresses);
list.addAll(peerManager.getNodeAddressesOfReportedPeers().stream()
.filter(e -> !list.contains(e))
.collect(Collectors.toSet()));
list.addAll(peerManager.getNodeAddressesOfPersistedPeers().stream()
.filter(e -> !list.contains(e))
.collect(Collectors.toSet()));
if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate);
requestData(nextCandidate, list);
} else {
log.info("Neither seed nodes, reported peers nor persisted peers are available. " +
"At least seed nodes should be always available.");
}
}
},
10, 15, TimeUnit.SECONDS);
}
}
private void stopRequestDataFromSeedNodesTimer() {
if (requestDataFromSeedNodesTimer != null) {
requestDataFromSeedNodesTimer.cancel();
requestDataFromSeedNodesTimer = null;
private void stopRequestDataTimer() {
if (requestDataAfterDelayTimer != null) {
requestDataAfterDelayTimer.cancel();
requestDataAfterDelayTimer = null;
}
}
private void stopRequestDataFromPersistedPeersTimer() {
if (requestDataFromPersistedPeersTimer != null) {
requestDataFromPersistedPeersTimer.cancel();
requestDataFromPersistedPeersTimer = null;
}
}
private void stopDataRequestTimeoutTimer() {
if (dataRequestTimeoutTimer != null) {
dataRequestTimeoutTimer.cancel();
dataRequestTimeoutTimer = null;
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.cancel();
timeoutTimer = null;
}
}
}

View File

@ -1,23 +1,25 @@
package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
import javax.annotation.Nullable;
public final class DataRequest implements Message {
public final class DataRequest implements SendersNodeAddressMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId();
@Nullable
public final NodeAddress senderNodeAddress;
private final NodeAddress senderNodeAddress;
public DataRequest(@Nullable NodeAddress senderNodeAddress) {
public DataRequest(NodeAddress senderNodeAddress) {
this.senderNodeAddress = senderNodeAddress;
}
@Override
public NodeAddress getSenderNodeAddress() {
return senderNodeAddress;
}
@Override
public int networkId() {
return networkId;
@ -25,9 +27,10 @@ public final class DataRequest implements Message {
@Override
public String toString() {
return "GetDataRequest{" +
return "DataRequest{" +
"senderNodeAddress=" + senderNodeAddress +
", networkId=" + networkId +
'}';
}
}

View File

@ -11,10 +11,10 @@ public final class DataResponse implements Message {
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId();
public final HashSet<ProtectedData> set;
public final HashSet<ProtectedData> dataSet;
public DataResponse(HashSet<ProtectedData> set) {
this.set = set;
public DataResponse(HashSet<ProtectedData> dataSet) {
this.dataSet = dataSet;
}
@Override
@ -29,20 +29,20 @@ public final class DataResponse implements Message {
DataResponse that = (DataResponse) o;
return !(set != null ? !set.equals(that.set) : that.set != null);
return !(dataSet != null ? !dataSet.equals(that.dataSet) : that.dataSet != null);
}
@Override
public int hashCode() {
return set != null ? set.hashCode() : 0;
return dataSet != null ? dataSet.hashCode() : 0;
}
@Override
public String toString() {
return "GetDataResponse{" +
return "DataResponse{" +
"networkId=" + networkId +
", set=" + set +
", dataSet=" + dataSet +
'}';
}
}

View File

@ -0,0 +1,26 @@
package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.network.messages.AnonymousMessage;
public final class PreliminaryDataRequest implements AnonymousMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId();
public PreliminaryDataRequest() {
}
@Override
public int networkId() {
return networkId;
}
@Override
public String toString() {
return "PreliminaryDataRequest{" +
"networkId=" + networkId +
'}';
}
}

View File

@ -2,15 +2,16 @@ package io.bitsquare.p2p.peers.messages.peers;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet;
public final class GetPeersRequest extends PeerExchangeMessage {
public final class GetPeersRequest extends PeerExchangeMessage implements SendersNodeAddressMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final NodeAddress senderNodeAddress;
private NodeAddress senderNodeAddress;
public final HashSet<ReportedPeer> reportedPeers;
public GetPeersRequest(NodeAddress senderNodeAddress, HashSet<ReportedPeer> reportedPeers) {
@ -18,11 +19,17 @@ public final class GetPeersRequest extends PeerExchangeMessage {
this.reportedPeers = reportedPeers;
}
@Override
public NodeAddress getSenderNodeAddress() {
return senderNodeAddress;
}
@Override
public String toString() {
return "GetPeersRequest{" +
"senderAddress=" + senderNodeAddress +
"senderNodeAddress=" + senderNodeAddress +
", reportedPeers=" + reportedPeers +
super.toString() + "} ";
}
}

View File

@ -1,7 +1,7 @@
package io.bitsquare.p2p.storage.data;
import io.bitsquare.app.Version;
import io.bitsquare.crypto.SealedAndSignedMessage;
import io.bitsquare.crypto.DirectMessage;
import java.security.PublicKey;
@ -11,12 +11,12 @@ public final class ExpirableMailboxPayload implements ExpirablePayload {
private static final long TTL = 10 * 24 * 60 * 60 * 1000; // 10 days
public final SealedAndSignedMessage sealedAndSignedMessage;
public final DirectMessage directMessage;
public final PublicKey senderStoragePublicKey;
public final PublicKey receiverStoragePublicKey;
public ExpirableMailboxPayload(SealedAndSignedMessage sealedAndSignedMessage, PublicKey senderStoragePublicKey, PublicKey receiverStoragePublicKey) {
this.sealedAndSignedMessage = sealedAndSignedMessage;
public ExpirableMailboxPayload(DirectMessage directMessage, PublicKey senderStoragePublicKey, PublicKey receiverStoragePublicKey) {
this.directMessage = directMessage;
this.senderStoragePublicKey = senderStoragePublicKey;
this.receiverStoragePublicKey = receiverStoragePublicKey;
}
@ -33,20 +33,20 @@ public final class ExpirableMailboxPayload implements ExpirablePayload {
ExpirableMailboxPayload that = (ExpirableMailboxPayload) o;
return !(sealedAndSignedMessage != null ? !sealedAndSignedMessage.equals(that.sealedAndSignedMessage) : that.sealedAndSignedMessage != null);
return !(directMessage != null ? !directMessage.equals(that.directMessage) : that.directMessage != null);
}
@Override
public int hashCode() {
return sealedAndSignedMessage != null ? sealedAndSignedMessage.hashCode() : 0;
return directMessage != null ? directMessage.hashCode() : 0;
}
@Override
public String toString() {
return "MailboxEntry{" +
"hashCode=" + hashCode() +
", sealedAndSignedMessage=" + sealedAndSignedMessage +
", sealedAndSignedMessage=" + directMessage +
'}';
}
}

View File

@ -72,7 +72,9 @@ public class EncryptionServiceTests {
public void testDecryptAndVerifyMessage() throws CryptoException {
EncryptionService encryptionService = new EncryptionService(keyRing);
TestMessage data = new TestMessage("test");
SealedAndSignedMessage encrypted = new SealedAndSignedMessage(encryptionService.encryptAndSign(pubKeyRing, data), Hash.getHash("aa"));
DirectMessage encrypted = new DirectMessage(null,
encryptionService.encryptAndSign(pubKeyRing, data),
Hash.getHash("aa"));
DecryptedMsgWithPubKey decrypted = encryptionService.decryptAndVerify(encrypted.sealedAndSigned);
assertEquals(data.data, ((TestMessage) decrypted.message).data);
}

View File

@ -1,8 +1,8 @@
package io.bitsquare.p2p;
import io.bitsquare.common.crypto.*;
import io.bitsquare.crypto.DirectMessage;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.crypto.SealedAndSignedMessage;
import io.bitsquare.p2p.messaging.DecryptedMsgWithPubKey;
import io.bitsquare.p2p.messaging.MailboxMessage;
import io.bitsquare.p2p.messaging.SendMailboxMessageListener;
@ -287,10 +287,10 @@ public class P2PServiceTest {
MockMailboxMessage mockMessage = new MockMailboxMessage("MockMailboxMessage", p2PService2.getAddress());
p2PService2.getNetworkNode().addMessageListener((message, connection) -> {
log.trace("message " + message);
if (message instanceof SealedAndSignedMessage) {
if (message instanceof DirectMessage) {
try {
SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message;
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService2.decryptAndVerify(sealedAndSignedMessage.sealedAndSigned);
DirectMessage directMessage = (DirectMessage) message;
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService2.decryptAndVerify(directMessage.sealedAndSigned);
Assert.assertEquals(mockMessage, decryptedMsgWithPubKey.message);
Assert.assertEquals(p2PService2.getAddress(), ((MailboxMessage) decryptedMsgWithPubKey.message).getSenderNodeAddress());
latch2.countDown();

View File

@ -3,8 +3,8 @@ package io.bitsquare.p2p.storage;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.*;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.crypto.DirectMessage;
import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.crypto.SealedAndSignedMessage;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.TestUtils;
import io.bitsquare.p2p.mocks.MockMessage;
@ -222,8 +222,10 @@ public class ProtectedDataStorageTest {
KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
// sender
MockMessage mockMessage = new MockMessage("MockMessage");
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(encryptionService1.encryptAndSign(keyRing1.getPubKeyRing(), mockMessage), Hash.getHash("aa"));
ExpirableMailboxPayload expirableMailboxPayload = new ExpirableMailboxPayload(sealedAndSignedMessage,
DirectMessage directMessage = new DirectMessage(networkNode1.getNodeAddress(),
encryptionService1.encryptAndSign(keyRing1.getPubKeyRing(), mockMessage),
Hash.getHash("aa"));
ExpirableMailboxPayload expirableMailboxPayload = new ExpirableMailboxPayload(directMessage,
keyRing1.getSignatureKeyPair().getPublic(),
keyRing2.getSignatureKeyPair().getPublic());