Fix authentication issues, store address

This commit is contained in:
Manfred Karrer 2015-11-12 02:00:04 +01:00
parent f8adaaed62
commit 85427d7bde
22 changed files with 553 additions and 598 deletions

View file

@ -78,6 +78,15 @@ public class Storage<T extends Serializable> {
this.dir = dir;
}
@Nullable
public T initAndGetPersisted(String fileName) {
this.fileName = fileName;
storageFile = new File(dir, fileName);
fileManager = new FileManager<>(dir, storageFile, 600, TimeUnit.MILLISECONDS);
return getPersisted();
}
@Nullable
public T initAndGetPersisted(T serializable) {
return initAndGetPersisted(serializable, serializable.getClass().getSimpleName());
@ -90,15 +99,23 @@ public class Storage<T extends Serializable> {
storageFile = new File(dir, fileName);
fileManager = new FileManager<>(dir, storageFile, 600, TimeUnit.MILLISECONDS);
return getPersisted(serializable);
return getPersisted();
}
public void queueUpForSave() {
queueUpForSave(serializable);
}
// Save delayed and on a background thread
public void queueUpForSave() {
log.trace("save " + fileName);
checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write.");
public void queueUpForSave(T serializable) {
if (serializable != null) {
log.trace("save " + fileName);
checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write.");
fileManager.saveLater(serializable);
fileManager.saveLater(serializable);
} else {
log.trace("queueUpForSave called but no serializable set");
}
}
public void remove(String fileName) {
@ -113,17 +130,17 @@ public class Storage<T extends Serializable> {
// We do the file read on the UI thread to avoid problems from multi threading.
// Data are small and read is done only at startup, so it is no performance issue.
@Nullable
private T getPersisted(T serializable) {
private T getPersisted() {
if (storageFile.exists()) {
long now = System.currentTimeMillis();
try {
T persistedObject = fileManager.read(storageFile);
log.trace("Read {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now);
log.trace("Read {} completed in {}msec", storageFile, System.currentTimeMillis() - now);
// If we did not get any exception we can be sure the data are consistent so we make a backup
now = System.currentTimeMillis();
fileManager.backupFile(fileName);
log.trace("Backup {} completed in {}msec", serializable.getClass().getSimpleName(), System.currentTimeMillis() - now);
log.trace("Backup {} completed in {}msec", storageFile, System.currentTimeMillis() - now);
return persistedObject;
} catch (ClassCastException | IOException e) {

View file

@ -44,6 +44,7 @@ import org.reactfx.util.FxTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.inject.Named;
import java.io.File;
import java.time.Duration;
@ -176,7 +177,7 @@ public class OpenOfferManager {
shutDown(null);
}
public void shutDown(Runnable completeHandler) {
public void shutDown(@Nullable Runnable completeHandler) {
if (timer != null)
timer.cancel();
@ -188,7 +189,8 @@ public class OpenOfferManager {
offerBookService.removeOfferAtShutDown(openOffer.getOffer());
}
FxTimer.runLater(Duration.ofMillis(500), completeHandler::run);
if (completeHandler != null)
FxTimer.runLater(Duration.ofMillis(500), completeHandler::run);
}
}

View file

@ -183,21 +183,23 @@ class MainViewModel implements ViewModel {
private BooleanProperty initP2PNetwork() {
final BooleanProperty initialDataReady = new SimpleBooleanProperty();
splashP2PNetworkInfo.set("Connecting to Tor network...");
p2PService.start(new P2PServiceListener() {
@Override
public void onTorNodeReady() {
splashP2PNetworkInfo.set("Publishing Tor Hidden Service...");
p2PNetworkInfo.set(splashP2PNetworkInfo.get());
p2PNetworkIconId.set("image-connection-tor");
}
@Override
public void onHiddenServicePublished() {
splashP2PNetworkInfo.set("Authenticating to a seed node...");
p2PNetworkInfo.set(splashP2PNetworkInfo.get());
}
@Override
public void onRequestingDataCompleted() {
p2PNetworkInfo.set("Publishing Tor Hidden Service...");
initialDataReady.set(true);
}
@ -340,14 +342,14 @@ class MainViewModel implements ViewModel {
// update nr of peers in footer
p2PService.numAuthenticatedPeers.addListener((observable, oldValue, newValue) -> updateP2pNetworkInfo());
p2PService.getNumAuthenticatedPeers().addListener((observable, oldValue, newValue) -> updateP2pNetworkInfo());
// now show app
showAppScreen.set(true);
}
private void updateP2pNetworkInfo() {
p2PNetworkInfo.set("Nr. of authenticated connections: " + p2PService.numAuthenticatedPeers.get());
p2PNetworkInfo.set("Nr. of authenticated connections: " + p2PService.getNumAuthenticatedPeers().get());
}
private void displayAlertIfPresent(Alert alert) {

View file

@ -17,6 +17,7 @@
package io.bitsquare.gui.main.market;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.gui.common.view.ActivatableViewAndModel;
import io.bitsquare.gui.common.view.FxmlView;
@ -134,7 +135,8 @@ public class MarketView extends ActivatableViewAndModel<TabPane, MarketViewModel
newValue -> {
String code = newValue.getCode();
areaChart.setTitle("Offer book for " + newValue.getName());
xAxis.setTickLabelFormatter(new NumberAxis.DefaultFormatter(xAxis, "", " " + code + "/BTC"));
xAxis.setLabel(priceColumnLabel.get());
xAxis.setTickLabelFormatter(new NumberAxis.DefaultFormatter(xAxis, "", ""));
priceColumnLabel.set("Price (" + code + "/BTC)");
volumeColumnLabel.set("Volume (" + code + ")");
});
@ -156,7 +158,7 @@ public class MarketView extends ActivatableViewAndModel<TabPane, MarketViewModel
TableView<Offer> tableView = new TableView();
// price
TableColumn<Offer, Offer> priceColumn = new TableColumn<>("Price (EUR/BTC)");
TableColumn<Offer, Offer> priceColumn = new TableColumn<>();
priceColumn.textProperty().bind(priceColumnLabel);
priceColumn.setMinWidth(120);
priceColumn.setCellValueFactory((offer) -> new ReadOnlyObjectWrapper<>(offer.getValue()));
@ -237,7 +239,7 @@ public class MarketView extends ActivatableViewAndModel<TabPane, MarketViewModel
Label titleLabel = new Label(direction.equals(Offer.Direction.BUY) ? "Offers for buy bitcoin (bid)" : "Offers for sell bitcoin (ask)");
titleLabel.setStyle("-fx-font-weight: bold; -fx-font-size: 16; -fx-alignment: center");
titleLabel.prefWidthProperty().bind(tableView.widthProperty());
UserThread.execute(() -> titleLabel.prefWidthProperty().bind(tableView.widthProperty()));
VBox vBox = new VBox();
vBox.setSpacing(10);
@ -252,13 +254,13 @@ public class MarketView extends ActivatableViewAndModel<TabPane, MarketViewModel
xAxis = new NumberAxis();
xAxis.setForceZeroInRange(false);
xAxis.setAutoRanging(true);
xAxis.setLabel("Price");
xAxis.setLabel(priceColumnLabel.get());
yAxis = new NumberAxis();
yAxis.setForceZeroInRange(false);
yAxis.setAutoRanging(true);
yAxis.setLabel("Amount");
yAxis.setTickLabelFormatter(new NumberAxis.DefaultFormatter(yAxis, "", " BTC"));
yAxis.setLabel("Amount in BTC");
yAxis.setTickLabelFormatter(new NumberAxis.DefaultFormatter(yAxis, "", ""));
seriesBuy = new XYChart.Series();
seriesBuy.setName("Offers for buy bitcoin ");

View file

@ -127,7 +127,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
}
numAuthenticatedPeersChangeListener = (observable, oldValue, newValue) -> updateAuthenticatedPeersTextArea();
p2PService.numAuthenticatedPeers.addListener(numAuthenticatedPeersChangeListener);
p2PService.getNumAuthenticatedPeers().addListener(numAuthenticatedPeersChangeListener);
updateAuthenticatedPeersTextArea();
}
@ -136,7 +136,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
if (p2PServiceListener != null)
p2PService.removeP2PServiceListener(p2PServiceListener);
if (numAuthenticatedPeersChangeListener != null)
p2PService.numAuthenticatedPeers.removeListener(numAuthenticatedPeersChangeListener);
p2PService.getNumAuthenticatedPeers().removeListener(numAuthenticatedPeersChangeListener);
}
private void updateAuthenticatedPeersTextArea() {

View file

@ -33,7 +33,7 @@
<logger name="org.bitcoinj.core.BitcoinSerializer" level="WARN"/>
<logger name="org.bitcoinj.core.Peer" level="WARN"/>
<logger name="org.bitcoinj.core.HeadersMessage" level="WARN"/>
<logger name="org.bitcoinj.core.AbstractBlockChain" level="WARN"/>
<logger name="org.bitcoinj.core.AbstractBlockChain" level="ERROR"/>
<!--
<logger name="io.netty" level="OFF"/>

View file

@ -13,11 +13,11 @@ public final class SealedAndSignedMessage implements MailboxMessage {
private final int networkId = Version.NETWORK_ID;
public final SealedAndSigned sealedAndSigned;
public final byte[] blurredAddressHash;
public final byte[] addressPrefixHash;
public SealedAndSignedMessage(SealedAndSigned sealedAndSigned, byte[] blurredAddressHash) {
public SealedAndSignedMessage(SealedAndSigned sealedAndSigned, byte[] addressPrefixHash) {
this.sealedAndSigned = sealedAndSigned;
this.blurredAddressHash = blurredAddressHash;
this.addressPrefixHash = addressPrefixHash;
}
@Override
@ -35,7 +35,7 @@ public final class SealedAndSignedMessage implements MailboxMessage {
return "SealedAndSignedMessage{" +
"networkId=" + networkId +
", sealedAndSigned=" + sealedAndSigned +
", receiverAddressMaskHash.hashCode()=" + Arrays.toString(blurredAddressHash).hashCode() +
", receiverAddressMaskHash.hashCode()=" + Arrays.toString(addressPrefixHash).hashCode() +
'}';
}
}

View file

@ -8,7 +8,7 @@ import java.util.regex.Pattern;
public class Address implements Serializable {
public final String hostName;
public final int port;
transient private byte[] blurredAddress;
transient private byte[] addressPrefixHash;
public Address(String hostName, int port) {
this.hostName = hostName;
@ -26,10 +26,10 @@ public class Address implements Serializable {
}
// We use just a few chars form or address to blur the potential receiver for sent messages
public byte[] getBlurredAddress() {
if (blurredAddress == null)
blurredAddress = Hash.getHash(getFullAddress().substring(0, 2));
return blurredAddress;
public byte[] getAddressPrefixHash() {
if (addressPrefixHash == null)
addressPrefixHash = Hash.getHash(getFullAddress().substring(0, 2));
return addressPrefixHash;
}
@Override

View file

@ -16,9 +16,7 @@ import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.crypto.SealedAndSignedMessage;
import io.bitsquare.p2p.messaging.*;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.Peer;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.peers.PeerListener;
import io.bitsquare.p2p.seed.SeedNodesRepository;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage;
@ -26,13 +24,10 @@ import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
import io.bitsquare.p2p.storage.data.ExpirablePayload;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.messages.DataExchangeMessage;
import io.bitsquare.p2p.storage.messages.GetDataRequest;
import io.bitsquare.p2p.storage.messages.GetDataResponse;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.IntegerProperty;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.beans.property.SimpleIntegerProperty;
import io.bitsquare.storage.Storage;
import javafx.beans.property.*;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.monadic.MonadicBinding;
import org.jetbrains.annotations.NotNull;
@ -52,11 +47,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
/**
* Represents our node in the P2P network
*/
public class P2PService implements SetupListener, MessageListener, ConnectionListener, PeerListener {
public class P2PService implements SetupListener, MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
private static final int RETRY_GET_DATA = 10 * 1000;
private final SeedNodesRepository seedNodesRepository;
private final int port;
private final File torDir;
@ -64,26 +57,30 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Nullable
private final EncryptionService encryptionService;
private final KeyRing keyRing;
private final File storageDir;
// set in init
private NetworkNode networkNode;
private PeerGroup peerGroup;
private ProtectedExpirableDataStorage dataStorage;
private final CopyOnWriteArraySet<DecryptedMailListener> decryptedMailListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArraySet<>();
private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new HashMap<>();
private volatile boolean shutDownInProgress;
private Address connectedSeedNode;
private final Set<Address> authenticatedPeerAddresses = new HashSet<>();
private boolean shutDownComplete;
private final CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
private final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty();
private final BooleanProperty authenticated = new SimpleBooleanProperty();
private MonadicBinding<Boolean> readyForAuthentication;
public final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0);
private final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0);
private Address connectedSeedNode;
private volatile boolean shutDownInProgress;
private boolean shutDownComplete;
private MonadicBinding<Boolean> readyForAuthentication;
private final Storage<Address> dbStorage;
private Address myOnionAddress;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -105,40 +102,42 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
this.useLocalhost = useLocalhost;
this.encryptionService = encryptionService;
this.keyRing = keyRing;
this.storageDir = storageDir;
dbStorage = new Storage<>(storageDir);
init(networkId);
init(networkId, storageDir);
}
private void init(int networkId) {
private void init(int networkId, File storageDir) {
Log.traceCall();
Address persisted = dbStorage.initAndGetPersisted("myOnionAddress");
if (persisted != null)
this.myOnionAddress = persisted;
// network
networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir);
Set<Address> seedNodeAddresses = seedNodesRepository.geSeedNodeAddresses(useLocalhost, networkId);
// peer group
peerGroup = new PeerGroup(networkNode, seedNodeAddresses);
if (useLocalhost) PeerGroup.setSimulateAuthTorNode(2 * 100);
if (useLocalhost)
PeerGroup.setSimulateAuthTorNode(400);
// storage
// P2P network storage
dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir);
networkNode.addConnectionListener(this);
networkNode.addMessageListener(this);
peerGroup.addPeerListener(this);
dataStorage.addHashMapChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedData entry) {
Log.traceCall();
if (entry instanceof ProtectedMailboxData)
tryDecryptMailboxData((ProtectedMailboxData) entry);
processProtectedMailboxData((ProtectedMailboxData) entry);
}
@Override
public void onRemoved(ProtectedData entry) {
Log.traceCall();
}
});
@ -148,7 +147,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// we need to have both the initial data delivered and the hidden service published before we
// bootstrap and authenticate to other nodes.
if (newValue)
tryAuthenticateSeedNode();
authenticateSeedNode();
});
requestingDataCompleted.addListener((observable, oldValue, newValue) -> {
@ -173,21 +172,22 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
setRequestingDataCompleted();
} else if (message instanceof DataExchangeMessage) {
Log.traceCall(message.toString());
DataExchangeMessage dataExchangeMessage = (DataExchangeMessage) message;
HashSet<ProtectedData> set = dataExchangeMessage.set;
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
setRequestingDataCompleted();
onRequestingDataComplete();
} else if (message instanceof SealedAndSignedMessage) {
Log.traceCall(message.toString());
// Seed nodes don't have set the encryptionService
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message;
if (verifyBlurredAddressHash(sealedAndSignedMessage)) {
if (verifyAddressPrefixHash(sealedAndSignedMessage)) {
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
// We set connectionType to that connection to avoid that is get closed when
// we get too many connection attempts.
// That is used as protection against eclipse attacks.
connection.setConnectionType(ConnectionsType.DIRECT_MSG);
log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey);
decryptedMailListeners.stream().forEach(
e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress()));
@ -202,12 +202,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
private boolean verifyBlurredAddressHash(SealedAndSignedMessage sealedAndSignedMessage) {
byte[] blurredAddressHash = getAddress().getBlurredAddress();
return blurredAddressHash != null &&
Arrays.equals(blurredAddressHash, sealedAndSignedMessage.blurredAddressHash);
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
@ -223,9 +217,12 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkArgument(peerAddress.equals(connection.getPeerAddress()),
"peerAddress must match connection.getPeerAddress()");
authenticatedPeerAddresses.add(peerAddress);
authenticated.set(true);
p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated());
if (!authenticated.get()) {
authenticated.set(true);
sendGetDataRequestAfterAuthentication(peerAddress, connection);
p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated());
}
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
@ -244,31 +241,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onFirstAuthenticatePeer(Peer peer) {
Log.traceCall();
log.trace("onFirstAuthenticatePeer " + peer);
sendGetAllDataMessageAfterAuthentication(peer);
}
@Override
public void onPeerAdded(Peer peer) {
}
@Override
public void onPeerRemoved(Address address) {
}
@Override
public void onConnectionAuthenticated(Connection connection) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// SetupListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@ -283,10 +255,60 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
sendGetDataRequest(peerGroup.getSeedNodeAddresses());
}
private void sendGetDataRequest(Collection<Address> seedNodeAddresses) {
Log.traceCall(seedNodeAddresses.toString());
if (!seedNodeAddresses.isEmpty()) {
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
Collections.shuffle(remainingSeedNodeAddresses);
Address candidate = remainingSeedNodeAddresses.remove(0);
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new GetDataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
checkArgument(connectedSeedNode == null, "We have already a connectedSeedNode. That should not happen.");
connectedSeedNode = candidate;
// In case we get called from a retry we check if we need to authenticate
if (!authenticated.get() && hiddenServicePublished.get())
authenticateSeedNode();
else
log.debug("No connected seedNode available.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send GetAllDataMessage to " + candidate + " failed. " +
"That is expected if other seed nodes are offline. " +
"Exception:" + throwable.getMessage());
if (!remainingSeedNodeAddresses.isEmpty())
log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses);
sendGetDataRequest(remainingSeedNodeAddresses);
}
});
} else {
log.info("There is no seed node available for requesting data. " +
"That is expected if no seed node is online.\n" +
"We will try again after a bit ");
onRequestingDataComplete();
UserThread.runAfterRandomDelay(() -> sendGetDataRequest(peerGroup.getSeedNodeAddresses()),
20, 30, TimeUnit.SECONDS);
}
}
@Override
public void onHiddenServicePublished() {
Log.traceCall();
checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready");
if (myOnionAddress != null)
checkArgument(networkNode.getAddress() == myOnionAddress, "networkNode.getAddress() must be same as myOnionAddress");
myOnionAddress = networkNode.getAddress();
dbStorage.queueUpForSave(myOnionAddress);
p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished());
@ -300,83 +322,37 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable));
}
private void sendGetDataRequest(Collection<Address> seedNodeAddresses) {
Log.traceCall();
if (!seedNodeAddresses.isEmpty()) {
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
Collections.shuffle(remainingSeedNodeAddresses);
Address candidate = remainingSeedNodeAddresses.remove(0);
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new GetDataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
connectedSeedNode = candidate;
// In case we get called by a retry we check if we need authenticate as well
if (hiddenServicePublished.get() && !authenticated.get()) {
peerGroup.authenticateSeedNode(connectedSeedNode);
} else {
log.debug("No connected seedNode available.");
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send GetAllDataMessage to " + candidate + " failed. " +
"That is expected if other seed nodes are offline." +
"\nException:" + throwable.getMessage());
if (!remainingSeedNodeAddresses.isEmpty())
log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses);
sendGetDataRequest(remainingSeedNodeAddresses);
}
});
} else {
log.info("There is no seed node available for requesting data. " +
"That is expected if no seed node is available.\n" +
"We will try again after {} ms", RETRY_GET_DATA);
setRequestingDataCompleted();
UserThread.runAfter(() -> sendGetDataRequest(peerGroup.getSeedNodeAddresses()),
RETRY_GET_DATA, TimeUnit.MILLISECONDS);
}
}
private void setRequestingDataCompleted() {
private void onRequestingDataComplete() {
Log.traceCall();
// 2. (or 3.) Step: We got all data loaded (or no seed node available - should not happen in real operation)
requestingDataCompleted.set(true);
}
// 4. Step: hiddenServicePublished and allDataLoaded. We start authenticate to the connected seed node.
private void tryAuthenticateSeedNode() {
private void authenticateSeedNode() {
Log.traceCall();
if (connectedSeedNode != null) {
checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null");
if (connectedSeedNode != null)
peerGroup.authenticateSeedNode(connectedSeedNode);
} else {
log.debug("No connected seedNode available.");
}
}
// 5. Step:
private void sendGetAllDataMessageAfterAuthentication(final Peer peer) {
Log.traceCall(peer.toString());
private void sendGetDataRequestAfterAuthentication(Address peerAddress, Connection connection) {
Log.traceCall(peerAddress.toString());
// We have to exchange the data again as we might have missed pushed data in the meantime
// After authentication we send our data set to the other peer.
// As he will do the same we will get his actual data set.
SettableFuture<Connection> future = networkNode.sendMessage(peer.connection, new DataExchangeMessage(getDataSet()));
SettableFuture<Connection> future = networkNode.sendMessage(connection, new GetDataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("sendGetAllDataMessageAfterAuthentication Send DataExchangeMessage to " + peer.address + " succeeded.");
log.trace("sendGetDataRequestAfterAuthentication: Send GetDataRequest to " + peerAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("sendGetAllDataMessageAfterAuthentication Send DataExchangeMessage to " + peer.address + " failed. " +
//TODO how to deal with that case?
log.warn("sendGetDataRequestAfterAuthentication: Send GetDataRequest to " + peerAddress + " failed. " +
"Exception:" + throwable.getMessage());
}
});
@ -429,13 +405,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
shutDownCompleteHandler.run();
else
shutDownResultHandlers.add(shutDownCompleteHandler);
log.warn("shutDown already in progress");
log.debug("shutDown already in progress");
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Messaging
// MailMessages
///////////////////////////////////////////////////////////////////////////////////////////
public void sendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message,
@ -445,7 +421,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkAuthentication();
if (!authenticatedPeerAddresses.contains(peerAddress))
peerGroup.authenticateToPeer(peerAddress,
peerGroup.authenticateToDirectMessagePeer(peerAddress,
() -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener),
() -> sendMailMessageListener.onFault());
else
@ -458,7 +434,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
encryptionService.encryptAndSign(pubKeyRing, message), peerAddress.getBlurredAddress());
encryptionService.encryptAndSign(pubKeyRing, message), peerAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -479,6 +455,47 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// MailboxMessages
///////////////////////////////////////////////////////////////////////////////////////////
private void processProtectedMailboxData(ProtectedMailboxData mailboxData) {
if (encryptionService != null) {
Log.traceCall();
ExpirablePayload expirablePayload = mailboxData.expirablePayload;
if (expirablePayload instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) expirablePayload;
SealedAndSignedMessage sealedAndSignedMessage = expirableMailboxPayload.sealedAndSignedMessage;
if (verifyAddressPrefixHash(sealedAndSignedMessage)) {
try {
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
if (decryptedMsgWithPubKey.message instanceof MailboxMessage) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMsgWithPubKey.message;
Address senderAddress = mailboxMessage.getSenderAddress();
checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages");
mailboxMap.put(decryptedMsgWithPubKey, mailboxData);
log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress="
+ senderAddress + " / my address=" + getAddress());
decryptedMailboxListeners.stream().forEach(
e -> e.onMailboxMessageAdded(decryptedMsgWithPubKey, senderAddress));
} else {
log.warn("tryDecryptMailboxData: Expected MailboxMessage but got other type. " +
"decryptedMsgWithPubKey.message=", decryptedMsgWithPubKey.message);
}
} catch (CryptoException e) {
log.trace("Decryption of SealedAndSignedMessage failed. " +
"That is expected if the message is not intended for us. " + e.getMessage());
}
} else {
log.info("Wrong blurredAddressHash. The message is not intended for us.");
}
}
}
}
public void sendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
@ -489,7 +506,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (authenticatedPeerAddresses.contains(peerAddress)) {
trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener);
} else {
peerGroup.authenticateToPeer(peerAddress,
peerGroup.authenticateToDirectMessagePeer(peerAddress,
() -> trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener),
() -> {
log.info("We cannot authenticate to peer. Peer might be offline. We will store message in mailbox.");
@ -498,13 +515,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
// send message and if it fails (peer offline) we store the data to the network
private void trySendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress.getBlurredAddress());
encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -535,7 +553,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Data storage
///////////////////////////////////////////////////////////////////////////////////////////
@ -545,8 +562,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkAuthentication();
try {
return dataStorage.add(dataStorage.getDataWithSignedSeqNr(expirablePayload,
keyRing.getSignatureKeyPair()), networkNode.getAddress());
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, keyRing.getSignatureKeyPair());
return dataStorage.add(protectedData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@ -558,8 +575,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkAuthentication();
try {
dataStorage.add(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
keyRing.getSignatureKeyPair(), receiversPublicKey), networkNode.getAddress());
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
keyRing.getSignatureKeyPair(),
receiversPublicKey);
dataStorage.add(protectedMailboxData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
@ -570,8 +590,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
checkAuthentication();
try {
return dataStorage.remove(dataStorage.getDataWithSignedSeqNr(expirablePayload,
keyRing.getSignatureKeyPair()), networkNode.getAddress());
ProtectedData protectedData = dataStorage.getDataWithSignedSeqNr(expirablePayload, keyRing.getSignatureKeyPair());
return dataStorage.remove(protectedData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false;
@ -585,12 +605,22 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (mailboxMap.containsKey(decryptedMsgWithPubKey)) {
ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) {
checkArgument(mailboxData.receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()),
"mailboxData.receiversPubKey is not matching with our key. That must not happen.");
removeMailboxData((ExpirableMailboxPayload) mailboxData.expirablePayload, mailboxData.receiversPubKey);
mailboxMap.remove(decryptedMsgWithPubKey);
log.trace("Removed successfully protectedExpirableData.");
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) mailboxData.expirablePayload;
PublicKey receiversPubKey = mailboxData.receiversPubKey;
checkArgument(receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()),
"receiversPubKey is not matching with our key. That must not happen.");
try {
ProtectedMailboxData protectedMailboxData = dataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
keyRing.getSignatureKeyPair(),
receiversPubKey);
dataStorage.removeMailboxData(protectedMailboxData, networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
mailboxMap.remove(decryptedMsgWithPubKey);
log.trace("Removed successfully decryptedMsgWithPubKey.");
}
} else {
log.warn("decryptedMsgWithPubKey not found in mailboxMap. That should never happen." +
@ -598,20 +628,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
private void removeMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
Log.traceCall();
checkAuthentication();
try {
dataStorage.removeMailboxData(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload,
keyRing.getSignatureKeyPair(), receiversPublicKey), networkNode.getAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
}
public Map<ByteArray, ProtectedData> getDataMap() {
Log.traceCall();
return dataStorage.getMap();
}
@ -621,37 +638,30 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
public void addDecryptedMailListener(DecryptedMailListener listener) {
Log.traceCall();
decryptedMailListeners.add(listener);
}
public void removeDecryptedMailListener(DecryptedMailListener listener) {
Log.traceCall();
decryptedMailListeners.remove(listener);
}
public void addDecryptedMailboxListener(DecryptedMailboxListener listener) {
Log.traceCall();
decryptedMailboxListeners.add(listener);
}
public void removeDecryptedMailboxListener(DecryptedMailboxListener listener) {
Log.traceCall();
decryptedMailboxListeners.remove(listener);
}
public void addP2PServiceListener(P2PServiceListener listener) {
Log.traceCall();
p2pServiceListeners.add(listener);
}
public void removeP2PServiceListener(P2PServiceListener listener) {
Log.traceCall();
p2pServiceListeners.remove(listener);
}
public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {
Log.traceCall();
dataStorage.addHashMapChangedListener(hashMapChangedListener);
}
@ -680,49 +690,27 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return authenticatedPeerAddresses;
}
public ReadOnlyIntegerProperty getNumAuthenticatedPeers() {
return numAuthenticatedPeers;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private HashSet<ProtectedData> getDataSet() {
// Log.traceCall();
return new HashSet<>(dataStorage.getMap().values());
return new HashSet<>(getDataMap().values());
}
private void tryDecryptMailboxData(ProtectedMailboxData mailboxData) {
Log.traceCall();
if (encryptionService != null) {
ExpirablePayload expirablePayload = mailboxData.expirablePayload;
if (expirablePayload instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) expirablePayload;
SealedAndSignedMessage sealedAndSignedMessage = expirableMailboxPayload.sealedAndSignedMessage;
if (verifyBlurredAddressHash(sealedAndSignedMessage)) {
try {
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
if (decryptedMsgWithPubKey.message instanceof MailboxMessage) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMsgWithPubKey.message;
Address senderAddress = mailboxMessage.getSenderAddress();
checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages");
mailboxMap.put(decryptedMsgWithPubKey, mailboxData);
log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress="
+ senderAddress + " / my address=" + getAddress());
decryptedMailboxListeners.stream().forEach(
e -> e.onMailboxMessageAdded(decryptedMsgWithPubKey, senderAddress));
} else {
log.warn("tryDecryptMailboxData: Expected MailboxMessage but got other type. " +
"decryptedMsgWithPubKey.message=", decryptedMsgWithPubKey.message);
}
} catch (CryptoException e) {
log.trace("Decryption of SealedAndSignedMessage failed. " +
"That is expected if the message is not intended for us. " + e.getMessage());
}
} else {
log.info("Wrong blurredAddressHash. The message is not intended for us.");
}
}
private boolean verifyAddressPrefixHash(SealedAndSignedMessage sealedAndSignedMessage) {
if (myOnionAddress != null) {
byte[] blurredAddressHash = myOnionAddress.getAddressPrefixHash();
return blurredAddressHash != null &&
Arrays.equals(blurredAddressHash, sealedAndSignedMessage.addressPrefixHash);
} else {
log.warn("myOnionAddress must not be null at verifyAddressPrefixHash");
return false;
}
}

View file

@ -31,6 +31,7 @@ public class Connection implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(Connection.class);
private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data
private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min.
private ConnectionsType connectionType;
public static int getMaxMsgSize() {
return MAX_MSG_SIZE;
@ -71,7 +72,7 @@ public class Connection implements MessageListener {
this.connectionListener = connectionListener;
sharedSpace = new SharedSpace(this, socket);
Log.traceCall();
if (socket.getLocalPort() == 0)
portInfo = "port=" + socket.getPort();
@ -122,6 +123,10 @@ public class Connection implements MessageListener {
connectionListener.onPeerAddressAuthenticated(peerAddress, connection);
}
public void setConnectionType(ConnectionsType connectionType) {
this.connectionType = connectionType;
}
// Called form various threads
public void sendMessage(Message message) {
Log.traceCall();
@ -205,6 +210,9 @@ public class Connection implements MessageListener {
return stopped;
}
public ConnectionsType getConnectionType() {
return connectionType;
}
///////////////////////////////////////////////////////////////////////////////////////////
// ShutDown
@ -324,6 +332,7 @@ public class Connection implements MessageListener {
", isAuthenticated=" + isAuthenticated +
", stopped=" + stopped +
", stopped=" + stopped +
", connectionType=" + connectionType +
", useCompression=" + useCompression +
'}';
}

View file

@ -0,0 +1,8 @@
package io.bitsquare.p2p.network;
public enum ConnectionsType {
PASSIVE, // for connections initiated by other peer
ACTIVE, // for connections initiated by us
DIRECT_MSG, // for connections used for direct messaging
AUTH_REQUEST // for connections used for starting the authentication
}

View file

@ -27,8 +27,8 @@ import java.util.function.Consumer;
public class LocalhostNetworkNode extends NetworkNode {
private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class);
private static volatile int simulateTorDelayTorNode = 2 * 100;
private static volatile int simulateTorDelayHiddenService = 2 * 100;
private static volatile int simulateTorDelayTorNode = 600;
private static volatile int simulateTorDelayHiddenService = 3000;
private Address address;
public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) {

View file

@ -87,17 +87,20 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
if (connection != null) {
return sendMessage(connection, message);
} else {
log.debug("inBoundConnections " + inBoundConnections.toString());
log.debug("outBoundConnections " + outBoundConnections.toString());
log.trace("We have not found any connection for that peerAddress. " +
"We will create a new outbound connection.");
final SettableFuture<Connection> resultFuture = SettableFuture.create();
final boolean[] timeoutOccurred = new boolean[1];
timeoutOccurred[0] = false;
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress);
try {
Socket socket = createSocket(peerAddress); // can take a while when using tor
if (timeoutOccurred[0])
throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peerAddress);
Connection newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this);
newConnection.setPeerAddress(peerAddress);
outBoundConnections.add(newConnection);
@ -125,10 +128,11 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override
public void run() {
Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000));
timeoutOccurred[0] = true;
future.cancel(true);
String message = "Timeout occurred when tried to create Socket to peer: " + peerAddress;
log.info(message);
resultFuture.setException(new TimeoutException(message));
UserThread.execute(() -> resultFuture.setException(new TimeoutException(message)));
}
}, CREATE_SOCKET_TIMEOUT);
@ -335,15 +339,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
private Optional<Connection> lookupOutboundConnection(Address peerAddress) {
Log.traceCall(peerAddress.toString());
log.debug("outBoundConnections " + outBoundConnections);
Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections);
return outBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}
private Optional<Connection> lookupInboundConnection(Address peerAddress) {
Log.traceCall(peerAddress.toString());
log.debug("inBoundConnections " + inBoundConnections);
Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections);
return inBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}

View file

@ -4,20 +4,23 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionsType;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.auth.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
// authentication example:
@ -51,8 +54,6 @@ public class AuthenticationHandshake implements MessageListener {
this.networkNode = networkNode;
this.peerGroup = peerGroup;
this.myAddress = myAddress;
networkNode.addMessageListener(this);
}
@ -62,74 +63,85 @@ public class AuthenticationHandshake implements MessageListener {
@Override
public void onMessage(Message message, Connection connection) {
checkArgument(!stopped);
if (message instanceof AuthenticationMessage) {
Log.traceCall(message.toString());
if (message instanceof AuthenticationResponse) {
// Requesting peer
// We use the active connectionType if we started the authentication request to another peer
// That is used for protecting eclipse attacks
connection.setConnectionType(ConnectionsType.ACTIVE);
AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
connection.setPeerAddress(authenticationResponse.address);
Address peerAddress = authenticationResponse.address;
log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress);
connection.setPeerAddress(peerAddress);
log.trace("Received authenticationResponse from " + peerAddress);
boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce;
if (verified) {
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersAuthRequest(myAddress, authenticationResponse.responderNonce, new HashSet<>(peerGroup.getAllPeerAddresses())));
GetPeersAuthRequest getPeersAuthRequest = new GetPeersAuthRequest(myAddress,
authenticationResponse.responderNonce,
new HashSet<>(peerGroup.getAllPeerAddresses()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, getPeersAuthRequest);
log.trace("Sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersAuthRequest sent successfully from " + myAddress + " to " + peerAddress);
connection.setPeerAddress(peerAddress);
log.trace("Successfully sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersAuthRequest sending failed " + throwable.getMessage());
onFault(throwable);
failed(throwable);
}
});
// We could set already the authenticated flag here already, but as we need the reported peers we need
// to wait for the GetPeersAuthResponse before we are completed.
} else {
log.warn("verify nonce failed. challengeMessage=" + authenticationResponse + " / nonce=" + nonce);
onFault(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce));
log.warn("verify nonce failed. AuthenticationResponse=" + authenticationResponse + " / nonce=" + nonce);
failed(new Exception("Verify nonce failed. AuthenticationResponse=" + authenticationResponse + " / nonceMap=" + nonce));
}
} else if (message instanceof GetPeersAuthRequest) {
// Responding peer
GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message;
Address peerAddress = getPeersAuthRequest.address;
log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress);
log.trace("GetPeersAuthRequest from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == getPeersAuthRequest.responderNonce;
if (verified) {
// we create the msg with our already collected peer addresses (before adding the new ones)
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses())));
log.trace("sent GetPeersAuthResponse to " + peerAddress + " from " + myAddress
+ " with allPeers=" + peerGroup.getAllPeerAddresses());
GetPeersAuthResponse getPeersAuthResponse = new GetPeersAuthResponse(myAddress,
new HashSet<>(peerGroup.getAllPeerAddresses()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, getPeersAuthResponse);
log.trace("Sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress);
// now we add the reported peers to our own set
HashSet<Address> peerAddresses = getPeersAuthRequest.peerAddresses;
log.trace("Received peers: " + peerAddresses);
log.trace("Received reported peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("GetPeersAuthResponse sent successfully from " + myAddress + " to " + peerAddress);
connection.setPeerAddress(peerAddress);
log.trace("Successfully sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress);
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
AuthenticationHandshake.this.onSuccess(connection);
completed(connection);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersAuthResponse sending failed " + throwable.getMessage());
onFault(throwable);
failed(throwable);
}
});
} else {
log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce);
onFault(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce));
failed(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce));
}
} else if (message instanceof GetPeersAuthResponse) {
// Requesting peer
@ -137,130 +149,97 @@ public class AuthenticationHandshake implements MessageListener {
Address peerAddress = getPeersAuthResponse.address;
log.trace("GetPeersAuthResponse from " + peerAddress + " at " + myAddress);
HashSet<Address> peerAddresses = getPeersAuthResponse.peerAddresses;
log.trace("Received peers: " + peerAddresses);
log.trace("Received reported peers: " + peerAddresses);
peerGroup.addToReportedPeers(peerAddresses, connection);
// we wait until the handshake is completed before setting the authenticate flag
// authentication at both sides of the connection
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
onSuccess(connection);
completed(connection);
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
// Authentication initiated by requesting peer
///////////////////////////////////////////////////////////////////////////////////////////
public SettableFuture<Connection> requestAuthenticationToPeer(Address peerAddress) {
public SettableFuture<Connection> requestAuthentication(Address peerAddress) {
Log.traceCall();
// Requesting peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationRequest(myAddress, getAndSetNonce()));
init();
AuthenticationRequest authenticationRequest = new AuthenticationRequest(myAddress, getAndSetNonce());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("send RequestAuthenticationMessage to " + peerAddress + " succeeded.");
public void onSuccess(Connection connection) {
log.trace("send AuthenticationRequest to " + peerAddress + " succeeded.");
connection.setPeerAddress(peerAddress);
// We protect that connection from getting closed by maintenance cleanup...
connection.setConnectionType(ConnectionsType.AUTH_REQUEST);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." +
log.info("Send AuthenticationRequest to " + peerAddress + " failed." +
"\nException:" + throwable.getMessage());
onFault(throwable);
failed(throwable);
}
});
return resultFuture;
}
public SettableFuture<Connection> requestAuthentication(Set<Address> remainingAddresses, Address peerAddress) {
Log.traceCall(peerAddress.getFullAddress());
// Requesting peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
remainingAddresses.remove(peerAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationRequest(myAddress, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("send RequestAuthenticationMessage to " + peerAddress + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." +
"\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.getMessage());
onFault(throwable);
}
});
///////////////////////////////////////////////////////////////////////////////////////////
// Responding to authentication request
///////////////////////////////////////////////////////////////////////////////////////////
return resultFuture;
}
public SettableFuture<Connection> processAuthenticationRequest(AuthenticationRequest authenticationRequest, Connection connection) {
public SettableFuture<Connection> respondToAuthenticationRequest(AuthenticationRequest authenticationRequest, Connection connection) {
Log.traceCall();
// Responding peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
init();
Address peerAddress = authenticationRequest.address;
log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
log.trace("AuthenticationRequest from " + peerAddress + " at " + myAddress);
log.info("We shut down inbound connection from peer {} to establish a new " +
"connection with his reported address.", peerAddress);
//TODO check if causes problems without delay
connection.shutDown(() -> {
Log.traceCall();
if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
UserThread.runAfter(() -> {
if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
log.trace("processAuthenticationMessage: connection.shutDown complete. AuthenticationRequest from " + peerAddress + " at " + myAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.requesterNonce, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("onSuccess sending ChallengeMessage");
}
AuthenticationResponse authenticationResponse = new AuthenticationResponse(myAddress,
authenticationRequest.requesterNonce,
getAndSetNonce());
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("onSuccess sending AuthenticationResponse");
connection.setPeerAddress(peerAddress);
// We use passive connectionType for connections created from received authentication requests from other peers
// That is used for protecting eclipse attacks
connection.setConnectionType(ConnectionsType.PASSIVE);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("onFailure sending ChallengeMessage.");
onFault(throwable);
}
});
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("onFailure sending AuthenticationResponse.");
failed(throwable);
}
});
}
}, 200, TimeUnit.MILLISECONDS);
});
/* connection.shutDown(() -> UserThread.runAfter(() -> { Log.traceCall();
if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) { Log.traceCall();
log.trace("onSuccess sending ChallengeMessage");
}
@Override
public void onFailure(@NotNull Throwable throwable) { Log.traceCall();
log.warn("onFailure sending ChallengeMessage.");
onFault(throwable);
}
});
}
},
100 + PeerGroup.simulateAuthTorNode,
TimeUnit.MILLISECONDS));*/
return resultFuture;
}
@ -269,6 +248,12 @@ public class AuthenticationHandshake implements MessageListener {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void init() {
networkNode.addMessageListener(this);
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
}
private long getAndSetNonce() {
Log.traceCall();
nonce = new Random().nextLong();
@ -278,21 +263,21 @@ public class AuthenticationHandshake implements MessageListener {
return nonce;
}
private void onFault(@NotNull Throwable throwable) {
private void failed(@NotNull Throwable throwable) {
Log.traceCall();
cleanup();
shutDown();
resultFuture.setException(throwable);
}
private void onSuccess(Connection connection) {
private void completed(Connection connection) {
Log.traceCall();
cleanup();
shutDown();
resultFuture.set(connection);
}
private void cleanup() {
private void shutDown() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(this);
stopped = true;
}
}

View file

@ -1,18 +0,0 @@
package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
//TODO used only in unittests yet
public abstract class AuthenticationListener implements PeerListener {
public void onFirstAuthenticatePeer(Peer peer) {
}
public void onPeerAdded(Peer peer) {
}
public void onPeerRemoved(Address address) {
}
abstract public void onConnectionAuthenticated(Connection connection);
}

View file

@ -8,10 +8,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest;
import io.bitsquare.p2p.peers.messages.maintenance.*;
import io.bitsquare.p2p.storage.messages.DataBroadcastMessage;
@ -21,7 +18,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -37,10 +33,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
PeerGroup.simulateAuthTorNode = simulateAuthTorNode;
}
private static int MAX_CONNECTIONS = 8;
private static int MAX_CONNECTIONS_LOW_PRIO = 8;
private static int MAX_CONNECTIONS_NORMAL_PRIO = MAX_CONNECTIONS_LOW_PRIO + 4;
private static int MAX_CONNECTIONS_HIGH_PRIO = MAX_CONNECTIONS_NORMAL_PRIO + 4;
public static void setMaxConnections(int maxConnections) {
MAX_CONNECTIONS = maxConnections;
public static void setMaxConnectionsLowPrio(int maxConnectionsLowPrio) {
MAX_CONNECTIONS_LOW_PRIO = maxConnectionsLowPrio;
}
private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
@ -49,16 +47,14 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private final NetworkNode networkNode;
private final Set<Address> seedNodeAddresses;
private final Set<PeerListener> peerListeners = new HashSet<>();
private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<Address> reportedPeerAddresses = new HashSet<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new ConcurrentHashMap<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>();
private Timer sendPingTimer = new Timer();
private Timer getPeersTimer = new Timer();
private boolean shutDownInProgress;
private boolean firstPeerAdded = false;
///////////////////////////////////////////////////////////////////////////////////////////
@ -164,16 +160,18 @@ public class PeerGroup implements MessageListener, ConnectionListener {
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to seed node
// Process incoming authentication request
///////////////////////////////////////////////////////////////////////////////////////////
private void processAuthenticationRequest(NetworkNode networkNode, AuthenticationRequest message, final Connection connection) {
Log.traceCall(message.toString());
Address peerAddress = message.address;
if (!authenticationHandshakes.containsKey(peerAddress)) {
// We protect that connection from getting closed by maintenance cleanup...
connection.setConnectionType(ConnectionsType.AUTH_REQUEST);
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.processAuthenticationRequest(message, connection);
SettableFuture<Connection> future = authenticationHandshake.respondToAuthenticationRequest(message, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
@ -198,40 +196,48 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to seed node
///////////////////////////////////////////////////////////////////////////////////////////
// After HS is published or after a retry from a successful GetDataRequest if no seed nodes have been available initially
public void authenticateSeedNode(Address peerAddress) {
Log.traceCall();
authenticateToSeedNode(new HashSet<>(seedNodeAddresses), peerAddress, true);
}
// First we try to connect to 1 seed node. If we fail we try to connect to any reported peer.
// First we try to connect to 1 seed node.
// If we fail we try to connect to one of the remaining seed nodes.
// If that fails as well we use the reported peers if available.
// If there are also no reported peers we retry after a random pause of a few minutes.
//
// After connection is authenticated, we try to connect to any reported peer as long we have not
// reached our max connection size.
private void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean continueOnSuccess) {
private void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean connectToReportedAfterSuccess) {
Log.traceCall(peerAddress.getFullAddress());
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (continueOnSuccess) {
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
log.info("We still don't have enough connections. Lets try the reported peers.");
authenticateToRemainingReportedPeers(true);
} else {
log.info("We have already enough connections.");
}
public void onSuccess(Connection connection) {
setAuthenticated(connection, peerAddress);
if (connectToReportedAfterSuccess) {
if (getAuthenticatedPeers().size() < MAX_CONNECTIONS_LOW_PRIO) {
log.info("We still don't have enough connections. Lets try the reported peers.");
authenticateToRemainingReportedPeers(true);
} else {
log.info("We have already tried all reported peers and seed nodes. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true),
1, 2, TimeUnit.MINUTES);
log.info("We have already enough connections.");
}
} else {
log.info("We have already tried all reported peers and seed nodes. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true),
1, 2, TimeUnit.MINUTES);
}
}
@ -242,12 +248,12 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"\nException:" + throwable.getMessage());
removePeer(peerAddress);
// If we fail we try again with the remaining set
// If we fail we try again with the remaining set excluding the failed one
remainingAddresses.remove(peerAddress);
log.trace("We try to authenticate to another random seed nodes of that list: " + remainingAddresses);
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(remainingAddresses);
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomNotAuthPeerAndRemainingSet(remainingAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a seed node. " + tupleOptional.get().first);
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true);
@ -255,8 +261,8 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info("We don't have any more seed nodes for connecting. Lets try the reported peers.");
authenticateToRemainingReportedPeers(true);
} else {
log.info("We don't have any more seed nodes or reported nodes for connecting. " +
"We stop bootstrapping now, but will repeat after an while.");
log.info("We don't have any more seed nodes nor reported nodes for connecting. " +
"We stop authentication attempts now, but will repeat after a few minutes.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true),
1, 2, TimeUnit.MINUTES);
}
@ -267,13 +273,32 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
private void authenticateToRemainingReportedPeers(boolean calledFromSeedNodeMethod) {
private void authenticateToRemainingSeedNodes() {
Log.traceCall();
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(reportedPeerAddresses);
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomNotAuthPeerAndRemainingSet(seedNodeAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random seed node. " + tupleOptional.get().first);
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, true);
} else {
log.info("We don't have any more seed nodes for connecting. " +
"We stop authentication attempts now, but will repeat after an while.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(false),
1, 2, TimeUnit.MINUTES);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to reported peers
///////////////////////////////////////////////////////////////////////////////////////////
private void authenticateToRemainingReportedPeers(boolean calledFromAuthenticateToSeedNode) {
Log.traceCall();
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomNotAuthPeerAndRemainingSet(reportedPeerAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random peer. " + tupleOptional.get().first);
authenticateToReportedPeer(tupleOptional.get().second, tupleOptional.get().first);
} else if (calledFromSeedNodeMethod) {
authenticateToReportedPeer(tupleOptional.get().first);
} else if (calledFromAuthenticateToSeedNode) {
log.info("We don't have any reported peers for connecting. " +
"As we tried recently the seed nodes we will wait a bit before repeating.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(),
@ -285,45 +310,42 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
// We try to connect to a reported peer. If we fail we repeat after the failed peer has been removed.
// If we succeed we repeat until we are ut of addresses.
private void authenticateToReportedPeer(Set<Address> remainingAddresses, Address peerAddress) {
// If we succeed we repeat until we are out of addresses.
private void authenticateToReportedPeer(Address peerAddress) {
Log.traceCall(peerAddress.getFullAddress());
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
if (reportedPeerAddresses.size() > 0) {
log.info("We still don't have enough connections. " +
"Lets try the remaining reported peer addresses.");
authenticateToRemainingReportedPeers(false);
} else {
log.info("We still don't have enough connections. " +
"Lets wait a bit and then try the remaining seed nodes.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(),
1, 2, TimeUnit.MINUTES);
}
public void onSuccess(Connection connection) {
setAuthenticated(connection, peerAddress);
if (getAuthenticatedPeers().size() < MAX_CONNECTIONS_LOW_PRIO) {
if (reportedPeerAddresses.size() > 0) {
log.info("We still don't have enough connections. " +
"Lets try the remaining reported peer addresses.");
authenticateToRemainingReportedPeers(false);
} else {
log.info("We have already enough connections.");
log.info("We don't have more reported peers and still don't have enough connections. " +
"Lets wait a bit and then try the remaining seed nodes.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNodes(),
1, 2, TimeUnit.MINUTES);
}
} else {
log.info("We have already enough connections.");
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
throwable.printStackTrace();
log.info("Send RequestAuthenticationMessage to a reported peer with address " + peerAddress + " failed." +
"\nThat is expected if the nodes was offline." +
"\nException:" + throwable.getMessage());
removePeer(peerAddress);
authenticateToRemainingReportedPeers(false);
if (reportedPeerAddresses.size() > 0) {
log.info("Authentication failed. Lets try again with the remaining reported peer addresses.");
authenticateToRemainingReportedPeers(false);
@ -340,40 +362,29 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
}
private void authenticateToRemainingSeedNodes() {
Log.traceCall();
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(seedNodeAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random seed node. " + tupleOptional.get().first);
authenticateToSeedNode(tupleOptional.get().second, tupleOptional.get().first, false);
} else {
log.info("We don't have any more seed nodes for connecting. " +
"We stop bootstrapping now, but will repeat after an while.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeers(true),
1, 2, TimeUnit.MINUTES);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Authentication to non-seed node peer
// Authentication to peer used for direct messaging
///////////////////////////////////////////////////////////////////////////////////////////
public void authenticateToPeer(Address peerAddress, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) {
// Priority is set when we receive a decrypted mail message as those are used for direct messages
public void authenticateToDirectMessagePeer(Address peerAddress,
@Nullable Runnable completeHandler,
@Nullable Runnable faultHandler) {
Log.traceCall(peerAddress.getFullAddress());
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that seed node already authenticated. That must never happen.");
if (!authenticationHandshakes.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress());
authenticationHandshakes.put(peerAddress, authenticationHandshake);
SettableFuture<Connection> future = authenticationHandshake.requestAuthenticationToPeer(peerAddress);
SettableFuture<Connection> future = authenticationHandshake.requestAuthentication(peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
setAuthenticated(connection, peerAddress);
if (authenticationCompleteHandler != null)
authenticationCompleteHandler.run();
if (completeHandler != null)
completeHandler.run();
}
}
@ -405,20 +416,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
connection.setAuthenticated(peerAddress, connection);
addAuthenticatedPeer(new Peer(connection));
peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection));
}
private void addAuthenticatedPeer(Peer peer) {
Log.traceCall(peer.toString());
authenticatedPeers.put(peer.address, peer);
reportedPeerAddresses.remove(peer.address);
firstPeerAdded = !firstPeerAdded && authenticatedPeers.size() == 1;
peerListeners.stream().forEach(e -> e.onPeerAdded(peer));
if (firstPeerAdded)
peerListeners.stream().forEach(e -> e.onFirstAuthenticatePeer(peer));
Address peerAddress = peer.address;
authenticatedPeers.put(peerAddress, peer);
reportedPeerAddresses.remove(peerAddress);
if (!checkIfConnectedPeersExceeds())
printAuthenticatedPeers();
@ -451,52 +455,89 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}, 1, 2, TimeUnit.MINUTES);
}
// TODO needs unit tests
private boolean checkIfConnectedPeersExceeds() {
Log.traceCall();
if (authenticatedPeers.size() > MAX_CONNECTIONS) {
log.trace("We have too many connections open. Lets remove the one which was not active recently.");
List<Connection> authenticatedConnections = networkNode.getAllConnections().stream()
int size = authenticatedPeers.size();
if (size > MAX_CONNECTIONS_LOW_PRIO) {
Set<Connection> allConnections = networkNode.getAllConnections();
log.info("We have {} connections open. Lets remove the passive connections" +
" which have not been active recently.", allConnections.size());
if (size != allConnections.size())
log.warn("authenticatedPeers.size()!=allConnections.size(). There is some inconsistency.");
List<Connection> authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionType() == ConnectionsType.PASSIVE)
.collect(Collectors.toList());
authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate()));
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
Connection connection = authenticatedConnections.remove(0);
log.info("We had shut down the oldest connection with last activity date="
+ connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 100, 500, TimeUnit.MILLISECONDS));
return true;
if (authenticatedConnections.size() == 0) {
log.debug("There are no passive connections for closing. We check if we are exceeding " +
"MAX_CONNECTIONS_NORMAL ({}) ", MAX_CONNECTIONS_NORMAL_PRIO);
if (size > MAX_CONNECTIONS_NORMAL_PRIO) {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionType() == ConnectionsType.PASSIVE || e.getConnectionType() == ConnectionsType.ACTIVE)
.collect(Collectors.toList());
if (authenticatedConnections.size() == 0) {
log.debug("There are no passive or active connections for closing. We check if we are exceeding " +
"MAX_CONNECTIONS_HIGH ({}) ", MAX_CONNECTIONS_HIGH_PRIO);
if (size > MAX_CONNECTIONS_HIGH_PRIO) {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
.collect(Collectors.toList());
}
}
}
}
if (authenticatedConnections.size() > 0) {
authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate()));
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
Connection connection = authenticatedConnections.remove(0);
log.info("We had shut down the oldest connection with last activity date="
+ connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 100, 500, TimeUnit.MILLISECONDS));
return true;
} else {
log.warn("That code path should never be reached. (checkIfConnectedPeersExceeds)");
return false;
}
} else {
log.trace("We don't have too many connections open.");
log.trace("We only have {} connections open and don't need to close any.", size);
return false;
}
}
private void pingPeers() {
Log.traceCall();
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
connectedPeersList.stream()
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("PingMessage sent successfully");
}
if (!connectedPeersList.isEmpty()) {
Log.traceCall();
connectedPeersList.stream()
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("PingMessage sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PingMessage sending failed " + throwable.getMessage());
removePeer(e.address);
}
});
}, 1, 10));
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PingMessage sending failed " + throwable.getMessage());
removePeer(e.address);
}
});
}, 1, 10));
}
}
private void trySendGetPeersRequest() {
Log.traceCall();
Collection<Peer> peers = authenticatedPeers.values();
if (!peers.isEmpty()) {
Log.traceCall();
Set<Peer> connectedPeersList = new HashSet<>(peers);
connectedPeersList.stream()
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
@ -515,8 +556,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
});
}, 5, 10));
} else {
log.info("No peers available for requesting data.");
}
}
@ -576,21 +615,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
///////////////////////////////////////////////////////////////////////////////////////////
public void addPeerListener(PeerListener peerListener) {
Log.traceCall();
peerListeners.add(peerListener);
}
public void removePeerListener(PeerListener peerListener) {
Log.traceCall();
peerListeners.remove(peerListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
@ -642,7 +666,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
int diff = size - MAX_REPORTED_PEERS;
List<Address> list = new LinkedList<>(getReportedNotConnectedPeerAddresses());
for (int i = 0; i < diff; i++) {
Address toRemove = getAndRemoveRandomItem(list);
Address toRemove = getAndRemoveRandomAddress(list);
reportedPeerAddresses.remove(toRemove);
}
} else {
@ -668,12 +692,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress);
boolean contained = reportedPeerAddresses.remove(peerAddress);
boolean wasInReportedPeers = reportedPeerAddresses.remove(peerAddress);
Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
if (disconnectedPeer != null)
peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress));
if (contained || disconnectedPeer != null)
if (wasInReportedPeers || disconnectedPeer != null)
printAllPeers();
}
}
@ -688,23 +709,24 @@ public class PeerGroup implements MessageListener, ConnectionListener {
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
private Address getAndRemoveRandomItem(List<Address> list) {
Log.traceCall();
return list.remove(new Random().nextInt(list.size()));
}
private Optional<Tuple2<Address, Set<Address>>> getRandomItemAndRemainingSet(Set<Address> remainingAddresses) {
private Optional<Tuple2<Address, Set<Address>>> getRandomNotAuthPeerAndRemainingSet(Set<Address> remainingAddresses) {
Log.traceCall();
List<Address> list = new ArrayList<>(remainingAddresses);
authenticatedPeers.values().stream().forEach(e -> list.remove(e.address));
if (!list.isEmpty()) {
Address item = getAndRemoveRandomItem(list);
Address item = getAndRemoveRandomAddress(list);
return Optional.of(new Tuple2<>(item, new HashSet<>(list)));
} else {
return Optional.empty();
}
}
private Address getAndRemoveRandomAddress(List<Address> list) {
Log.traceCall();
return list.remove(new Random().nextInt(list.size()));
}
public void printAllPeers() {
printAuthenticatedPeers();
printReportedPeers();

View file

@ -1,16 +0,0 @@
package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
public interface PeerListener {
void onFirstAuthenticatePeer(Peer peer);
// TODO never used
void onPeerAdded(Peer peer);
// TODO never used
void onPeerRemoved(Address address);
void onConnectionAuthenticated(Connection connection);
}

View file

@ -65,10 +65,10 @@ public class SeedNode {
String arg2 = args[2];
int maxConnections = Integer.parseInt(arg2);
checkArgument(maxConnections < 1000, "maxConnections seems to be a bit too high...");
PeerGroup.setMaxConnections(maxConnections);
PeerGroup.setMaxConnectionsLowPrio(maxConnections);
} else {
// we keep default a higher connection size for seed nodes
PeerGroup.setMaxConnections(50);
PeerGroup.setMaxConnectionsLowPrio(50);
}
if (args.length > 3) {
String arg3 = args[3];

View file

@ -65,7 +65,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
private void init() {
Log.traceCall();
HashMap<ByteArray, Integer> persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap");
HashMap<ByteArray, Integer> persisted = storage.initAndGetPersisted(sequenceNumberMap, "SequenceNumberMap");
if (persisted != null) {
sequenceNumberMap = persisted;
}

View file

@ -1,48 +0,0 @@
package io.bitsquare.p2p.storage.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.storage.data.ProtectedData;
import java.util.HashSet;
public final class DataExchangeMessage implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.NETWORK_ID;
public final HashSet<ProtectedData> set;
public DataExchangeMessage(HashSet<ProtectedData> set) {
this.set = set;
}
@Override
public int networkId() {
return networkId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DataExchangeMessage)) return false;
DataExchangeMessage that = (DataExchangeMessage) o;
return !(set != null ? !set.equals(that.set) : that.set != null);
}
@Override
public int hashCode() {
return set != null ? set.hashCode() : 0;
}
@Override
public String toString() {
return "GetDataResponse{" +
"networkId=" + networkId +
", set=" + set +
'}';
}
}

View file

@ -59,7 +59,7 @@ public class P2PServiceTest {
LocalhostNetworkNode.setSimulateTorDelayTorNode(10);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(100);
PeerGroup.setMaxConnections(8);
PeerGroup.setMaxConnectionsLowPrio(8);
keyRing1 = new KeyRing(new KeyStorage(dir1));
keyRing2 = new KeyRing(new KeyStorage(dir2));

View file

@ -1,12 +1,9 @@
package io.bitsquare.p2p.routing;
import io.bitsquare.common.util.Profiler;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.P2PServiceListener;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.LocalhostNetworkNode;
import io.bitsquare.p2p.peers.AuthenticationListener;
import io.bitsquare.p2p.peers.PeerGroup;
import io.bitsquare.p2p.seed.SeedNode;
import org.junit.*;
@ -36,7 +33,7 @@ public class PeerGroupTest {
public void setup() throws InterruptedException {
LocalhostNetworkNode.setSimulateTorDelayTorNode(50);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(8);
PeerGroup.setMaxConnections(100);
PeerGroup.setMaxConnectionsLowPrio(100);
seedNodes = new HashSet<>();
if (useLocalhost) {
@ -213,7 +210,8 @@ public class PeerGroupTest {
// node1 -> node2 PeersMessage
// first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2
CountDownLatch latch1 = new CountDownLatch(2);
//TODO
/* CountDownLatch latch1 = new CountDownLatch(2);
AuthenticationListener routingListener1 = new AuthenticationListener() {
@Override
public void onConnectionAuthenticated(Connection connection) {
@ -274,12 +272,13 @@ public class PeerGroupTest {
seedNode1.shutDown(() -> shutDownLatch.countDown());
seedNode2.shutDown(() -> shutDownLatch.countDown());
seedNode3.shutDown(() -> shutDownLatch.countDown());
shutDownLatch.await();
shutDownLatch.await();*/
}
//@Test
public void testAuthenticationWithDisconnect() throws InterruptedException {
LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
//TODO
/* LocalhostNetworkNode.setSimulateTorDelayTorNode(0);
LocalhostNetworkNode.setSimulateTorDelayHiddenService(0);
SeedNode seedNode1 = getAndStartSeedNode(8001);
SeedNode seedNode2 = getAndStartSeedNode(8002);
@ -340,12 +339,13 @@ public class PeerGroupTest {
CountDownLatch shutDownLatch = new CountDownLatch(2);
seedNode1.shutDown(() -> shutDownLatch.countDown());
seedNode2.shutDown(() -> shutDownLatch.countDown());
shutDownLatch.await();
shutDownLatch.await();*/
}
//@Test
public void testAuthenticationWithManyNodes() throws InterruptedException {
int authentications = 0;
//TODO
/* int authentications = 0;
int length = 3;
SeedNode[] nodes = new SeedNode[length];
for (int i = 0; i < length; i++) {
@ -379,7 +379,7 @@ public class PeerGroupTest {
for (int i = 0; i < length; i++) {
nodes[i].shutDown(() -> shutDownLatch.countDown());
}
shutDownLatch.await();
shutDownLatch.await();*/
}
private SeedNode getAndStartSeedNode(int port) throws InterruptedException {