From e3cdad4299b3a3fc7529440f6677477de92d8e04 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Fri, 6 Nov 2015 20:12:46 +0100 Subject: [PATCH] extract authentication to class, map to user thread --- .../java/io/bitsquare/common/UserThread.java | 37 + .../io/bitsquare/common/util/Utilities.java | 33 - .../arbitration/ArbitratorManager.java | 2 +- .../bitsquare/arbitration/DisputeManager.java | 2 +- .../java/io/bitsquare/trade/TradeManager.java | 2 +- .../trade/offer/OpenOfferManager.java | 2 +- .../io/bitsquare/gui/main/MainViewModel.java | 2 +- .../java/io/bitsquare/p2p/P2PService.java | 412 +++++------ .../io/bitsquare/p2p/network/Connection.java | 10 +- .../p2p/network/LocalhostNetworkNode.java | 6 +- .../io/bitsquare/p2p/network/NetworkNode.java | 3 + .../bitsquare/p2p/network/SetupListener.java | 2 +- .../bitsquare/p2p/network/TorNetworkNode.java | 15 +- .../p2p/peer/AuthenticationHandshake.java | 255 +++++++ .../p2p/peer/AuthenticationListener.java | 2 +- .../java/io/bitsquare/p2p/peer/PeerGroup.java | 699 +++++++++--------- .../io/bitsquare/p2p/peer/PeerListener.java | 2 +- .../p2p/peer/messages/GetPeersMessage.java | 6 +- .../p2p/peer/messages/PeersMessage.java | 6 +- .../java/io/bitsquare/p2p/seed/SeedNode.java | 44 +- ...ataSetMessage.java => AllDataMessage.java} | 8 +- ...SetMessage.java => GetAllDataMessage.java} | 7 +- .../test/java/io/bitsquare/p2p/TestUtils.java | 6 +- .../p2p/network/LocalhostNetworkNodeTest.java | 4 +- .../p2p/network/TorNetworkNodeTest.java | 8 +- .../bitsquare/p2p/routing/PeerGroupTest.java | 25 +- .../io/bitsquare/p2p/seed/SeedNodeMain.java | 9 +- 27 files changed, 914 insertions(+), 695 deletions(-) create mode 100644 network/src/main/java/io/bitsquare/p2p/peer/AuthenticationHandshake.java rename network/src/main/java/io/bitsquare/p2p/storage/messages/{DataSetMessage.java => AllDataMessage.java} (79%) rename network/src/main/java/io/bitsquare/p2p/storage/messages/{GetDataSetMessage.java => GetAllDataMessage.java} (64%) diff --git a/common/src/main/java/io/bitsquare/common/UserThread.java b/common/src/main/java/io/bitsquare/common/UserThread.java index 7ce9965833..8666629111 100644 --- a/common/src/main/java/io/bitsquare/common/UserThread.java +++ b/common/src/main/java/io/bitsquare/common/UserThread.java @@ -18,10 +18,17 @@ package io.bitsquare.common; import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; public class UserThread { + private static final Logger log = LoggerFactory.getLogger(UserThread.class); public static Executor getExecutor() { return executor; @@ -41,4 +48,34 @@ public class UserThread { public static void execute(Runnable command) { UserThread.executor.execute(command); } + + + public static Timer runAfterRandomDelay(Runnable runnable, long minDelayInSec, long maxDelayInSec) { + return UserThread.runAfterRandomDelay(runnable, minDelayInSec, maxDelayInSec, TimeUnit.SECONDS); + } + + public static Timer runAfterRandomDelay(Runnable runnable, long minDelay, long maxDelay, TimeUnit timeUnit) { + return UserThread.runAfter(runnable, new Random().nextInt((int) (maxDelay - minDelay)) + minDelay, timeUnit); + } + + public static Timer runAfter(Runnable runnable, long delayInSec) { + return UserThread.runAfter(runnable, delayInSec, TimeUnit.SECONDS); + } + + public static Timer runAfter(Runnable runnable, long delay, TimeUnit timeUnit) { + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000)); + try { + UserThread.execute(() -> runnable.run()); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing timerTask failed. " + t.getMessage()); + } + } + }, timeUnit.toMillis(delay)); + return timer; + } } diff --git a/common/src/main/java/io/bitsquare/common/util/Utilities.java b/common/src/main/java/io/bitsquare/common/util/Utilities.java index 2b9149d1f1..ce1ea7fa88 100644 --- a/common/src/main/java/io/bitsquare/common/util/Utilities.java +++ b/common/src/main/java/io/bitsquare/common/util/Utilities.java @@ -35,9 +35,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLConnection; import java.net.URLEncoder; -import java.util.Random; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -83,36 +80,6 @@ public class Utilities { return threadPoolExecutor; } - public static Timer runTimerTaskWithRandomDelay(Runnable runnable, long minDelay, long maxDelay) { - return runTimerTaskWithRandomDelay(runnable, minDelay, maxDelay, TimeUnit.SECONDS); - } - - public static Timer runTimerTaskWithRandomDelay(Runnable runnable, long minDelay, long maxDelay, TimeUnit timeUnit) { - return runTimerTask(runnable, new Random().nextInt((int) (maxDelay - minDelay)) + minDelay, timeUnit); - } - - public static Timer runTimerTask(Runnable runnable, long delay) { - return runTimerTask(runnable, delay, TimeUnit.SECONDS); - } - - public static Timer runTimerTask(Runnable runnable, long delay, TimeUnit timeUnit) { - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000)); - try { - runnable.run(); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing timerTask failed. " + t.getMessage()); - } - } - }, timeUnit.convert(delay, timeUnit)); - return timer; - } - - public static boolean isUnix() { return isOSX() || isLinux() || getOSName().contains("freebsd"); } diff --git a/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java b/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java index d48a451bcf..9f8ee5641b 100644 --- a/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java +++ b/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java @@ -119,7 +119,7 @@ public class ArbitratorManager { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { } @Override diff --git a/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java b/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java index 18d55faeb0..7e508a30fa 100644 --- a/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java +++ b/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java @@ -121,7 +121,7 @@ public class DisputeManager { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { } @Override diff --git a/core/src/main/java/io/bitsquare/trade/TradeManager.java b/core/src/main/java/io/bitsquare/trade/TradeManager.java index 29b873bd54..6716606f24 100644 --- a/core/src/main/java/io/bitsquare/trade/TradeManager.java +++ b/core/src/main/java/io/bitsquare/trade/TradeManager.java @@ -154,7 +154,7 @@ public class TradeManager { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { } @Override diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 0d4a170b00..1baa45d04c 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -134,7 +134,7 @@ public class OpenOfferManager { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { } @Override diff --git a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java index 9e7e7f4508..77a89af8ca 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java @@ -249,7 +249,7 @@ class MainViewModel implements ViewModel { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { p2pNetworkInfoFooter.set("Tor hidden service available."); } diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 2021d759e0..f22e6a43b0 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -1,6 +1,5 @@ package io.bitsquare.p2p; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; @@ -12,7 +11,6 @@ import io.bitsquare.common.crypto.CryptoException; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.crypto.PubKeyRing; import io.bitsquare.common.crypto.SealedAndSigned; -import io.bitsquare.common.util.Utilities; import io.bitsquare.crypto.EncryptionService; import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.messaging.*; @@ -27,8 +25,13 @@ import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload; import io.bitsquare.p2p.storage.data.ExpirablePayload; import io.bitsquare.p2p.storage.data.ProtectedData; import io.bitsquare.p2p.storage.data.ProtectedMailboxData; -import io.bitsquare.p2p.storage.messages.DataSetMessage; -import io.bitsquare.p2p.storage.messages.GetDataSetMessage; +import io.bitsquare.p2p.storage.messages.AllDataMessage; +import io.bitsquare.p2p.storage.messages.GetAllDataMessage; +import javafx.beans.property.BooleanProperty; +import javafx.beans.property.SimpleBooleanProperty; +import org.fxmisc.easybind.EasyBind; +import org.fxmisc.easybind.monadic.MonadicBinding; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +49,7 @@ import static com.google.common.base.Preconditions.checkNotNull; /** * Represents our node in the P2P network */ -public class P2PService { +public class P2PService implements SetupListener { private static final Logger log = LoggerFactory.getLogger(P2PService.class); private final SeedNodesRepository seedNodesRepository; @@ -55,7 +58,6 @@ public class P2PService { private final boolean useLocalhost; @Nullable private final EncryptionService encryptionService; - private SetupListener setupListener; private KeyRing keyRing; private final File storageDir; private final NetworkStatistics networkStatistics; @@ -68,18 +70,15 @@ public class P2PService { private final CopyOnWriteArraySet p2pServiceListeners = new CopyOnWriteArraySet<>(); private final Map mailboxMap = new ConcurrentHashMap<>(); private volatile boolean shutDownInProgress; - private Set
seedNodeAddresses; - private Set
connectedSeedNodes = new HashSet<>(); + private Address connectedSeedNode; private Set
authenticatedPeerAddresses = new HashSet<>(); - private boolean authenticatedToFirstPeer; - private boolean allDataReceived; - public boolean authenticated; private boolean shutDownComplete; private CopyOnWriteArraySet shutDownResultHandlers = new CopyOnWriteArraySet<>(); - private final CopyOnWriteArraySet getDataSetMessageNonceList = new CopyOnWriteArraySet<>(); - private boolean allSeedNodesRequested; - private Timer sendGetAllDataMessageTimer; - private volatile boolean hiddenServiceReady; + private BooleanProperty hiddenServicePublished = new SimpleBooleanProperty(); + private BooleanProperty allDataLoaded = new SimpleBooleanProperty(); + private BooleanProperty authenticated = new SimpleBooleanProperty(); + private MonadicBinding readyForAuthentication; + /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -108,6 +107,7 @@ public class P2PService { private void init() { // network + Set
seedNodeAddresses; if (useLocalhost) { networkNode = new LocalhostNetworkNode(port); seedNodeAddresses = seedNodesRepository.getLocalhostSeedNodeAddresses(); @@ -124,31 +124,6 @@ public class P2PService { dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir); - // Listeners - setupListener = new SetupListener() { - @Override - public void onTorNodeReady() { - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady())); - - // we don't know yet our own address so we can not filter that from the - // seedNodeAddresses in case we are a seed node - sendGetAllDataMessage(seedNodeAddresses); - } - - @Override - public void onHiddenServiceReady() { - hiddenServiceReady = true; - tryStartAuthentication(); - - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onHiddenServiceReady())); - } - - @Override - public void onSetupFailed(Throwable throwable) { - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable))); - } - }; - networkNode.addConnectionListener(new ConnectionListener() { @Override public void onConnection(Connection connection) { @@ -156,30 +131,11 @@ public class P2PService { @Override public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - checkArgument(peerAddress.equals(connection.getPeerAddress())); + checkArgument(peerAddress.equals(connection.getPeerAddress()), + "peerAddress must match connection.getPeerAddress()"); authenticatedPeerAddresses.add(peerAddress); + authenticated.set(true); - if (!authenticatedToFirstPeer) { - authenticatedToFirstPeer = true; - - SettableFuture future = sendMessage(peerAddress, - new GetDataSetMessage(addToListAndGetNonce())); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.info("onPeerAddressAuthenticated Send GetAllDataMessage to " + peerAddress + " succeeded."); - connectedSeedNodes.add(peerAddress); - } - - @Override - public void onFailure(Throwable throwable) { - log.warn("onPeerAddressAuthenticated Send GetAllDataMessage to " + peerAddress + " failed. " + - "Exception:" + throwable.getMessage()); - } - }); - } - - P2PService.this.authenticated = true; dataStorage.setAuthenticated(true); UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAuthenticated())); } @@ -197,30 +153,24 @@ public class P2PService { }); networkNode.addMessageListener((message, connection) -> { - if (message instanceof GetDataSetMessage) { + if (message instanceof GetAllDataMessage) { log.trace("Received GetDataSetMessage: " + message); - - // we only reply if we did not get the message form ourselves (in case we are a seed node) - if (!getDataSetMessageNonceList.contains(((GetDataSetMessage) message).nonce)) { - networkNode.sendMessage(connection, new DataSetMessage(getHashSet())); + networkNode.sendMessage(connection, new AllDataMessage(getDataSet())); + } else if (message instanceof AllDataMessage) { + AllDataMessage allDataMessage = (AllDataMessage) message; + HashSet set = allDataMessage.set; + if (!set.isEmpty()) { + StringBuilder sb = new StringBuilder("Received DataSetMessage:\n\n"); + set.stream().forEach(e -> sb.append(e.toString() + "\n")); + sb.append("\n"); + log.trace(sb.toString()); + // we keep that connection open as the bootstrapping peer will use that for the authentication + // as we are not authenticated yet the data adding will not be broadcasted + set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); } else { - connection.shutDown(() -> { - if (allSeedNodesRequested) dataReceived(); - }); + log.trace("Received DataSetMessage: Empty data set"); } - } else if (message instanceof DataSetMessage) { - DataSetMessage dataSetMessage = (DataSetMessage) message; - StringBuilder sb = new StringBuilder("Received DataSetMessage:\n\n"); - dataSetMessage.set.stream().forEach(e -> sb.append(e.toString() + "\n")); - sb.append("\n"); - log.trace(sb.toString()); - // we keep that connection open as the bootstrapping peer will use that for the authentication - - // as we are not authenticated yet the data adding will not be broadcasted - HashSet set = dataSetMessage.set; - set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); - - dataReceived(); + allDataLoaded(); } else if (message instanceof SealedAndSignedMessage) { if (encryptionService != null) { try { @@ -239,23 +189,22 @@ public class P2PService { peerGroup.addPeerListener(new PeerListener() { @Override - public void onFirstPeerAdded(Peer peer) { - log.trace("onFirstPeer " + peer.toString()); + public void onFirstAuthenticatePeer(Peer peer) { + log.trace("onFirstAuthenticatePeer " + peer); + sendGetAllDataMessageAfterAuthentication(peer); + } @Override public void onPeerAdded(Peer peer) { - } @Override public void onPeerRemoved(Address address) { - } @Override public void onConnectionAuthenticated(Connection connection) { - } }); @@ -270,6 +219,115 @@ public class P2PService { public void onRemoved(ProtectedData entry) { } }); + + readyForAuthentication = EasyBind.combine(hiddenServicePublished, allDataLoaded, authenticated, + (a, b, c) -> a && b && !c); + readyForAuthentication.subscribe((observable, oldValue, newValue) -> { + // we need to have both the initial data delivered and the hidden service published before we + // bootstrap and authenticate to other nodes. + if (newValue) + authenticateSeedNode(); + }); + + allDataLoaded.addListener((observable, oldValue, newValue) -> { + if (newValue) + UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAllDataReceived())); + }); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // SetupListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onTorNodeReady() { + UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady())); + + // 1. Step: As soon we have the tor node ready (hidden service still not available) we request the + // data set from a random seed node. + sendGetAllDataMessage(peerGroup.getSeedNodeAddresses()); + } + + @Override + public void onHiddenServicePublished() { + checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready"); + + UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished())); + + // 3. (or 2.). Step: Hidden service is published + hiddenServicePublished.set(true); + } + + @Override + public void onSetupFailed(Throwable throwable) { + UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable))); + } + + private void sendGetAllDataMessage(Collection
seedNodeAddresses) { + if (!seedNodeAddresses.isEmpty()) { + log.trace("sendGetAllDataMessage"); + List
remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses); + Collections.shuffle(remainingSeedNodeAddresses); + Address candidate = remainingSeedNodeAddresses.remove(0); + log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate); + + SettableFuture future = networkNode.sendMessage(candidate, new GetAllDataMessage()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.info("Send GetAllDataMessage to " + candidate + " succeeded."); + connectedSeedNode = candidate; + } + + @Override + public void onFailure(Throwable throwable) { + log.info("Send GetAllDataMessage to " + candidate + " failed. " + + "That is expected if other seed nodes are offline." + + "\nException:" + throwable.getMessage()); + log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses); + sendGetAllDataMessage(remainingSeedNodeAddresses); + } + }); + } else { + log.info("There is no seed node available for requesting data. That is expected for the first seed node."); + allDataLoaded(); + } + } + + private void allDataLoaded() { + // 2. (or 3.) Step: We got all data loaded + if (!allDataLoaded.get()) { + log.trace("allDataLoaded"); + allDataLoaded.set(true); + } + } + + // 4. Step: hiddenServicePublished and allDataLoaded. We start authenticate to the connected seed node. + private void authenticateSeedNode() { + if (connectedSeedNode != null) { + log.trace("authenticateSeedNode"); + peerGroup.authenticateSeedNode(connectedSeedNode); + } + } + + // 5. Step: + private void sendGetAllDataMessageAfterAuthentication(final Peer peer) { + log.trace("sendGetDataSetMessageAfterAuthentication"); + // After authentication we request again data as we might have missed pushed data in the meantime + SettableFuture future = networkNode.sendMessage(peer.connection, new GetAllDataMessage()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.info("onPeerAddressAuthenticated Send GetAllDataMessage to " + peer.address + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.warn("onPeerAddressAuthenticated Send GetAllDataMessage to " + peer.address + " failed. " + + "Exception:" + throwable.getMessage()); + } + }); } @@ -277,12 +335,10 @@ public class P2PService { // API /////////////////////////////////////////////////////////////////////////////////////////// - // startup sequence - // networkNode.start - // SetupListener.onTorNodeReady: sendGetAllDataMessage - // SetupListener.onHiddenServiceReady: tryStartAuthentication - // if hiddenServiceReady && allDataReceived) routing.startAuthentication - // ConnectionListener.onPeerAddressAuthenticated + // used by seed nodes to exclude themselves form list + public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) { + peerGroup.removeMySeedNodeAddressFromList(mySeedNodeAddress); + } public void start() { start(null); @@ -292,7 +348,7 @@ public class P2PService { if (listener != null) addP2PServiceListener(listener); - networkNode.start(setupListener); + networkNode.start(this); } public void shutDown(Runnable shutDownCompleteHandler) { @@ -301,9 +357,6 @@ public class P2PService { shutDownResultHandlers.add(shutDownCompleteHandler); - if (sendGetAllDataMessageTimer != null) - sendGetAllDataMessageTimer.cancel(); - if (dataStorage != null) dataStorage.shutDown(); @@ -324,21 +377,6 @@ public class P2PService { } } - public boolean isAuthenticated() { - return authenticated; - } - - public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) { - log.trace("removeEntryFromMailbox"); - ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey); - if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) { - checkArgument(mailboxData.receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()), - "mailboxData.receiversPubKey is not matching with our key. That must not happen."); - removeMailboxData((ExpirableMailboxPayload) mailboxData.expirablePayload, mailboxData.receiversPubKey); - mailboxMap.remove(decryptedMsgWithPubKey); - log.trace("Removed successfully protectedExpirableData."); - } - } /////////////////////////////////////////////////////////////////////////////////////////// // Messaging @@ -347,9 +385,7 @@ public class P2PService { public void sendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message, SendMailMessageListener sendMailMessageListener) { checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailMessage)"); - - if (!authenticatedToFirstPeer) - throw new AuthenticationException("You must be authenticated before sending direct messages."); + checkAuthentication(); if (!authenticatedPeerAddresses.contains(peerAddress)) peerGroup.authenticateToPeer(peerAddress, @@ -365,7 +401,7 @@ public class P2PService { try { SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( encryptionService.encryptAndSign(pubKeyRing, message), peerAddress); - SettableFuture future = sendMessage(peerAddress, sealedAndSignedMessage); + SettableFuture future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { @@ -389,9 +425,7 @@ public class P2PService { MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) { checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)"); checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer"); - - if (!authenticatedToFirstPeer) - throw new AuthenticationException("You must be authenticated before sending direct messages."); + checkAuthentication(); if (authenticatedPeerAddresses.contains(peerAddress)) { trySendEncryptedMailboxMessage(peerAddress, peersPubKeyRing, message, sendMailboxMessageListener); @@ -411,7 +445,7 @@ public class P2PService { try { SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( encryptionService.encryptAndSign(peersPubKeyRing, message), peerAddress); - SettableFuture future = sendMessage(peerAddress, sealedAndSignedMessage); + SettableFuture future = networkNode.sendMessage(peerAddress, sealedAndSignedMessage); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { @@ -443,12 +477,11 @@ public class P2PService { /////////////////////////////////////////////////////////////////////////////////////////// - // ProtectedData + // Data storage /////////////////////////////////////////////////////////////////////////////////////////// public boolean addData(ExpirablePayload expirablePayload) { - if (!authenticatedToFirstPeer) - throw new AuthenticationException("You must be authenticated before adding data to the P2P network."); + checkAuthentication(); try { return dataStorage.add(dataStorage.getDataWithSignedSeqNr(expirablePayload, @@ -460,8 +493,7 @@ public class P2PService { } public boolean addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) { - if (!authenticatedToFirstPeer) - throw new AuthenticationException("You must be authenticated before adding data to the P2P network."); + checkAuthentication(); try { return dataStorage.add(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload, @@ -473,8 +505,8 @@ public class P2PService { } public boolean removeData(ExpirablePayload expirablePayload) { - if (!authenticatedToFirstPeer) - throw new AuthenticationException("You must be authenticated before removing data from the P2P network."); + checkAuthentication(); + try { return dataStorage.remove(dataStorage.getDataWithSignedSeqNr(expirablePayload, keyRing.getSignatureKeyPair()), networkNode.getAddress()); @@ -484,9 +516,22 @@ public class P2PService { } } + public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) { + checkAuthentication(); + + ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey); + if (mailboxData != null && mailboxData.expirablePayload instanceof ExpirableMailboxPayload) { + checkArgument(mailboxData.receiversPubKey.equals(keyRing.getSignatureKeyPair().getPublic()), + "mailboxData.receiversPubKey is not matching with our key. That must not happen."); + removeMailboxData((ExpirableMailboxPayload) mailboxData.expirablePayload, mailboxData.receiversPubKey); + mailboxMap.remove(decryptedMsgWithPubKey); + log.trace("Removed successfully protectedExpirableData."); + } + } + public boolean removeMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) { - if (!authenticatedToFirstPeer) - throw new AuthenticationException("You must be authenticated before removing data from the P2P network."); + checkAuthentication(); + try { return dataStorage.removeMailboxData(dataStorage.getMailboxDataWithSignedSeqNr(expirableMailboxPayload, keyRing.getSignatureKeyPair(), receiversPublicKey), networkNode.getAddress()); @@ -541,10 +586,15 @@ public class P2PService { dataStorage.addHashMapChangedListener(hashMapChangedListener); } + /////////////////////////////////////////////////////////////////////////////////////////// // Getters /////////////////////////////////////////////////////////////////////////////////////////// + public boolean isAuthenticated() { + return authenticated.get(); + } + public NetworkNode getNetworkNode() { return networkNode; } @@ -566,94 +616,7 @@ public class P2PService { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void sendGetAllDataMessage(Set
seedNodeAddresses) { - Address networkNodeAddress = networkNode.getAddress(); - if (networkNodeAddress != null) - seedNodeAddresses.remove(networkNodeAddress); - List
remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses); - - if (!seedNodeAddresses.isEmpty()) { - Collections.shuffle(remainingSeedNodeAddresses); - Address candidate = remainingSeedNodeAddresses.remove(0); - log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate); - - // we use a nonce to see if we are sending to ourselves in case we are a seed node - // we don't know our own onion address at that moment so we cannot filter seed nodes - SettableFuture future = sendMessage(candidate, new GetDataSetMessage(addToListAndGetNonce())); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.info("Send GetAllDataMessage to " + candidate + " succeeded."); - connectedSeedNodes.add(candidate); - - // we try to connect to 2 seed nodes - if (connectedSeedNodes.size() < 2 && !remainingSeedNodeAddresses.isEmpty()) { - // give a random pause of 1-3 sec. before using the next - - if (sendGetAllDataMessageTimer != null) sendGetAllDataMessageTimer.cancel(); - sendGetAllDataMessageTimer = Utilities.runTimerTaskWithRandomDelay(() -> { - Thread.currentThread().setName("SendGetAllDataMessageTimer-" + new Random().nextInt(1000)); - try { - UserThread.execute(() -> sendGetAllDataMessage(Sets.newHashSet(remainingSeedNodeAddresses))); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } - }, 1, 3); - } else { - allSeedNodesRequested = true; - } - } - - @Override - public void onFailure(Throwable throwable) { - log.info("Send GetAllDataMessage to " + candidate + " failed. Exception:" + throwable.getMessage()); - log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses); - UserThread.execute(() -> sendGetAllDataMessage(Sets.newHashSet(remainingSeedNodeAddresses))); - } - }); - } else { - log.info("There is no seed node available for requesting data. That is expected for the first seed node."); - dataReceived(); - allSeedNodesRequested = true; - } - } - - private long addToListAndGetNonce() { - long nonce = new Random().nextLong(); - while (nonce == 0) { - nonce = new Random().nextLong(); - } - getDataSetMessageNonceList.add(nonce); - return nonce; - } - - private void dataReceived() { - if (!allDataReceived) { - allDataReceived = true; - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAllDataReceived())); - - tryStartAuthentication(); - } - } - - private void tryStartAuthentication() { - // we need to have both the initial data delivered and the hidden service published before we - // bootstrap and authenticate to other nodes - if (allDataReceived && hiddenServiceReady) { - // we remove ourselves in case we are a seed node - checkArgument(networkNode.getAddress() != null, "Address must be set when we are authenticated"); - connectedSeedNodes.remove(networkNode.getAddress()); - - peerGroup.startAuthentication(connectedSeedNodes); - } - } - - private SettableFuture sendMessage(Address peerAddress, Message message) { - return networkNode.sendMessage(peerAddress, message); - } - - private HashSet getHashSet() { + private HashSet getDataSet() { return new HashSet<>(dataStorage.getMap().values()); } @@ -670,14 +633,6 @@ public class P2PService { Address senderAddress = mailboxMessage.getSenderAddress(); checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages"); - log.trace("mailboxData.publicKey " + mailboxData.ownerStoragePubKey.hashCode()); - log.trace("keyRing.getStorageSignatureKeyPair().getPublic() " - + keyRing.getSignatureKeyPair().getPublic().hashCode()); - log.trace("keyRing.getMsgSignatureKeyPair().getPublic() " - + keyRing.getSignatureKeyPair().getPublic().hashCode()); - log.trace("keyRing.getMsgEncryptionKeyPair().getPublic() " - + keyRing.getEncryptionKeyPair().getPublic().hashCode()); - mailboxMap.put(decryptedMsgWithPubKey, mailboxData); log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress=" @@ -691,4 +646,9 @@ public class P2PService { } } } + + private void checkAuthentication() { + if (authenticatedPeerAddresses.isEmpty()) + throw new AuthenticationException("You must be authenticated before adding data to the P2P network."); + } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index ced575ec25..3837178c49 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -103,7 +103,7 @@ public class Connection { // API /////////////////////////////////////////////////////////////////////////////////////////// - public synchronized void setAuthenticated(Address peerAddress, Connection connection) { + public void setAuthenticated(Address peerAddress, Connection connection) { this.peerAddress = peerAddress; isAuthenticated = true; UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection)); @@ -149,7 +149,7 @@ public class Connection { /////////////////////////////////////////////////////////////////////////////////////////// @Nullable - public synchronized Address getPeerAddress() { + public Address getPeerAddress() { return peerAddress; } @@ -157,7 +157,7 @@ public class Connection { return sharedSpace.getLastActivityDate(); } - public synchronized boolean isAuthenticated() { + public boolean isAuthenticated() { return isAuthenticated; } @@ -321,11 +321,11 @@ public class Connection { this.useCompression = useCompression; } - public synchronized void updateLastActivityDate() { + public void updateLastActivityDate() { lastActivityDate = new Date(); } - public synchronized Date getLastActivityDate() { + public Date getLastActivityDate() { return lastActivityDate; } diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index af36547228..4346c16ad5 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -25,8 +25,8 @@ import java.util.function.Consumer; public class LocalhostNetworkNode extends NetworkNode { private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class); - private static int simulateTorDelayTorNode = 1 * 1000; - private static int simulateTorDelayHiddenService = 2 * 1000; + private static int simulateTorDelayTorNode = 1 * 100; + private static int simulateTorDelayHiddenService = 2 * 100; private Address address; public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) { @@ -69,7 +69,7 @@ public class LocalhostNetworkNode extends NetworkNode { address = new Address("localhost", port); - setupListeners.stream().forEach(e -> e.onHiddenServiceReady()); + setupListeners.stream().forEach(e -> e.onHiddenServicePublished()); }); }); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 67dcf8320b..a05a62ffe2 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -125,6 +125,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener ListenableFuture future = executorService.submit(() -> { Thread.currentThread().setName("NetworkNode:SendMessage-to-connection-" + connection.getObjectId()); try { + log.debug("## connection.sendMessage"); connection.sendMessage(message); return connection; } catch (Throwable t) { @@ -134,10 +135,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener final SettableFuture resultFuture = SettableFuture.create(); Futures.addCallback(future, new FutureCallback() { public void onSuccess(Connection connection) { + log.debug("## connection.sendMessage onSuccess"); UserThread.execute(() -> resultFuture.set(connection)); } public void onFailure(@NotNull Throwable throwable) { + log.debug("## connection.sendMessage onFailure"); UserThread.execute(() -> resultFuture.setException(throwable)); } }); diff --git a/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java b/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java index 9a1acedc05..371b91bc41 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java +++ b/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java @@ -5,7 +5,7 @@ public interface SetupListener { void onTorNodeReady(); - void onHiddenServiceReady(); + void onHiddenServicePublished(); void onSetupFailed(Throwable throwable); diff --git a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java index d8220555e7..61fb5df3e1 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java @@ -7,7 +7,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; import io.bitsquare.common.UserThread; -import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.nucleo.net.HiddenServiceDescriptor; import io.nucleo.net.TorNode; @@ -79,12 +78,8 @@ public class TorNetworkNode extends NetworkNode { TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor; startServer(hiddenServiceDescriptor.getServerSocket()); - Runnable task = () -> { - Thread.currentThread().setName("DelayNotifySetupListenersTimer-" + new Random().nextInt(1000)); - setupListeners.stream() - .forEach(e -> UserThread.execute(() -> e.onHiddenServiceReady())); - }; - Utilities.runTimerTask(task, 500, TimeUnit.MILLISECONDS); + UserThread.runAfter(() -> setupListeners.stream().forEach(e -> e.onHiddenServicePublished()), + 500, TimeUnit.MILLISECONDS); }); }); } @@ -102,8 +97,7 @@ public class TorNetworkNode extends NetworkNode { log.info("Shutdown TorNetworkNode"); this.shutDownCompleteHandler = shutDownCompleteHandler; - shutDownTimeoutTimer = Utilities.runTimerTask(() -> { - Thread.currentThread().setName("ShutDownTimeoutTimer-" + new Random().nextInt(1000)); + shutDownTimeoutTimer = UserThread.runAfter(() -> { log.error("A timeout occurred at shutDown"); shutDownExecutorService(); }, SHUT_DOWN_TIMEOUT, TimeUnit.DAYS.MILLISECONDS); @@ -176,8 +170,7 @@ public class TorNetworkNode extends NetworkNode { private void restartTor() { restartCounter++; if (restartCounter <= MAX_RESTART_ATTEMPTS) { - shutDown(() -> Utilities.runTimerTask(() -> { - Thread.currentThread().setName("RestartTorTimer-" + new Random().nextInt(1000)); + shutDown(() -> UserThread.runAfter(() -> { log.warn("We restart tor as starting tor failed."); start(null); }, WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS)); diff --git a/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationHandshake.java new file mode 100644 index 0000000000..af12b24614 --- /dev/null +++ b/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationHandshake.java @@ -0,0 +1,255 @@ +package io.bitsquare.p2p.peer; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Tuple2; +import io.bitsquare.p2p.Address; +import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.peer.messages.ChallengeMessage; +import io.bitsquare.p2p.peer.messages.GetPeersMessage; +import io.bitsquare.p2p.peer.messages.PeersMessage; +import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; + + +// authentication example: +// node2 -> node1 RequestAuthenticationMessage +// node1: close connection +// node1 -> node2 ChallengeMessage on new connection +// node2: authentication to node1 done if nonce ok +// node2 -> node1 GetPeersMessage +// node1: authentication to node2 done if nonce ok +// node1 -> node2 PeersMessage + +public class AuthenticationHandshake { + private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class); + + private final NetworkNode networkNode; + private final PeerGroup peerGroup; + private final Address myAddress; + + private SettableFuture resultFuture; + private long startAuthTs; + private long nonce = 0; + + public AuthenticationHandshake(NetworkNode networkNode, PeerGroup peerGroup, Address myAddress) { + this.networkNode = networkNode; + this.peerGroup = peerGroup; + this.myAddress = myAddress; + + setupMessageListener(); + } + + public SettableFuture requestAuthenticationToPeer(Address peerAddress) { + // Requesting peer + resultFuture = SettableFuture.create(); + startAuthTs = System.currentTimeMillis(); + SettableFuture future = networkNode.sendMessage(peerAddress, new RequestAuthenticationMessage(myAddress, getAndSetNonce())); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.info("send RequestAuthenticationMessage to " + peerAddress + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." + + "\nException:" + throwable.getMessage()); + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }); + + return resultFuture; + } + + public SettableFuture requestAuthentication(Set
remainingAddresses, Address peerAddress) { + log.info("requestAuthentication " + this); + log.info("remainingAddresses " + remainingAddresses); + log.info("peerAddress " + peerAddress); + // Requesting peer + resultFuture = SettableFuture.create(); + startAuthTs = System.currentTimeMillis(); + remainingAddresses.remove(peerAddress); + SettableFuture future = networkNode.sendMessage(peerAddress, new RequestAuthenticationMessage(myAddress, getAndSetNonce())); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + log.info("send RequestAuthenticationMessage to " + peerAddress + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Send RequestAuthenticationMessage to " + peerAddress + " failed." + + "\nThat is expected if seed nodes are offline." + + "\nException:" + throwable.getMessage()); + log.trace("We try to authenticate to another random seed nodes of that list: " + remainingAddresses); + authenticateToNextRandomPeer(remainingAddresses); + } + }); + + return resultFuture; + } + + + public SettableFuture processAuthenticationRequest(RequestAuthenticationMessage requestAuthenticationMessage, Connection connection) { + // Responding peer + resultFuture = SettableFuture.create(); + startAuthTs = System.currentTimeMillis(); + + Address peerAddress = requestAuthenticationMessage.address; + log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + myAddress); + connection.shutDown(() -> UserThread.runAfter(() -> { + // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to + // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) + log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress); + + SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(myAddress, requestAuthenticationMessage.nonce, getAndSetNonce())); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.debug("onSuccess sending ChallengeMessage"); + } + + @Override + public void onFailure(Throwable throwable) { + log.warn("onFailure sending ChallengeMessage."); + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }); + }, + 100 + PeerGroup.simulateAuthTorNode, + TimeUnit.MILLISECONDS)); + + return resultFuture; + } + + private void setupMessageListener() { + networkNode.addMessageListener((message, connection) -> { + if (message instanceof ChallengeMessage) { + // Requesting peer + ChallengeMessage challengeMessage = (ChallengeMessage) message; + Address peerAddress = challengeMessage.address; + log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress); + log.trace("challengeMessage" + challengeMessage); + // HashMap tempNonceMap = new HashMap<>(nonceMap); + boolean verified = nonce != 0 && nonce == challengeMessage.requesterNonce; + if (verified) { + connection.setPeerAddress(peerAddress); + SettableFuture future = networkNode.sendMessage(peerAddress, + new GetPeersMessage(myAddress, challengeMessage.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses()))); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("GetPeersMessage sent successfully from " + myAddress + " to " + peerAddress); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("GetPeersMessage sending failed " + throwable.getMessage()); + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }); + } else { + log.warn("verify nonce failed. challengeMessage=" + challengeMessage + " / nonce=" + nonce); + UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. challengeMessage=" + challengeMessage + " / nonceMap=" + nonce))); + } + } else if (message instanceof GetPeersMessage) { + // Responding peer + GetPeersMessage getPeersMessage = (GetPeersMessage) message; + Address peerAddress = getPeersMessage.address; + log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress); + boolean verified = nonce != 0 && nonce == getPeersMessage.challengerNonce; + if (verified) { + // we add the reported peers to our own set + HashSet
peerAddresses = ((GetPeersMessage) message).peerAddresses; + log.trace("Received peers: " + peerAddresses); + peerGroup.addToReportedPeers(peerAddresses, connection); + + SettableFuture future = networkNode.sendMessage(peerAddress, + new PeersMessage(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses()))); + log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress + + " with allPeers=" + peerGroup.getAllPeerAddresses()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("PeersMessage sent successfully from " + myAddress + " to " + peerAddress); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("PeersMessage sending failed " + throwable.getMessage()); + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }); + + log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress + + " authenticated (" + connection.getObjectId() + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); + + UserThread.execute(() -> resultFuture.set(connection)); + } else { + log.warn("verify nonce failed. getPeersMessage=" + getPeersMessage + " / nonceMap=" + nonce); + UserThread.execute(() -> resultFuture.setException(new Exception("Verify nonce failed. getPeersMessage=" + getPeersMessage + " / nonceMap=" + nonce))); + } + } else if (message instanceof PeersMessage) { + // Requesting peer + PeersMessage peersMessage = (PeersMessage) message; + Address peerAddress = peersMessage.address; + log.trace("PeersMessage from " + peerAddress + " at " + myAddress); + HashSet
peerAddresses = peersMessage.peerAddresses; + log.trace("Received peers: " + peerAddresses); + peerGroup.addToReportedPeers(peerAddresses, connection); + + // we wait until the handshake is completed before setting the authenticate flag + // authentication at both sides of the connection + log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress + + " authenticated (" + connection.getObjectId() + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); + + UserThread.execute(() -> resultFuture.set(connection)); + } + }); + } + + + private void authenticateToNextRandomPeer(Set
remainingAddresses) { + Optional>> tupleOptional = getRandomAddressAndRemainingSet(remainingAddresses); + if (tupleOptional.isPresent()) { + Tuple2> tuple = tupleOptional.get(); + requestAuthentication(tuple.second, tuple.first); + } else { + log.info("No other seed node found. That is expected for the first seed node."); + UserThread.execute(() -> resultFuture.set(null)); + } + } + + private Optional>> getRandomAddressAndRemainingSet(Set
addresses) { + if (!addresses.isEmpty()) { + List
list = new ArrayList<>(addresses); + Collections.shuffle(list); + Address address = list.remove(0); + return Optional.of(new Tuple2<>(address, Sets.newHashSet(list))); + } else { + return Optional.empty(); + } + } + + private long getAndSetNonce() { + nonce = new Random().nextLong(); + while (nonce == 0) + nonce = getAndSetNonce(); + + return nonce; + } + +} diff --git a/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java b/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java index 2c1810158d..18b68f98cb 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/AuthenticationListener.java @@ -4,7 +4,7 @@ import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; public abstract class AuthenticationListener implements PeerListener { - public void onFirstPeerAdded(Peer peer) { + public void onFirstAuthenticatePeer(Peer peer) { } public void onPeerAdded(Peer peer) { diff --git a/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java index 231ecd448e..cd53a29b79 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java @@ -1,14 +1,18 @@ package io.bitsquare.p2p.peer; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.common.UserThread; -import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; -import io.bitsquare.p2p.network.*; -import io.bitsquare.p2p.peer.messages.*; +import io.bitsquare.p2p.network.Connection; +import io.bitsquare.p2p.network.ConnectionListener; +import io.bitsquare.p2p.network.MessageListener; +import io.bitsquare.p2p.network.NetworkNode; +import io.bitsquare.p2p.peer.messages.MaintenanceMessage; +import io.bitsquare.p2p.peer.messages.PingMessage; +import io.bitsquare.p2p.peer.messages.PongMessage; +import io.bitsquare.p2p.peer.messages.RequestAuthenticationMessage; import io.bitsquare.p2p.storage.messages.BroadcastMessage; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -21,10 +25,13 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkArgument; + + public class PeerGroup { private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); - private static int simulateAuthTorNode = 0; + static int simulateAuthTorNode = 0; public static void setSimulateAuthTorNode(int simulateAuthTorNode) { PeerGroup.simulateAuthTorNode = simulateAuthTorNode; @@ -33,19 +40,18 @@ public class PeerGroup { private static int MAX_CONNECTIONS = 8; private static int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min. private static int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000; - private long startAuthTs; public static void setMaxConnections(int maxConnections) { MAX_CONNECTIONS = maxConnections; } private final NetworkNode networkNode; - private final CopyOnWriteArraySet
seedNodes; - private final ConcurrentHashMap nonceMap = new ConcurrentHashMap<>(); + + + private final Set
seedNodeAddresses; private final CopyOnWriteArraySet peerListeners = new CopyOnWriteArraySet<>(); private final ConcurrentHashMap authenticatedPeers = new ConcurrentHashMap<>(); private final CopyOnWriteArraySet
reportedPeerAddresses = new CopyOnWriteArraySet<>(); - private final ConcurrentHashMap authenticationCompleteHandlers = new ConcurrentHashMap<>(); ; private final Timer maintenanceTimer = new Timer(); @@ -58,21 +64,40 @@ public class PeerGroup { public PeerGroup(final NetworkNode networkNode, Set
seeds) { this.networkNode = networkNode; - - // We copy it as we remove ourselves later from the list if we are a seed node - this.seedNodes = new CopyOnWriteArraySet<>(seeds); + this.seedNodeAddresses = seeds; init(networkNode); } private void init(NetworkNode networkNode) { networkNode.addMessageListener((message, connection) -> { - if (message instanceof AuthenticationMessage) - processAuthenticationMessage((AuthenticationMessage) message, connection); - else if (message instanceof MaintenanceMessage) + if (message instanceof MaintenanceMessage) processMaintenanceMessage((MaintenanceMessage) message, connection); + else if (message instanceof RequestAuthenticationMessage) { + AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress()); + SettableFuture future = authenticationHandshake.processAuthenticationRequest((RequestAuthenticationMessage) message, connection); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + if (connection != null) { + UserThread.execute(() -> { + setAuthenticated(connection, connection.getPeerAddress()); + purgeReportedPeers(); + }); + } + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + throwable.printStackTrace(); + log.error("AuthenticationHandshake failed. " + throwable.getMessage()); + UserThread.execute(() -> removePeer(connection.getPeerAddress())); + } + }); + } }); + networkNode.addConnectionListener(new ConnectionListener() { @Override public void onConnection(Connection connection) { @@ -96,24 +121,6 @@ public class PeerGroup { } }); - networkNode.addSetupListener(new SetupListener() { - @Override - public void onTorNodeReady() { - } - - @Override - public void onHiddenServiceReady() { - // remove ourselves in case we are a seed node - Address myAddress = getAddress(); - if (myAddress != null) - seedNodes.remove(myAddress); - } - - @Override - public void onSetupFailed(Throwable throwable) { - } - }); - maintenanceTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { @@ -140,11 +147,7 @@ public class PeerGroup { log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size()); Connection connection = authenticatedConnections.remove(0); log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); - - connection.shutDown(() -> Utilities.runTimerTask(() -> { - Thread.currentThread().setName("DelayDisconnectOldConnectionsTimer-" + new Random().nextInt(1000)); - disconnectOldConnections(); - }, 1, TimeUnit.MILLISECONDS)); + connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> disconnectOldConnections(), 100, 500, TimeUnit.MILLISECONDS)); } } @@ -153,8 +156,7 @@ public class PeerGroup { Set connectedPeersList = new HashSet<>(authenticatedPeers.values()); connectedPeersList.stream() .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) - .forEach(e -> Utilities.runTimerTaskWithRandomDelay(() -> { - Thread.currentThread().setName("DelayPingPeersTimer-" + new Random().nextInt(1000)); + .forEach(e -> UserThread.runAfterRandomDelay(() -> { SettableFuture future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce())); Futures.addCallback(future, new FutureCallback() { @Override @@ -176,6 +178,10 @@ public class PeerGroup { // API /////////////////////////////////////////////////////////////////////////////////////////// + public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) { + seedNodeAddresses.remove(mySeedNodeAddress); + } + public void shutDown() { if (!shutDownInProgress) { shutDownInProgress = true; @@ -189,15 +195,16 @@ public class PeerGroup { log.trace("message = " + message); printConnectedPeersMap(); + // TODO add randomized timing? authenticatedPeers.values().stream() .filter(e -> !e.address.equals(sender)) .forEach(peer -> { - log.trace("Broadcast message from " + getAddress() + " to " + peer.address + "."); + log.trace("Broadcast message from " + getMyAddress() + " to " + peer.address + "."); SettableFuture future = networkNode.sendMessage(peer.address, message); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("Broadcast from " + getAddress() + " to " + peer.address + " succeeded."); + log.trace("Broadcast from " + getMyAddress() + " to " + peer.address + " succeeded."); } @Override @@ -209,6 +216,270 @@ public class PeerGroup { }); } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Authentication to seed node + /////////////////////////////////////////////////////////////////////////////////////////// + + public void authenticateSeedNode(Address peerAddress) { + authenticateToSeedNode(new HashSet<>(seedNodeAddresses), peerAddress, true); + } + + public void authenticateToSeedNode(Set
remainingAddresses, Address peerAddress, boolean continueOnSuccess) { + checkArgument(!authenticatedPeers.containsKey(peerAddress), + "We have that peer already authenticated. That must never happen."); + + AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress()); + SettableFuture future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + if (connection != null) { + setAuthenticated(connection, peerAddress); + if (continueOnSuccess) { + if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { + log.info("We still don't have enough connections. Lets try the reported peers."); + authenticateToAnyReportedPeer(); + } else { + log.info("We have already enough connections."); + } + } else { + log.info("We have already tried all reported peers and seed nodes. " + + "We stop bootstrapping now, but will repeat after an while."); + UserThread.runAfter(() -> authenticateToAnyReportedPeer(), 60); + } + } + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + throwable.printStackTrace(); + log.error("AuthenticationHandshake failed. " + throwable.getMessage()); + removePeer(peerAddress); + + // If we fail we try again with the remaining set + remainingAddresses.remove(peerAddress); + List
list = new ArrayList<>(remainingAddresses); + removeAuthenticatedPeersFromList(list); + if (!list.isEmpty()) { + Address item = getAndRemoveRandomItem(list); + log.info("We try to build an authenticated connection to a seed node. " + item); + authenticateToSeedNode(remainingAddresses, item, true); + } else { + log.info("We don't have any more seed nodes for connecting. Lets try the reported peers."); + authenticateToAnyReportedPeer(); + } + } + }); + } + + + private void authenticateToAnyReportedPeer() { + // after we have at least one seed node we try to get reported peers connected + List
list = new ArrayList<>(reportedPeerAddresses); + removeAuthenticatedPeersFromList(list); + if (!list.isEmpty()) { + Address item = getAndRemoveRandomItem(list); + log.info("We try to build an authenticated connection to a random peer. " + item + " / list=" + list); + authenticateToReportedPeer(new HashSet<>(list), item); + } else { + log.info("We don't have any reported peers for connecting. Lets try the remaining seed nodes."); + authenticateToRemainingSeedNodes(); + } + } + + public void authenticateToReportedPeer(Set
remainingAddresses, Address peerAddress) { + checkArgument(!authenticatedPeers.containsKey(peerAddress), + "We have that peer already authenticated. That must never happen."); + + AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress()); + SettableFuture future = authenticationHandshake.requestAuthentication(remainingAddresses, peerAddress); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + if (connection != null) { + setAuthenticated(connection, peerAddress); + if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { + log.info("We still don't have enough connections. Lets try the remaining seed nodes."); + authenticateToRemainingSeedNodes(); + } else { + log.info("We have already enough connections."); + } + } + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + throwable.printStackTrace(); + log.error("AuthenticationHandshake failed. " + throwable.getMessage()); + removePeer(peerAddress); + } + }); + } + + private void authenticateToRemainingSeedNodes() { + // after we have at least one seed node we try to get reported peers connected + List
list = new ArrayList<>(seedNodeAddresses); + removeAuthenticatedPeersFromList(list); + if (!list.isEmpty()) { + Address item = getAndRemoveRandomItem(list); + log.info("We try to build an authenticated connection to a random seed node. " + item + " / list=" + list); + authenticateToSeedNode(new HashSet<>(list), item, false); + } else { + log.info("We don't have any more seed nodes for connecting. " + + "We stop bootstrapping now, but will repeat after an while."); + UserThread.runAfter(() -> authenticateToAnyReportedPeer(), 60); + } + } + + + /*private void authenticateToAnyNode1(Set
addresses, Address peerAddress, boolean prioritizeSeedNodes) { + checkArgument(!authenticatedPeers.containsKey(peerAddress), + "We have that peer already authenticated. That must never happen."); + + AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress()); + SettableFuture future = authenticationHandshake.requestAuthentication(addresses, peerAddress); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + setAuthenticated(connection, peerAddress); + authenticateToNextRandomPeer(); + } + + @Override + public void onFailure(Throwable throwable) { + throwable.printStackTrace(); + log.error("AuthenticationHandshake failed. " + throwable.getMessage()); + removePeer(peerAddress); + authenticateToNextRandomPeer(); + } + }); + } + + private void authenticateToNextRandomPeer() { + UserThread.runAfterRandomDelay(() -> { + log.info("authenticateToNextRandomPeer"); + if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { + Optional
candidate = getRandomReportedPeerAddress(); + if (candidate.isPresent()) { + log.info("We try to build an authenticated connection to a random peer. " + candidate.get()); + authenticateToReportedPeer(candidate.get(), ); + } else { + log.info("No more reportedPeerAddresses available for connecting. We try the remaining seed nodes"); + candidate = getRandomSeedNodeAddress(); + if (candidate.isPresent()) { + log.info("We try to build an authenticated connection to a random seed node. " + candidate.get()); + authenticateToReportedPeer(candidate.get(), get); + } else { + log.info("No more seed nodes available for connecting."); + } + } + } else { + log.info("We have already enough connections."); + } + }, 200, 400, TimeUnit.MILLISECONDS); + }*/ + + private Optional
getRandomSeedNodeAddress() { + List
list = new ArrayList<>(seedNodeAddresses); + log.debug("### getRandomSeedNodeAddress list " + list); + removeAuthenticatedPeersFromList(list); + log.debug("### list post removeAuthenticatedPeersFromList " + list); + return getRandomEntry(list); + } + + private Optional
getRandomReportedPeerAddress() { + List
list = new ArrayList<>(reportedPeerAddresses); + log.debug("### list reportedPeerAddresses " + reportedPeerAddresses); + log.debug("### list authenticatedPeers " + authenticatedPeers); + log.debug("### list pre " + list); + removeAuthenticatedPeersFromList(list); + log.debug("### list post " + list); + return getRandomEntry(list); + } + + private void removeAuthenticatedPeersFromList(List
list) { + authenticatedPeers.values().stream().forEach(e -> list.remove(e.address)); + } + + private Optional
getRandomEntry(List
list) { + if (list.size() > 0) { + Collections.shuffle(list); + return Optional.of(list.get(0)); + } else { + return Optional.empty(); + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Authentication to non-seed node peer + /////////////////////////////////////////////////////////////////////////////////////////// + + public void authenticateToPeer(Address peerAddress, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) { + checkArgument(!authenticatedPeers.containsKey(peerAddress), + "We have that seed node already authenticated. That must never happen."); + + AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress()); + SettableFuture future = authenticationHandshake.requestAuthenticationToPeer(peerAddress); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Connection connection) { + if (connection != null) { + setAuthenticated(connection, peerAddress); + if (authenticationCompleteHandler != null) + authenticationCompleteHandler.run(); + } + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + throwable.printStackTrace(); + log.error("AuthenticationHandshake failed. " + throwable.getMessage()); + removePeer(peerAddress); + if (faultHandler != null) + faultHandler.run(); + } + }); + } + + void setAuthenticated(Connection connection, Address peerAddress) { + log.info("\n\n############################################################\n" + + "We are authenticated to:" + + "\nconnection=" + connection + + "\nmyAddress=" + getMyAddress() + + "\npeerAddress= " + peerAddress + + "\n############################################################\n"); + + connection.setAuthenticated(peerAddress, connection); + + Peer peer = new Peer(connection); + addAuthenticatedPeer(peerAddress, peer); + + peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection)); + } + + private void addAuthenticatedPeer(Address address, Peer peer) { + boolean firstPeerAdded; + authenticatedPeers.put(address, peer); + firstPeerAdded = authenticatedPeers.size() == 1; + + UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer))); + + if (firstPeerAdded) + UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onFirstAuthenticatePeer(peer))); + + if (authenticatedPeers.size() > MAX_CONNECTIONS) + disconnectOldConnections(); + + printConnectedPeersMap(); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Listeners + /////////////////////////////////////////////////////////////////////////////////////////// + public void addMessageListener(MessageListener messageListener) { networkNode.addMessageListener(messageListener); } @@ -225,6 +496,11 @@ public class PeerGroup { peerListeners.remove(peerListener); } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Getters + /////////////////////////////////////////////////////////////////////////////////////////// + public Map getAuthenticatedPeers() { return authenticatedPeers; } @@ -233,202 +509,19 @@ public class PeerGroup { CopyOnWriteArraySet
allPeerAddresses = new CopyOnWriteArraySet<>(reportedPeerAddresses); allPeerAddresses.addAll(authenticatedPeers.values().stream() .map(e -> e.address).collect(Collectors.toList())); - // remove own address and seed nodes - allPeerAddresses.remove(getAddress()); return allPeerAddresses; } + public Set
getSeedNodeAddresses() { + return seedNodeAddresses; + } + /////////////////////////////////////////////////////////////////////////////////////////// - // Authentication + // Reported peers /////////////////////////////////////////////////////////////////////////////////////////// - // authentication example: - // node2 -> node1 RequestAuthenticationMessage - // node1: close connection - // node1 -> node2 ChallengeMessage on new connection - // node2: authentication to node1 done if nonce ok - // node2 -> node1 GetPeersMessage - // node1: authentication to node2 done if nonce ok - // node1 -> node2 PeersMessage - - public void startAuthentication(Set
connectedSeedNodes) { - connectedSeedNodes.forEach(connectedSeedNode -> { - sendRequestAuthenticationMessage(seedNodes, connectedSeedNode); - }); - } - - private void sendRequestAuthenticationMessage(Set
remainingSeedNodes, final Address address) { - log.info("We try to authenticate to a random seed node. " + address); - startAuthTs = System.currentTimeMillis(); - final boolean[] alreadyConnected = {false}; - authenticatedPeers.values().stream().forEach(e -> { - remainingSeedNodes.remove(e.address); - if (address.equals(e.address)) - alreadyConnected[0] = true; - }); - if (!alreadyConnected[0]) { - long nonce = addToMapAndGetNonce(address); - SettableFuture future = networkNode.sendMessage(address, new RequestAuthenticationMessage(getAddress(), nonce)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.info("send RequestAuthenticationMessage to " + address + " succeeded."); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Send RequestAuthenticationMessage to " + address + " failed. Exception:" + throwable.getMessage()); - log.trace("We try to authenticate to another random seed nodes of that list: " + remainingSeedNodes); - getNextSeedNode(remainingSeedNodes); - } - }); - } else { - getNextSeedNode(remainingSeedNodes); - } - } - - private void getNextSeedNode(Set
remainingSeedNodes) { - List
remainingSeedNodeAddresses = new ArrayList<>(remainingSeedNodes); - - Address myAddress = getAddress(); - if (myAddress != null) - remainingSeedNodeAddresses.remove(myAddress); - - if (!remainingSeedNodeAddresses.isEmpty()) { - Collections.shuffle(remainingSeedNodeAddresses); - Address address = remainingSeedNodeAddresses.remove(0); - sendRequestAuthenticationMessage(Sets.newHashSet(remainingSeedNodeAddresses), address); - } else { - log.info("No other seed node found. That is expected for the first seed node."); - } - } - - - private void processAuthenticationMessage(AuthenticationMessage message, Connection connection) { - log.trace("processAuthenticationMessage " + message + " from " + connection.getPeerAddress() + " at " + getAddress()); - if (message instanceof RequestAuthenticationMessage) { - RequestAuthenticationMessage requestAuthenticationMessage = (RequestAuthenticationMessage) message; - Address peerAddress = requestAuthenticationMessage.address; - log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); - connection.shutDown(() -> Utilities.runTimerTask(() -> { - Thread.currentThread().setName("DelaySendChallengeMessageTimer-" + new Random().nextInt(1000)); - // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to - // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) - log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); - long nonce = addToMapAndGetNonce(peerAddress); - SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.debug("onSuccess sending ChallengeMessage"); - } - - @Override - public void onFailure(Throwable throwable) { - log.warn("onFailure sending ChallengeMessage. We try again."); - SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.debug("onSuccess sending 2. ChallengeMessage"); - } - - @Override - public void onFailure(Throwable throwable) { - log.warn("onFailure sending ChallengeMessage. We give up."); - } - }); - } - }); - }, - 100 + simulateAuthTorNode, - TimeUnit.MILLISECONDS)); - } else if (message instanceof ChallengeMessage) { - ChallengeMessage challengeMessage = (ChallengeMessage) message; - Address peerAddress = challengeMessage.address; - log.trace("ChallengeMessage from " + peerAddress + " at " + getAddress()); - log.trace("nonceMap" + nonceMap); - log.trace("challengeMessage" + challengeMessage); - HashMap tempNonceMap = new HashMap<>(nonceMap); - boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress); - if (verified) { - connection.setPeerAddress(peerAddress); - SettableFuture future = networkNode.sendMessage(peerAddress, - new GetPeersMessage(getAddress(), challengeMessage.challengerNonce, new ArrayList
(getAllPeerAddresses()))); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("GetPeersMessage sent successfully from " + getAddress() + " to " + peerAddress); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("GetPeersMessage sending failed " + throwable.getMessage()); - removePeer(peerAddress); - } - }); - } else { - log.warn("verifyNonceAndAuthenticatePeerAddress failed. challengeMessage=" + challengeMessage + " / nonceMap=" + tempNonceMap); - } - } else if (message instanceof GetPeersMessage) { - GetPeersMessage getPeersMessage = (GetPeersMessage) message; - Address peerAddress = getPeersMessage.address; - log.trace("GetPeersMessage from " + peerAddress + " at " + getAddress()); - boolean verified = verifyNonceAndAuthenticatePeerAddress(getPeersMessage.challengerNonce, peerAddress); - if (verified) { - setAuthenticated(connection, peerAddress); - purgeReportedPeers(); - SettableFuture future = networkNode.sendMessage(peerAddress, - new PeersMessage(getAddress(), new ArrayList(getAllPeerAddresses()))); - log.trace("sent PeersMessage to " + peerAddress + " from " + getAddress() - + " with allPeers=" + getAllPeerAddresses()); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("PeersMessage sent successfully from " + getAddress() + " to " + peerAddress); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("PeersMessage sending failed " + throwable.getMessage()); - removePeer(peerAddress); - } - }); - - // now we add the reported peers to our own set - ArrayList
peerAddresses = ((GetPeersMessage) message).peerAddresses; - log.trace("Received peers: " + peerAddresses); - // remove ourselves - addToReportedPeers(peerAddresses, connection); - } - } else if (message instanceof PeersMessage) { - PeersMessage peersMessage = (PeersMessage) message; - Address peerAddress = peersMessage.address; - log.trace("PeersMessage from " + peerAddress + " at " + getAddress()); - ArrayList
peerAddresses = peersMessage.peerAddresses; - log.trace("Received peers: " + peerAddresses); - // remove ourselves - addToReportedPeers(peerAddresses, connection); - - // we wait until the handshake is completed before setting the authenticate flag - // authentication at both sides of the connection - - log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress - + " authenticated (" + connection.getObjectId() + "). Took " - + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); - - setAuthenticated(connection, peerAddress); - - Runnable authenticationCompleteHandler = authenticationCompleteHandlers.remove(connection.getPeerAddress()); - if (authenticationCompleteHandler != null) - authenticationCompleteHandler.run(); - - authenticateToNextRandomPeer(); - } - } - - private void addToReportedPeers(ArrayList
peerAddresses, Connection connection) { + void addToReportedPeers(HashSet
peerAddresses, Connection connection) { log.trace("addToReportedPeers"); // we disconnect misbehaving nodes trying to send too many peers // reported peers include the peers connected peers which is normally max. 8 but we give some headroom @@ -436,113 +529,29 @@ public class PeerGroup { if (peerAddresses.size() > 1100) { connection.shutDown(); } else { - peerAddresses.remove(getAddress()); + peerAddresses.remove(getMyAddress()); reportedPeerAddresses.addAll(peerAddresses); purgeReportedPeers(); } } - private void purgeReportedPeers() { + void purgeReportedPeers() { log.trace("purgeReportedPeers"); int all = getAllPeerAddresses().size(); if (all > 1000) { int diff = all - 100; - List
list = new ArrayList<>(getNotConnectedPeerAddresses()); + List
list = new LinkedList<>(getReportedNotConnectedPeerAddresses()); for (int i = 0; i < diff; i++) { - Address toRemove = list.remove(new Random().nextInt(list.size())); + Address toRemove = getAndRemoveRandomItem(list); reportedPeerAddresses.remove(toRemove); } } } - private synchronized CopyOnWriteArraySet
getNotConnectedPeerAddresses() { - CopyOnWriteArraySet
list = new CopyOnWriteArraySet<>(getAllPeerAddresses()); - authenticatedPeers.values().stream().forEach(e -> list.remove(e.address)); - return list; - } - - private void authenticateToNextRandomPeer() { - Utilities.runTimerTaskWithRandomDelay(() -> { - Thread.currentThread().setName("DelayAuthenticateToNextRandomPeerTimer-" + new Random().nextInt(1000)); - if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { - Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress(); - if (randomNotConnectedPeerAddress != null) { - log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress); - authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer()); - } else { - log.info("No more peers available for connecting."); - } - } else { - log.info("We have already enough connections."); - } - }, 200, 400, TimeUnit.MILLISECONDS); - } - - public void authenticateToPeer(Address address, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) { - startAuthTs = System.currentTimeMillis(); - - if (authenticationCompleteHandler != null) - authenticationCompleteHandlers.put(address, authenticationCompleteHandler); - - long nonce = addToMapAndGetNonce(address); - SettableFuture future = networkNode.sendMessage(address, new RequestAuthenticationMessage(getAddress(), nonce)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Connection connection) { - log.debug("send RequestAuthenticationMessage succeeded"); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("send IdMessage failed. " + throwable.getMessage()); - removePeer(address); - if (faultHandler != null) faultHandler.run(); - } - }); - } - - private long addToMapAndGetNonce(Address peerAddress) { - long nonce = new Random().nextLong(); - while (nonce == 0) { - nonce = new Random().nextLong(); - } - log.trace("addToMapAndGetNonce nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); - nonceMap.put(peerAddress, nonce); - return nonce; - } - - private boolean verifyNonceAndAuthenticatePeerAddress(long peersNonce, Address peerAddress) { - log.trace("verifyNonceAndAuthenticatePeerAddress nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); - Long nonce = nonceMap.remove(peerAddress); - return nonce != null && nonce == peersNonce; - } - - private void setAuthenticated(Connection connection, Address peerAddress) { - log.info("\n\n############################################################\n" + - "We are authenticated to:" + - "\nconnection=" + connection - + "\nmyAddress=" + getAddress() - + "\npeerAddress= " + peerAddress - + "\n############################################################\n"); - - connection.setAuthenticated(peerAddress, connection); - - Peer peer = new Peer(connection); - addAuthenticatedPeer(peerAddress, peer); - - peerListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection)); - - log.debug("\n### setAuthenticated post connection " + connection); - } - - private Address getRandomNotConnectedPeerAddress() { - List
list = new ArrayList<>(getNotConnectedPeerAddresses()); - if (list.size() > 0) { - Collections.shuffle(list); - return list.get(0); - } else { - return null; - } + private Set
getReportedNotConnectedPeerAddresses() { + Set
set = new HashSet<>(reportedPeerAddresses); + authenticatedPeers.values().stream().forEach(e -> set.remove(e.address)); + return set; } @@ -551,7 +560,7 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) { - log.debug("Received message " + message + " at " + getAddress() + " from " + connection.getPeerAddress()); + log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress()); if (message instanceof PingMessage) { SettableFuture future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce)); Futures.addCallback(future, new FutureCallback() { @@ -571,7 +580,7 @@ public class PeerGroup { if (peer != null) { if (((PongMessage) message).nonce != peer.getPingNonce()) { removePeer(peer.address); - log.warn("PongMessage invalid: self/peer " + getAddress() + "/" + connection.getPeerAddress()); + log.warn("PongMessage invalid: self/peer " + getMyAddress() + "/" + connection.getPeerAddress()); } } } @@ -582,40 +591,19 @@ public class PeerGroup { // Peers /////////////////////////////////////////////////////////////////////////////////////////// - private void removePeer(@Nullable Address peerAddress) { + void removePeer(@Nullable Address peerAddress) { reportedPeerAddresses.remove(peerAddress); - Peer disconnectedPeer; - disconnectedPeer = authenticatedPeers.remove(peerAddress); + Peer disconnectedPeer = authenticatedPeers.remove(peerAddress); if (disconnectedPeer != null) UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress))); printConnectedPeersMap(); printReportedPeersMap(); - - log.trace("removePeer nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); - nonceMap.remove(peerAddress); } - private void addAuthenticatedPeer(Address address, Peer peer) { - boolean firstPeerAdded; - authenticatedPeers.put(address, peer); - firstPeerAdded = authenticatedPeers.size() == 1; - - UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer))); - - if (firstPeerAdded) - UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onFirstPeerAdded(peer))); - - if (authenticatedPeers.size() > MAX_CONNECTIONS) - disconnectOldConnections(); - - log.trace("addConnectedPeer [post]"); - printConnectedPeersMap(); - } - - private Address getAddress() { + private Address getMyAddress() { return networkNode.getAddress(); } @@ -624,8 +612,12 @@ public class PeerGroup { // Utils /////////////////////////////////////////////////////////////////////////////////////////// + private Address getAndRemoveRandomItem(List
list) { + return list.remove(new Random().nextInt(list.size())); + } + public void printConnectedPeersMap() { - StringBuilder result = new StringBuilder("\nConnected peers for node " + getAddress() + ":"); + StringBuilder result = new StringBuilder("\nConnected peers for node " + getMyAddress() + ":"); authenticatedPeers.values().stream().forEach(e -> { result.append("\n\t" + e.address); }); @@ -634,16 +626,11 @@ public class PeerGroup { } public void printReportedPeersMap() { - StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getAddress() + ":"); + StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getMyAddress() + ":"); reportedPeerAddresses.stream().forEach(e -> { result.append("\n\t" + e); }); result.append("\n"); log.info(result.toString()); } - - private String getObjectId() { - return super.toString().split("@")[1].toString(); - } - } diff --git a/network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java b/network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java index fe2b8e4605..b09280412e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/PeerListener.java @@ -4,7 +4,7 @@ import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; public interface PeerListener { - void onFirstPeerAdded(Peer peer); + void onFirstAuthenticatePeer(Peer peer); void onPeerAdded(Peer peer); diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java index e5eb02f891..12ec467b81 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/GetPeersMessage.java @@ -3,7 +3,7 @@ package io.bitsquare.p2p.peer.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; -import java.util.ArrayList; +import java.util.HashSet; public final class GetPeersMessage implements AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. @@ -11,9 +11,9 @@ public final class GetPeersMessage implements AuthenticationMessage { public final Address address; public final long challengerNonce; - public final ArrayList
peerAddresses; + public final HashSet
peerAddresses; - public GetPeersMessage(Address address, long challengerNonce, ArrayList
peerAddresses) { + public GetPeersMessage(Address address, long challengerNonce, HashSet
peerAddresses) { this.address = address; this.challengerNonce = challengerNonce; this.peerAddresses = peerAddresses; diff --git a/network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java b/network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java index cc687234f7..00fdfb7344 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/messages/PeersMessage.java @@ -3,16 +3,16 @@ package io.bitsquare.p2p.peer.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Address; -import java.util.ArrayList; +import java.util.HashSet; public final class PeersMessage implements AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final Address address; - public final ArrayList
peerAddresses; + public final HashSet
peerAddresses; - public PeersMessage(Address address, ArrayList
peerAddresses) { + public PeersMessage(Address address, HashSet
peerAddresses) { this.address = address; this.peerAddresses = peerAddresses; } diff --git a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java index 7cbbf6a0d3..48e7175035 100644 --- a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java +++ b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java @@ -11,13 +11,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; +import static com.google.common.base.Preconditions.checkArgument; + public class SeedNode { private static final Logger log = LoggerFactory.getLogger(SeedNode.class); - private int port = 8001; + private Address mySeedNodeAddress = new Address("localhost:8001"); private boolean useLocalhost = false; private Set
seedNodes; private P2PService p2PService; @@ -31,30 +35,45 @@ public class SeedNode { // API /////////////////////////////////////////////////////////////////////////////////////////// - // args: port useLocalhost seedNodes - // eg. 4444 true localhost:7777 localhost:8888 + // args: myAddress (incl. port) useLocalhost seedNodes (separated with |) + // 2. and 3. args are optional + // eg. lmvdenjkyvx2ovga.onion:8001 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003 + // or when using localhost: localhost:8001 true localhost:8002|localhost:8003 public void processArgs(String[] args) { if (args.length > 0) { - port = Integer.parseInt(args[0]); + + String arg0 = args[0]; + checkArgument(arg0.contains(":") && arg0.split(":").length == 2 && arg0.split(":")[1].length() == 4, "Wrong program argument"); + mySeedNodeAddress = new Address(arg0); if (args.length > 1) { - useLocalhost = ("true").equals(args[1]); + String arg1 = args[1]; + checkArgument(arg1.equals("true") || arg1.equals("false")); + useLocalhost = ("true").equals(arg1); - if (args.length > 2) { + if (args.length == 3) { + String arg2 = args[2]; + checkArgument(arg2.contains(":") && arg2.split(":").length > 1 && arg2.split(":")[1].length() > 3, "Wrong program argument"); + List list = Arrays.asList(arg2.split("|")); seedNodes = new HashSet<>(); - for (int i = 2; i < args.length; i++) { - seedNodes.add(new Address(args[i])); - } + list.forEach(e -> { + checkArgument(e.contains(":") && e.split(":").length == 2 && e.split(":")[1].length() == 4, "Wrong program argument"); + seedNodes.add(new Address(e)); + }); + seedNodes.remove(mySeedNodeAddress); + } else { + log.error("Wrong number of program arguments." + + "\nProgram arguments: myAddress useLocalhost seedNodes"); } } } } public void createAndStartP2PService() { - createAndStartP2PService(null, null, port, useLocalhost, seedNodes, null); + createAndStartP2PService(null, null, mySeedNodeAddress, useLocalhost, seedNodes, null); } - public void createAndStartP2PService(EncryptionService encryptionService, KeyRing keyRing, int port, boolean useLocalhost, @Nullable Set
seedNodes, @Nullable P2PServiceListener listener) { + public void createAndStartP2PService(EncryptionService encryptionService, KeyRing keyRing, Address mySeedNodeAddress, boolean useLocalhost, @Nullable Set
seedNodes, @Nullable P2PServiceListener listener) { SeedNodesRepository seedNodesRepository = new SeedNodesRepository(); if (seedNodes != null && !seedNodes.isEmpty()) { if (useLocalhost) @@ -63,7 +82,8 @@ public class SeedNode { seedNodesRepository.setTorSeedNodeAddresses(seedNodes); } - p2PService = new P2PService(seedNodesRepository, port, new File("bitsquare_seed_node_" + port), useLocalhost, encryptionService, keyRing, new File("dummy")); + p2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, new File("bitsquare_seed_node_" + mySeedNodeAddress.port), useLocalhost, encryptionService, keyRing, new File("dummy")); + p2PService.removeMySeedNodeAddressFromList(mySeedNodeAddress); p2PService.start(listener); } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/DataSetMessage.java b/network/src/main/java/io/bitsquare/p2p/storage/messages/AllDataMessage.java similarity index 79% rename from network/src/main/java/io/bitsquare/p2p/storage/messages/DataSetMessage.java rename to network/src/main/java/io/bitsquare/p2p/storage/messages/AllDataMessage.java index 5a6ed1d5f3..27f3c93d14 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/messages/DataSetMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/messages/AllDataMessage.java @@ -6,22 +6,22 @@ import io.bitsquare.p2p.storage.data.ProtectedData; import java.util.HashSet; -public final class DataSetMessage implements Message { +public final class AllDataMessage implements Message { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final HashSet set; - public DataSetMessage(HashSet set) { + public AllDataMessage(HashSet set) { this.set = set; } @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof DataSetMessage)) return false; + if (!(o instanceof AllDataMessage)) return false; - DataSetMessage that = (DataSetMessage) o; + AllDataMessage that = (AllDataMessage) o; return !(set != null ? !set.equals(that.set) : that.set != null); diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataSetMessage.java b/network/src/main/java/io/bitsquare/p2p/storage/messages/GetAllDataMessage.java similarity index 64% rename from network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataSetMessage.java rename to network/src/main/java/io/bitsquare/p2p/storage/messages/GetAllDataMessage.java index 868774c8a6..1371f36bb8 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataSetMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/messages/GetAllDataMessage.java @@ -3,13 +3,10 @@ package io.bitsquare.p2p.storage.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Message; -public final class GetDataSetMessage implements Message { +public final class GetAllDataMessage implements Message { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - public final long nonce; - - public GetDataSetMessage(long nonce) { - this.nonce = nonce; + public GetAllDataMessage() { } } diff --git a/network/src/test/java/io/bitsquare/p2p/TestUtils.java b/network/src/test/java/io/bitsquare/p2p/TestUtils.java index 1b6ef6fd4d..25b7adfd70 100644 --- a/network/src/test/java/io/bitsquare/p2p/TestUtils.java +++ b/network/src/test/java/io/bitsquare/p2p/TestUtils.java @@ -80,7 +80,7 @@ public class TestUtils { } CountDownLatch latch = new CountDownLatch(1); - seedNode.createAndStartP2PService(encryptionService, keyRing, port, useLocalhost, seedNodes, new P2PServiceListener() { + seedNode.createAndStartP2PService(encryptionService, keyRing, new Address("localhost", port), useLocalhost, seedNodes, new P2PServiceListener() { @Override public void onAllDataReceived() { } @@ -94,7 +94,7 @@ public class TestUtils { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { latch.countDown(); } @@ -134,7 +134,7 @@ public class TestUtils { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { } diff --git a/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java b/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java index 602f25f3a3..9c4053e614 100644 --- a/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java +++ b/network/src/test/java/io/bitsquare/p2p/network/LocalhostNetworkNodeTest.java @@ -42,7 +42,7 @@ public class LocalhostNetworkNodeTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { log.debug("onHiddenServiceReady"); startupLatch.countDown(); } @@ -65,7 +65,7 @@ public class LocalhostNetworkNodeTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { log.debug("onHiddenServiceReady 2"); startupLatch.countDown(); } diff --git a/network/src/test/java/io/bitsquare/p2p/network/TorNetworkNodeTest.java b/network/src/test/java/io/bitsquare/p2p/network/TorNetworkNodeTest.java index df7a909721..908e3d1ef7 100644 --- a/network/src/test/java/io/bitsquare/p2p/network/TorNetworkNodeTest.java +++ b/network/src/test/java/io/bitsquare/p2p/network/TorNetworkNodeTest.java @@ -42,7 +42,7 @@ public class TorNetworkNodeTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { log.debug("onReadyForReceivingMessages"); latch.countDown(); } @@ -65,7 +65,7 @@ public class TorNetworkNodeTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { log.debug("onReadyForReceivingMessages"); } @@ -124,7 +124,7 @@ public class TorNetworkNodeTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { log.debug("onReadyForReceivingMessages"); latch.countDown(); } @@ -144,7 +144,7 @@ public class TorNetworkNodeTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { log.debug("onReadyForReceivingMessages"); latch.countDown(); } diff --git a/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java b/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java index 2497244c84..97957d0024 100644 --- a/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java +++ b/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java @@ -79,10 +79,11 @@ public class PeerGroupTest { LocalhostNetworkNode.setSimulateTorDelayTorNode(0); LocalhostNetworkNode.setSimulateTorDelayHiddenService(0); seedNodes = new HashSet<>(); - seedNodes.add(new Address("localhost:8001")); + Address address = new Address("localhost:8001"); + seedNodes.add(address); seedNode1 = new SeedNode(); latch = new CountDownLatch(2); - seedNode1.createAndStartP2PService(null, null, 8001, useLocalhost, seedNodes, new P2PServiceListener() { + seedNode1.createAndStartP2PService(null, null, address, useLocalhost, seedNodes, new P2PServiceListener() { @Override public void onAllDataReceived() { latch.countDown(); @@ -98,7 +99,7 @@ public class PeerGroupTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { latch.countDown(); } @@ -118,13 +119,15 @@ public class PeerGroupTest { LocalhostNetworkNode.setSimulateTorDelayTorNode(0); LocalhostNetworkNode.setSimulateTorDelayHiddenService(0); seedNodes = new HashSet<>(); - seedNodes.add(new Address("localhost:8001")); - seedNodes.add(new Address("localhost:8002")); + Address address1 = new Address("localhost:8001"); + seedNodes.add(address1); + Address address2 = new Address("localhost:8002"); + seedNodes.add(address2); latch = new CountDownLatch(6); seedNode1 = new SeedNode(); - seedNode1.createAndStartP2PService(null, null, 8001, useLocalhost, seedNodes, new P2PServiceListener() { + seedNode1.createAndStartP2PService(null, null, address1, useLocalhost, seedNodes, new P2PServiceListener() { @Override public void onAllDataReceived() { latch.countDown(); @@ -141,7 +144,7 @@ public class PeerGroupTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { latch.countDown(); } @@ -155,7 +158,7 @@ public class PeerGroupTest { Thread.sleep(500); seedNode2 = new SeedNode(); - seedNode2.createAndStartP2PService(null, null, 8002, useLocalhost, seedNodes, new P2PServiceListener() { + seedNode2.createAndStartP2PService(null, null, address2, useLocalhost, seedNodes, new P2PServiceListener() { @Override public void onAllDataReceived() { latch.countDown(); @@ -172,7 +175,7 @@ public class PeerGroupTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { latch.countDown(); } @@ -382,7 +385,7 @@ public class PeerGroupTest { SeedNode seedNode = new SeedNode(); latch = new CountDownLatch(1); - seedNode.createAndStartP2PService(null, null, port, useLocalhost, seedNodes, new P2PServiceListener() { + seedNode.createAndStartP2PService(null, null, new Address("localhost", port), useLocalhost, seedNodes, new P2PServiceListener() { @Override public void onAllDataReceived() { latch.countDown(); @@ -398,7 +401,7 @@ public class PeerGroupTest { } @Override - public void onHiddenServiceReady() { + public void onHiddenServicePublished() { } diff --git a/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java b/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java index 4144e57645..0fa4df9722 100644 --- a/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java +++ b/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java @@ -3,7 +3,6 @@ package io.bitsquare.p2p.seed; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.bitsquare.app.Logging; import io.bitsquare.common.UserThread; -import io.bitsquare.common.util.Utilities; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,7 +11,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.security.NoSuchAlgorithmException; import java.security.Security; -import java.util.Random; import java.util.Scanner; import java.util.Timer; import java.util.concurrent.Executors; @@ -24,8 +22,8 @@ public class SeedNodeMain { private boolean stopped; - // args: port useLocalhost seedNodes - // eg. 4444 true localhost:7777 localhost:8888 + // args: myAddress (incl. port) useLocalhost seedNodes (separated with |) + // eg. lmvdenjkyvx2ovga.onion:8001 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003 // To stop enter: q public static void main(String[] args) throws NoSuchAlgorithmException { Path path = Paths.get("seed_node_log"); @@ -62,8 +60,7 @@ public class SeedNodeMain { if (line.equals("q")) { if (!stopped) { stopped = true; - Timer timeout = Utilities.runTimerTask(() -> { - Thread.currentThread().setName("ShutdownTimeout-" + new Random().nextInt(1000)); + Timer timeout = UserThread.runAfter(() -> { log.error("Timeout occurred at shutDown request"); System.exit(1); }, 10);