From def492a22a36c94775f308a741e52cdaab45cd9d Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Thu, 5 Nov 2015 20:55:55 +0100 Subject: [PATCH] Update guava, cleanup threading --- .../io/bitsquare/p2p/seed/SeedNodeMain.java | 54 +++++- .../java/io/bitsquare/common/UserThread.java | 10 +- .../io/bitsquare/common/util/Utilities.java | 62 +++++++ .../io/bitsquare/storage/FileManager.java | 9 +- .../java/io/bitsquare/btc/WalletService.java | 5 +- .../java/io/bitsquare/crypto/ScryptUtil.java | 12 +- .../io/bitsquare/trade/offer/OpenOffer.java | 2 + .../trade/offer/OpenOfferManager.java | 15 +- .../OfferAvailabilityProtocol.java | 2 + .../trade/protocol/trade/TradeProtocol.java | 2 + .../java/io/bitsquare/p2p/P2PService.java | 39 ++--- .../src/main/java/io/bitsquare/p2p/Utils.java | 19 -- .../io/bitsquare/p2p/network/Connection.java | 105 +++++------ .../p2p/network/LocalhostNetworkNode.java | 67 +++---- .../io/bitsquare/p2p/network/NetworkNode.java | 48 ++++-- .../java/io/bitsquare/p2p/network/Server.java | 5 +- .../bitsquare/p2p/network/TorNetworkNode.java | 163 +++++++++--------- .../io/bitsquare/p2p/routing/Routing.java | 131 ++++++-------- .../java/io/bitsquare/p2p/seed/SeedNode.java | 30 +--- .../p2p/storage/ProtectedDataStorageTest.java | 2 +- pom.xml | 2 +- 21 files changed, 423 insertions(+), 361 deletions(-) diff --git a/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java b/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java index 5644101fa8..855189fb44 100644 --- a/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java +++ b/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java @@ -2,19 +2,34 @@ package io.bitsquare.p2p.seed; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.security.NoSuchAlgorithmException; import java.security.Security; +import java.util.Random; +import java.util.Scanner; +import java.util.Timer; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; public class SeedNodeMain { + private static final Logger log = LoggerFactory.getLogger(SeedNodeMain.class); + private static SeedNodeMain seedNodeMain; + private SeedNode seedNode; + + private boolean stopped; // args: port useLocalhost seedNodes // eg. 4444 true localhost:7777 localhost:8888 // To stop enter: q public static void main(String[] args) throws NoSuchAlgorithmException { + seedNodeMain = new SeedNodeMain(args); + } + + public SeedNodeMain(String[] args) { Security.addProvider(new BouncyCastleProvider()); final ThreadFactory threadFactory = new ThreadFactoryBuilder() @@ -22,10 +37,41 @@ public class SeedNodeMain { .setDaemon(true) .build(); UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory)); + UserThread.execute(() -> { + try { + seedNode = new SeedNode(); + seedNode.processArgs(args); + seedNode.createAndStartP2PService(); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + }); + listenForExitCommand(); + } - SeedNode seedNode = new SeedNode(); - seedNode.processArgs(args); - seedNode.createAndStartP2PService(); - seedNode.listenForExitCommand(); + public void listenForExitCommand() { + Scanner scan = new Scanner(System.in); + String line; + while (!stopped && ((line = scan.nextLine()) != null)) { + if (line.equals("q")) { + if (!stopped) { + stopped = true; + Timer timeout = Utilities.runTimerTask(() -> { + Thread.currentThread().setName("ShutdownTimeout-" + new Random().nextInt(1000)); + log.error("Timeout occurred at shutDown request"); + System.exit(1); + }, 10); + + if (seedNode != null) { + seedNode.shutDown(() -> { + timeout.cancel(); + log.debug("Shutdown seed node complete."); + System.exit(0); + }); + } + } + } + } } } diff --git a/common/src/main/java/io/bitsquare/common/UserThread.java b/common/src/main/java/io/bitsquare/common/UserThread.java index bce70d270b..7ce9965833 100644 --- a/common/src/main/java/io/bitsquare/common/UserThread.java +++ b/common/src/main/java/io/bitsquare/common/UserThread.java @@ -17,8 +17,9 @@ package io.bitsquare.common; +import com.google.common.util.concurrent.MoreExecutors; + import java.util.concurrent.Executor; -import java.util.concurrent.Executors; public class UserThread { @@ -30,7 +31,12 @@ public class UserThread { UserThread.executor = executor; } - public static Executor executor = Executors.newSingleThreadExecutor(); + static { + // If not defined we use same thread as caller thread + executor = MoreExecutors.directExecutor(); + } + + private static Executor executor; public static void execute(Runnable command) { UserThread.executor.execute(command); diff --git a/common/src/main/java/io/bitsquare/common/util/Utilities.java b/common/src/main/java/io/bitsquare/common/util/Utilities.java index 22bf0ebc9c..2b9149d1f1 100644 --- a/common/src/main/java/io/bitsquare/common/util/Utilities.java +++ b/common/src/main/java/io/bitsquare/common/util/Utilities.java @@ -19,6 +19,9 @@ package io.bitsquare.common.util; import com.google.common.base.Charsets; import com.google.common.io.CharStreams; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.*; import javafx.scene.input.Clipboard; import javafx.scene.input.ClipboardContent; @@ -32,6 +35,13 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLConnection; import java.net.URLEncoder; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** @@ -51,6 +61,58 @@ public class Utilities { return gson.toJson(object); } + public static ListeningExecutorService getListeningExecutorService(String name, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime) { + return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTime)); + } + + public static ThreadPoolExecutor getThreadPoolExecutor(String name, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime) { + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat(name) + .setDaemon(true) + .build(); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(maximumPoolSize), threadFactory); + threadPoolExecutor.allowCoreThreadTimeOut(true); + threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> log.warn("RejectedExecutionHandler called")); + return threadPoolExecutor; + } + + public static Timer runTimerTaskWithRandomDelay(Runnable runnable, long minDelay, long maxDelay) { + return runTimerTaskWithRandomDelay(runnable, minDelay, maxDelay, TimeUnit.SECONDS); + } + + public static Timer runTimerTaskWithRandomDelay(Runnable runnable, long minDelay, long maxDelay, TimeUnit timeUnit) { + return runTimerTask(runnable, new Random().nextInt((int) (maxDelay - minDelay)) + minDelay, timeUnit); + } + + public static Timer runTimerTask(Runnable runnable, long delay) { + return runTimerTask(runnable, delay, TimeUnit.SECONDS); + } + + public static Timer runTimerTask(Runnable runnable, long delay, TimeUnit timeUnit) { + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + Thread.currentThread().setName("TimerTask-" + new Random().nextInt(10000)); + try { + runnable.run(); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing timerTask failed. " + t.getMessage()); + } + } + }, timeUnit.convert(delay, timeUnit)); + return timer; + } + + public static boolean isUnix() { return isOSX() || isLinux() || getOSName().contains("freebsd"); } diff --git a/common/src/main/java/io/bitsquare/storage/FileManager.java b/common/src/main/java/io/bitsquare/storage/FileManager.java index 95db71b132..7d8c218945 100644 --- a/common/src/main/java/io/bitsquare/storage/FileManager.java +++ b/common/src/main/java/io/bitsquare/storage/FileManager.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import java.io.*; import java.nio.file.Paths; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -67,7 +68,7 @@ public class FileManager { private final AtomicBoolean savePending; private final long delay; private final TimeUnit delayTimeUnit; - private final Callable saver; + private final Callable saveFileTask; private T serializable; @@ -88,6 +89,7 @@ public class FileManager { executor = new ScheduledThreadPoolExecutor(1, builder.build()); executor.setKeepAliveTime(5, TimeUnit.SECONDS); executor.allowCoreThreadTimeOut(true); + executor.setMaximumPoolSize(10); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); // File must only be accessed from the auto-save executor from now on, to avoid simultaneous access. @@ -95,7 +97,8 @@ public class FileManager { this.delay = delay; this.delayTimeUnit = checkNotNull(delayTimeUnit); - saver = () -> { + saveFileTask = () -> { + Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000)); // Runs in an auto save thread. if (!savePending.getAndSet(false)) { // Some other scheduled request already beat us to it. @@ -137,7 +140,7 @@ public class FileManager { if (savePending.getAndSet(true)) return; // Already pending. - executor.schedule(saver, delay, delayTimeUnit); + executor.schedule(saveFileTask, delay, delayTimeUnit); } public synchronized T read(File file) { diff --git a/core/src/main/java/io/bitsquare/btc/WalletService.java b/core/src/main/java/io/bitsquare/btc/WalletService.java index 9e4fa9f033..5a4fbe6d5a 100644 --- a/core/src/main/java/io/bitsquare/btc/WalletService.java +++ b/core/src/main/java/io/bitsquare/btc/WalletService.java @@ -121,7 +121,10 @@ public class WalletService { Timer timeoutTimer = FxTimer.runLater( Duration.ofMillis(STARTUP_TIMEOUT), - () -> exceptionHandler.handleException(new TimeoutException("Wallet did not initialize in " + STARTUP_TIMEOUT / 1000 + " seconds.")) + () -> { + Thread.currentThread().setName("WalletService:StartupTimeout-" + new Random().nextInt(1000)); + exceptionHandler.handleException(new TimeoutException("Wallet did not initialize in " + STARTUP_TIMEOUT / 1000 + " seconds.")); + } ); // If seed is non-null it means we are restoring from backup. diff --git a/core/src/main/java/io/bitsquare/crypto/ScryptUtil.java b/core/src/main/java/io/bitsquare/crypto/ScryptUtil.java index a287d7271f..a16028bf99 100644 --- a/core/src/main/java/io/bitsquare/crypto/ScryptUtil.java +++ b/core/src/main/java/io/bitsquare/crypto/ScryptUtil.java @@ -1,16 +1,14 @@ package io.bitsquare.crypto; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; import org.bitcoinj.crypto.KeyCrypterScrypt; import org.bitcoinj.wallet.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.spongycastle.crypto.params.KeyParameter; -import java.util.concurrent.*; - //TODO: Borrowed form BitcoinJ/Lighthouse. Remove Protos dependency, check complete code logic. public class ScryptUtil { private static final Logger log = LoggerFactory.getLogger(ScryptUtil.class); @@ -30,13 +28,7 @@ public class ScryptUtil { } public static void deriveKeyWithScrypt(KeyCrypterScrypt keyCrypterScrypt, String password, DeriveKeyResultHandler resultHandler) { - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("Routing-%d") - .setDaemon(true) - .build(); - - ExecutorService executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); - executorService.submit(() -> { + Utilities.getThreadPoolExecutor("ScryptUtil:deriveKeyWithScrypt-%d", 1, 2, 5L).submit(() -> { try { log.info("Doing key derivation"); long start = System.currentTimeMillis(); diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOffer.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOffer.java index 241c3f855d..79b19d581c 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOffer.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOffer.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.time.Duration; import java.util.Date; +import java.util.Random; public class OpenOffer implements Tradable, Serializable { // That object is saved to disc. We need to take care of changes to not break deserialization. @@ -102,6 +103,7 @@ public class OpenOffer implements Tradable, Serializable { timeoutTimer = FxTimer.runLater( Duration.ofMillis(TIMEOUT), () -> { + Thread.currentThread().setName("OpenOffer:Timeout-" + new Random().nextInt(1000)); log.debug("Timeout reached"); if (state == State.RESERVED) setState(State.AVAILABLE); diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 8c62a39d10..0d4a170b00 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -50,8 +50,6 @@ import java.util.Optional; import java.util.Random; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import static com.google.inject.internal.util.$Preconditions.checkNotNull; import static io.bitsquare.util.Validator.nonEmptyStringOf; @@ -70,7 +68,6 @@ public class OpenOfferManager { private final TradableList openOffers; private final Storage> openOffersStorage; private boolean shutDownRequested; - private ScheduledThreadPoolExecutor executor; private P2PServiceListener p2PServiceListener; private final Timer timer = new Timer(); @@ -181,7 +178,7 @@ public class OpenOfferManager { } private void rePublishOffers() { - log.trace("rePublishOffers"); + if (!openOffers.isEmpty()) log.trace("rePublishOffers"); for (OpenOffer openOffer : openOffers) { offerBookService.addOffer(openOffer.getOffer(), () -> log.debug("Successful added offer to P2P network"), @@ -196,14 +193,8 @@ public class OpenOfferManager { } public void shutDown(Runnable completeHandler) { - if (executor != null) { - executor.shutdown(); - try { - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } + if (timer != null) + timer.cancel(); if (!shutDownRequested) { log.debug("shutDown"); diff --git a/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java b/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java index 612fca53f0..b6eada87dd 100644 --- a/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java +++ b/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.Random; import static io.bitsquare.util.Validator.nonEmptyStringOf; @@ -144,6 +145,7 @@ public class OfferAvailabilityProtocol { stopTimeout(); timeoutTimer = FxTimer.runLater(Duration.ofMillis(TIMEOUT), () -> { + Thread.currentThread().setName("OfferAvailabilityProtocol:Timeout-" + new Random().nextInt(1000)); log.warn("Timeout reached"); errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded."); }); diff --git a/core/src/main/java/io/bitsquare/trade/protocol/trade/TradeProtocol.java b/core/src/main/java/io/bitsquare/trade/protocol/trade/TradeProtocol.java index 0ac5eef7d3..0ffbb0a186 100644 --- a/core/src/main/java/io/bitsquare/trade/protocol/trade/TradeProtocol.java +++ b/core/src/main/java/io/bitsquare/trade/protocol/trade/TradeProtocol.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.security.PublicKey; import java.time.Duration; import java.util.Optional; +import java.util.Random; import static io.bitsquare.util.Validator.nonEmptyStringOf; @@ -126,6 +127,7 @@ public abstract class TradeProtocol { stopTimeout(); timeoutTimer = FxTimer.runLater(Duration.ofMillis(TIMEOUT), () -> { + Thread.currentThread().setName("TradeProtocol:Timeout-" + new Random().nextInt(1000)); log.error("Timeout reached"); trade.setErrorMessage("A timeout occurred."); cleanupTradable(); diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 829facc2b6..4d567214db 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -3,7 +3,6 @@ package io.bitsquare.p2p; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.google.inject.name.Named; import io.bitsquare.app.ProgramArguments; @@ -12,6 +11,7 @@ import io.bitsquare.common.crypto.CryptoException; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.crypto.PubKeyRing; import io.bitsquare.common.crypto.SealedAndSigned; +import io.bitsquare.common.util.Utilities; import io.bitsquare.crypto.EncryptionService; import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.messaging.*; @@ -36,7 +36,8 @@ import java.io.File; import java.math.BigInteger; import java.security.PublicKey; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -78,7 +79,6 @@ public class P2PService { private boolean allSeedNodesRequested; private Timer sendGetAllDataMessageTimer; private volatile boolean hiddenServiceReady; - private final ExecutorService executorService; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -102,13 +102,6 @@ public class P2PService { networkStatistics = new NetworkStatistics(); - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("P2PService-%d") - .setDaemon(true) - .build(); - - executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); - init(); } @@ -204,7 +197,7 @@ public class P2PService { networkNode.addMessageListener((message, connection) -> { if (message instanceof GetDataSetMessage) { - log.trace("Received GetAllDataMessage: " + message); + log.trace("Received GetDataSetMessage: " + message); // we only reply if we did not get the message form ourselves (in case we are a seed node) if (!getDataSetMessageNonceList.contains(((GetDataSetMessage) message).nonce)) { @@ -595,19 +588,17 @@ public class P2PService { // we try to connect to 2 seed nodes if (connectedSeedNodes.size() < 2 && !remainingSeedNodeAddresses.isEmpty()) { // give a random pause of 1-3 sec. before using the next - sendGetAllDataMessageTimer = new Timer(); - sendGetAllDataMessageTimer.schedule(new TimerTask() { - @Override - public void run() { - Thread.currentThread().setName("SendGetAllDataMessageTimer-" + new Random().nextInt(1000)); - try { - UserThread.execute(() -> sendGetAllDataMessage(remainingSeedNodeAddresses)); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } + + if (sendGetAllDataMessageTimer != null) sendGetAllDataMessageTimer.cancel(); + sendGetAllDataMessageTimer = Utilities.runTimerTaskWithRandomDelay(() -> { + Thread.currentThread().setName("SendGetAllDataMessageTimer-" + new Random().nextInt(1000)); + try { + UserThread.execute(() -> sendGetAllDataMessage(remainingSeedNodeAddresses)); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); } - }, new Random().nextInt(2000) + 1000); + }, 1, 3); } else { allSeedNodesRequested = true; } @@ -617,7 +608,7 @@ public class P2PService { public void onFailure(Throwable throwable) { log.info("Send GetAllDataMessage to " + candidate + " failed. Exception:" + throwable.getMessage()); log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses); - sendGetAllDataMessage(remainingSeedNodeAddresses); + UserThread.execute(() -> sendGetAllDataMessage(remainingSeedNodeAddresses)); } }); } else { diff --git a/network/src/main/java/io/bitsquare/p2p/Utils.java b/network/src/main/java/io/bitsquare/p2p/Utils.java index e8727ab50e..1fa9c761e1 100644 --- a/network/src/main/java/io/bitsquare/p2p/Utils.java +++ b/network/src/main/java/io/bitsquare/p2p/Utils.java @@ -8,10 +8,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.net.ServerSocket; -import java.util.List; import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.zip.DataFormatException; import java.util.zip.Deflater; import java.util.zip.Inflater; @@ -31,22 +28,6 @@ public class Utils { } } - public static void shutDownExecutorService(ExecutorService executorService) { - shutDownExecutorService(executorService, 200); - } - - public static void shutDownExecutorService(ExecutorService executorService, long waitBeforeShutDown) { - executorService.shutdown(); - try { - boolean done = executorService.awaitTermination(waitBeforeShutDown, TimeUnit.MILLISECONDS); - if (!done) log.trace("Not all tasks completed at shutdown."); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - final List rejected = executorService.shutdownNow(); - log.debug("Rejected tasks: {}", rejected.size()); - } - public static byte[] compress(Serializable input) { return compress(ByteArrayUtils.objectToByteArray(input)); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index ae20eda03c..44142ccad2 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -1,6 +1,6 @@ package io.bitsquare.p2p.network; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import io.bitsquare.common.ByteArrayUtils; import io.bitsquare.common.UserThread; @@ -20,7 +20,10 @@ import java.net.SocketTimeoutException; import java.util.Date; import java.util.Map; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Connection is created by the server thread or by send message from NetworkNode. @@ -33,12 +36,13 @@ public class Connection { private static final int MAX_ILLEGAL_REQUESTS = 5; private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min. private InputHandler inputHandler; + private boolean isAuthenticated; public static int getMaxMsgSize() { return MAX_MSG_SIZE; } - private final int port; + private final String portInfo; private final String uid; private final ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -65,7 +69,7 @@ public class Connection { /////////////////////////////////////////////////////////////////////////////////////////// public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { - port = socket.getLocalPort(); + portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort(); uid = UUID.randomUUID().toString(); init(socket, messageListener, connectionListener); @@ -84,7 +88,7 @@ public class Connection { ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); // We create a thread for handling inputStream data - inputHandler = new InputHandler(sharedSpace, objectInputStream, port); + inputHandler = new InputHandler(sharedSpace, objectInputStream, portInfo); executorService.submit(inputHandler); } catch (IOException e) { sharedSpace.handleConnectionException(e); @@ -103,14 +107,14 @@ public class Connection { public synchronized void setAuthenticated(Address peerAddress, Connection connection) { this.peerAddress = peerAddress; + isAuthenticated = true; UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection)); } public void sendMessage(Message message) { - // That method we get called form user thread if (!stopped) { try { - log.trace("writeObject " + message + " on connection with port " + port); + log.trace("writeObject " + message + " on connection with port " + portInfo); if (!stopped) { Object objectToWrite; if (useCompression) { @@ -156,7 +160,7 @@ public class Connection { } public synchronized boolean isAuthenticated() { - return peerAddress != null; + return isAuthenticated; } public String getUid() { @@ -204,30 +208,42 @@ public class Connection { UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(ConnectionListener.Reason.SHUT_DOWN, this)); if (sendCloseConnectionMessage) { - sendMessage(new CloseConnectionMessage()); - // give a bit of time for closing gracefully - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } + executorService.submit(() -> { + Thread.currentThread().setName("Connection:Send-CloseConnectionMessage-" + this.getObjectId()); + try { + sendMessage(new CloseConnectionMessage()); + // give a bit of time for closing gracefully + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - try { - sharedSpace.getSocket().close(); - } catch (SocketException e) { - log.trace("SocketException at shutdown might be expected " + e.getMessage()); - } catch (IOException e) { - e.printStackTrace(); - } finally { - Utils.shutDownExecutorService(executorService); - - log.debug("Connection shutdown complete " + this.toString()); - // dont use executorService as its shut down but call handler on own thread - // to not get interrupted by caller - if (shutDownCompleteHandler != null) - new Thread(shutDownCompleteHandler).start(); + UserThread.execute(() -> continueShutDown(shutDownCompleteHandler)); + } catch (Throwable t) { + throw t; + } + }); + } else { + continueShutDown(shutDownCompleteHandler); } } } } + private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) { + try { + sharedSpace.getSocket().close(); + } catch (SocketException e) { + log.trace("SocketException at shutdown might be expected " + e.getMessage()); + } catch (IOException e) { + e.printStackTrace(); + } finally { + MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS); + + log.debug("Connection shutdown complete " + this.toString()); + // dont use executorService as its shut down but call handler on own thread + // to not get interrupted by caller + if (shutDownCompleteHandler != null) + UserThread.execute(shutDownCompleteHandler); + } + } @Override public boolean equals(Object o) { @@ -236,14 +252,14 @@ public class Connection { Connection that = (Connection) o; - if (port != that.port) return false; + if (portInfo != null ? !portInfo.equals(that.portInfo) : that.portInfo != null) return false; return !(uid != null ? !uid.equals(that.uid) : that.uid != null); } @Override public int hashCode() { - int result = port; + int result = portInfo != null ? portInfo.hashCode() : 0; result = 31 * result + (uid != null ? uid.hashCode() : 0); return result; } @@ -251,7 +267,7 @@ public class Connection { @Override public String toString() { return "Connection{" + - "port=" + port + + "portInfo=" + portInfo + ", uid='" + uid + '\'' + ", objectId='" + getObjectId() + '\'' + ", sharedSpace=" + sharedSpace.toString() + @@ -266,6 +282,10 @@ public class Connection { return super.toString().split("@")[1].toString(); } + public void setPeerAddress(@Nullable Address peerAddress) { + this.peerAddress = peerAddress; + } + /////////////////////////////////////////////////////////////////////////////////////////// // SharedSpace @@ -379,31 +399,23 @@ public class Connection { private final SharedSpace sharedSpace; private final ObjectInputStream objectInputStream; - private final int port; - private final ExecutorService executorService; + private final String portInfo; private volatile boolean stopped; - public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, int port) { + public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo) { this.sharedSpace = sharedSpace; this.objectInputStream = objectInputStream; - this.port = port; - - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("InputHandler-onMessage-" + port) - .setDaemon(true) - .build(); - executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); + this.portInfo = portInfo; } public void stop() { stopped = true; - Utils.shutDownExecutorService(executorService); } @Override public void run() { try { - Thread.currentThread().setName("InputHandler-" + port); + Thread.currentThread().setName("InputHandler-" + portInfo); while (!stopped && !Thread.currentThread().isInterrupted()) { try { log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionId()); @@ -447,14 +459,7 @@ public class Connection { return null; } }; - executorService.submit(() -> { - try { - sharedSpace.onMessage(message); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } - }); + sharedSpace.onMessage(message); } } else { sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); @@ -482,7 +487,7 @@ public class Connection { public String toString() { return "InputHandler{" + "sharedSpace=" + sharedSpace + - ", port=" + port + + ", port=" + portInfo + ", stopped=" + stopped + '}'; } diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index 63fef5362e..41da5ce63b 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -1,6 +1,9 @@ package io.bitsquare.p2p.network; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Uninterruptibles; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; import io.bitsquare.common.UserThread; @@ -16,7 +19,7 @@ import java.net.BindException; import java.net.ServerSocket; import java.net.Socket; import java.util.Random; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; public class LocalhostNetworkNode extends NetworkNode { @@ -47,11 +50,7 @@ public class LocalhostNetworkNode extends NetworkNode { public void start(@Nullable SetupListener setupListener) { if (setupListener != null) addSetupListener(setupListener); - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("NetworkNode-" + port) - .setDaemon(true) - .build(); - executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory)); + createExecutor(); //Tor delay simulation createTorNode(torNode -> { @@ -92,19 +91,22 @@ public class LocalhostNetworkNode extends NetworkNode { /////////////////////////////////////////////////////////////////////////////////////////// private void createTorNode(final Consumer resultHandler) { - Callable> task = () -> { - Thread.currentThread().setName("CreateTorNode-" + new Random().nextInt(1000)); - long ts = System.currentTimeMillis(); - if (simulateTorDelayTorNode > 0) - Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS); + ListenableFuture> future = executorService.submit(() -> { + Thread.currentThread().setName("NetworkNode:CreateTorNode-" + new Random().nextInt(1000)); + try { + long ts = System.currentTimeMillis(); + if (simulateTorDelayTorNode > 0) + Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS); - log.info("\n\n############################################################\n" + - "TorNode created [simulation]:" + - "\nTook " + (System.currentTimeMillis() - ts) + " ms" - + "\n############################################################\n"); - return null; - }; - ListenableFuture> future = executorService.submit(task); + log.info("\n\n############################################################\n" + + "TorNode created [simulation]:" + + "\nTook " + (System.currentTimeMillis() - ts) + " ms" + + "\n############################################################\n"); + return null; + } catch (Throwable t) { + throw t; + } + }); Futures.addCallback(future, new FutureCallback>() { public void onSuccess(TorNode torNode) { UserThread.execute(() -> resultHandler.accept(torNode)); @@ -117,19 +119,22 @@ public class LocalhostNetworkNode extends NetworkNode { } private void createHiddenService(final Consumer resultHandler) { - Callable task = () -> { - Thread.currentThread().setName("CreateHiddenService-" + new Random().nextInt(1000)); - long ts = System.currentTimeMillis(); - if (simulateTorDelayHiddenService > 0) - Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS); + ListenableFuture future = executorService.submit(() -> { + Thread.currentThread().setName("NetworkNode:CreateHiddenService-" + new Random().nextInt(1000)); + try { + long ts = System.currentTimeMillis(); + if (simulateTorDelayHiddenService > 0) + Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS); - log.info("\n\n############################################################\n" + - "Hidden service created [simulation]:" + - "\nTook " + (System.currentTimeMillis() - ts) + " ms" - + "\n############################################################\n"); - return null; - }; - ListenableFuture future = executorService.submit(task); + log.info("\n\n############################################################\n" + + "Hidden service created [simulation]:" + + "\nTook " + (System.currentTimeMillis() - ts) + " ms" + + "\n############################################################\n"); + return null; + } catch (Throwable t) { + throw t; + } + }); Futures.addCallback(future, new FutureCallback() { public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) { UserThread.execute(() -> resultHandler.accept(hiddenServiceDescriptor)); diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 50948f83b1..fb85b74675 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -2,6 +2,7 @@ package io.bitsquare.p2p.network; import com.google.common.util.concurrent.*; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import org.jetbrains.annotations.NotNull; @@ -10,11 +11,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.ConnectException; import java.net.ServerSocket; import java.net.Socket; -import java.util.*; -import java.util.concurrent.Callable; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import static com.google.common.base.Preconditions.checkNotNull; @@ -22,8 +27,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener private static final Logger log = LoggerFactory.getLogger(NetworkNode.class); protected final int port; - private final Set outBoundConnections = Collections.synchronizedSet(new HashSet<>()); - private final Set inBoundConnections = Collections.synchronizedSet(new HashSet<>()); + private final Set outBoundConnections = new CopyOnWriteArraySet<>(); + private final Set inBoundConnections = new CopyOnWriteArraySet<>(); private final List messageListeners = new CopyOnWriteArrayList<>(); private final List connectionListeners = new CopyOnWriteArrayList<>(); protected final List setupListeners = new CopyOnWriteArrayList<>(); @@ -76,15 +81,16 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener return sendMessage(connection, message); } else { final SettableFuture resultFuture = SettableFuture.create(); - Callable task = () -> { - Connection newConnection; + ListenableFuture future = executorService.submit(() -> { + Thread.currentThread().setName("NetworkNode:SendMessage-create-new-outbound-connection-to-" + peerAddress); try { - Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress); + Connection newConnection; log.trace("We have not found any connection for that peerAddress. " + "We will create a new outbound connection."); try { Socket socket = getSocket(peerAddress); // can take a while when using tor newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this); + newConnection.setPeerAddress(peerAddress); outBoundConnections.add(newConnection); log.info("\n\nNetworkNode created new outbound connection:" @@ -93,28 +99,26 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener + "\nmessage=" + message + "\n\n"); } catch (Throwable t) { - UserThread.execute(() -> resultFuture.setException(t)); - return null; + throw t; } newConnection.sendMessage(message); return newConnection; } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - UserThread.execute(() -> resultFuture.setException(t)); throw t; } - }; - - ListenableFuture future = executorService.submit(task); + }); Futures.addCallback(future, new FutureCallback() { public void onSuccess(Connection connection) { UserThread.execute(() -> resultFuture.set(connection)); } public void onFailure(@NotNull Throwable throwable) { + if (!(throwable instanceof ConnectException)) { + throwable.printStackTrace(); + log.error("Executing task failed. " + throwable.getMessage()); + } UserThread.execute(() -> resultFuture.setException(throwable)); } }); @@ -123,9 +127,15 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener } public SettableFuture sendMessage(Connection connection, Message message) { + // connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block ListenableFuture future = executorService.submit(() -> { - connection.sendMessage(message); - return connection; + Thread.currentThread().setName("NetworkNode:SendMessage-to-connection-" + connection.getObjectId()); + try { + connection.sendMessage(message); + return connection; + } catch (Throwable t) { + throw t; + } }); final SettableFuture resultFuture = SettableFuture.create(); Futures.addCallback(future, new FutureCallback() { @@ -234,6 +244,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener // Protected /////////////////////////////////////////////////////////////////////////////////////////// + protected void createExecutor() { + executorService = Utilities.getListeningExecutorService("NetworkNode-" + port, 20, 50, 120L); + } + protected void startServer(ServerSocket serverSocket) { server = new Server(serverSocket, (message, connection) -> NetworkNode.this.onMessage(message, connection), diff --git a/network/src/main/java/io/bitsquare/p2p/network/Server.java b/network/src/main/java/io/bitsquare/p2p/network/Server.java index 98faf23ed3..9f1e76eb5a 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Server.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Server.java @@ -29,13 +29,14 @@ public class Server implements Runnable { @Override public void run() { try { - Thread.currentThread().setName("Server-" + serverSocket.getLocalPort()); + // Thread created by NetworkNode + Thread.currentThread().setName("NetworkNode:Server-" + serverSocket.getLocalPort()); try { while (!stopped && !Thread.currentThread().isInterrupted()) { log.info("Ready to accept new clients on port " + serverSocket.getLocalPort()); final Socket socket = serverSocket.accept(); if (!stopped) { - log.info("Accepted new client on port " + socket.getLocalPort()); + log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort()); Connection connection = new Connection(socket, messageListener, connectionListener); log.info("\n\nServer created new inbound connection:" + "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort() diff --git a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java index a506d35bc2..4b4a15f1f2 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java @@ -1,11 +1,14 @@ package io.bitsquare.p2p.network; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; -import io.bitsquare.p2p.Utils; import io.nucleo.net.HiddenServiceDescriptor; import io.nucleo.net.TorNode; import org.jetbrains.annotations.Nullable; @@ -17,12 +20,10 @@ import java.io.IOException; import java.net.Socket; import java.util.Random; import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; public class TorNetworkNode extends NetworkNode { private static final Logger log = LoggerFactory.getLogger(TorNetworkNode.class); @@ -65,12 +66,7 @@ public class TorNetworkNode extends NetworkNode { if (setupListener != null) addSetupListener(setupListener); - // executorService might have been shutdown before a restart, so we create a new one - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("NetworkNode-" + port) - .setDaemon(true) - .build(); - executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory)); + createExecutor(); // Create the tor node (takes about 6 sec.) createTorNode(torDir, torNode -> { @@ -83,9 +79,12 @@ public class TorNetworkNode extends NetworkNode { TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor; startServer(hiddenServiceDescriptor.getServerSocket()); - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - - setupListeners.stream().forEach(e -> UserThread.execute(() -> e.onHiddenServiceReady())); + Runnable task = () -> { + Thread.currentThread().setName("DelayNotifySetupListenersTimer-" + new Random().nextInt(1000)); + setupListeners.stream() + .forEach(e -> UserThread.execute(() -> e.onHiddenServiceReady())); + }; + Utilities.runTimerTask(task, 500, TimeUnit.MILLISECONDS); }); }); } @@ -102,32 +101,32 @@ public class TorNetworkNode extends NetworkNode { public void shutDown(Runnable shutDownCompleteHandler) { log.info("Shutdown TorNetworkNode"); this.shutDownCompleteHandler = shutDownCompleteHandler; - checkNotNull(executorService, "executorService must not be null"); - shutDownTimeoutTimer = new Timer(); - shutDownTimeoutTimer.schedule(new TimerTask() { - @Override - public void run() { - Thread.currentThread().setName("ShutDownTimeoutTimer-" + new Random().nextInt(1000)); - log.error("A timeout occurred at shutDown"); - shutDownExecutorService(); - } - }, SHUT_DOWN_TIMEOUT); + shutDownTimeoutTimer = Utilities.runTimerTask(() -> { + Thread.currentThread().setName("ShutDownTimeoutTimer-" + new Random().nextInt(1000)); + log.error("A timeout occurred at shutDown"); + shutDownExecutorService(); + }, SHUT_DOWN_TIMEOUT, TimeUnit.DAYS.MILLISECONDS); - executorService.submit(() -> super.shutDown(() -> { - networkNodeShutDownDoneComplete = true; - if (torShutDownComplete) - shutDownExecutorService(); - })); + if (executorService != null) { + executorService.submit(() -> super.shutDown(() -> { + networkNodeShutDownDoneComplete = true; + if (torShutDownComplete) + shutDownExecutorService(); + } + )); + } else { + log.error("executorService must not be null at shutDown"); + } ListenableFuture future2 = executorService.submit(() -> { - long ts = System.currentTimeMillis(); - log.info("Shutdown torNode"); try { + long ts = System.currentTimeMillis(); + log.info("Shutdown torNode"); if (torNode != null) torNode.shutdown(); log.info("Shutdown torNode done after " + (System.currentTimeMillis() - ts) + " ms."); - } catch (IOException e) { + } catch (Throwable e) { e.printStackTrace(); log.error("Shutdown torNode failed with exception: " + e.getMessage()); shutDownExecutorService(); @@ -156,36 +155,32 @@ public class TorNetworkNode extends NetworkNode { private void shutDownExecutorService() { shutDownTimeoutTimer.cancel(); - ListenableFuture future = executorService.submit(() -> { - long ts = System.currentTimeMillis(); - log.info("Shutdown executorService"); - Utils.shutDownExecutorService(executorService); - log.info("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms."); - }); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Object o) { + new Thread(() -> { + Thread.currentThread().setName("NetworkNode:shutDownExecutorService-" + new Random().nextInt(1000)); + try { + long ts = System.currentTimeMillis(); + log.info("Shutdown executorService"); + MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS); + log.info("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms."); + log.info("Shutdown completed"); UserThread.execute(() -> shutDownCompleteHandler.run()); - } - - @Override - public void onFailure(Throwable throwable) { - throwable.printStackTrace(); - log.error("Shutdown executorService failed with exception: " + throwable.getMessage()); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Shutdown executorService failed with exception: " + t.getMessage()); UserThread.execute(() -> shutDownCompleteHandler.run()); } - }); + }).start(); } private void restartTor() { restartCounter++; if (restartCounter <= MAX_RESTART_ATTEMPTS) { - shutDown(() -> { - Uninterruptibles.sleepUninterruptibly(WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS); - log.warn("We restart tor as too many self tests failed."); + shutDown(() -> Utilities.runTimerTask(() -> { + Thread.currentThread().setName("RestartTorTimer-" + new Random().nextInt(1000)); + log.warn("We restart tor as starting tor failed."); start(null); - }); + }, WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS)); } else { log.error("We tried to restart tor " + restartCounter + " times, but we failed to get tor running. We give up now."); @@ -197,23 +192,26 @@ public class TorNetworkNode extends NetworkNode { /////////////////////////////////////////////////////////////////////////////////////////// private void createTorNode(final File torDir, final Consumer resultHandler) { - Callable> task = () -> { - Thread.currentThread().setName("CreateTorNode-" + new Random().nextInt(1000)); - long ts = System.currentTimeMillis(); - if (torDir.mkdirs()) - log.trace("Created directory for tor"); + ListenableFuture> future = executorService.submit(() -> { + Thread.currentThread().setName("NetworkNode:CreateTorNode-" + new Random().nextInt(1000)); + try { + long ts = System.currentTimeMillis(); + if (torDir.mkdirs()) + log.trace("Created directory for tor"); - log.trace("Create TorNode"); - TorNode torNode1 = new TorNode( - torDir) { - }; - log.info("\n\n############################################################\n" + - "TorNode created:" + - "\nTook " + (System.currentTimeMillis() - ts) + " ms" - + "\n############################################################\n"); - return torNode1; - }; - ListenableFuture> future = executorService.submit(task); + log.trace("Create TorNode"); + TorNode torNode1 = new TorNode( + torDir) { + }; + log.info("\n\n############################################################\n" + + "TorNode created:" + + "\nTook " + (System.currentTimeMillis() - ts) + " ms" + + "\n############################################################\n"); + return torNode1; + } catch (Throwable t) { + throw t; + } + }); Futures.addCallback(future, new FutureCallback>() { public void onSuccess(TorNode torNode) { resultHandler.accept(torNode); @@ -228,20 +226,23 @@ public class TorNetworkNode extends NetworkNode { private void createHiddenService(final TorNode torNode, final int port, final Consumer resultHandler) { - Callable task = () -> { - Thread.currentThread().setName("CreateHiddenService-" + new Random().nextInt(1000)); - long ts = System.currentTimeMillis(); - log.debug("Create hidden service"); - HiddenServiceDescriptor hiddenServiceDescriptor = torNode.createHiddenService(port); - log.info("\n\n############################################################\n" + - "Hidden service created:" + - "\nAddress=" + hiddenServiceDescriptor.getFullAddress() + - "\nTook " + (System.currentTimeMillis() - ts) + " ms" - + "\n############################################################\n"); + ListenableFuture future = executorService.submit(() -> { + Thread.currentThread().setName("NetworkNode:CreateHiddenService-" + new Random().nextInt(1000)); + try { + long ts = System.currentTimeMillis(); + log.debug("Create hidden service"); + HiddenServiceDescriptor hiddenServiceDescriptor = torNode.createHiddenService(port); + log.info("\n\n############################################################\n" + + "Hidden service created:" + + "\nAddress=" + hiddenServiceDescriptor.getFullAddress() + + "\nTook " + (System.currentTimeMillis() - ts) + " ms" + + "\n############################################################\n"); - return hiddenServiceDescriptor; - }; - ListenableFuture future = executorService.submit(task); + return hiddenServiceDescriptor; + } catch (Throwable t) { + throw t; + } + }); Futures.addCallback(future, new FutureCallback() { public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) { resultHandler.accept(hiddenServiceDescriptor); diff --git a/network/src/main/java/io/bitsquare/p2p/routing/Routing.java b/network/src/main/java/io/bitsquare/p2p/routing/Routing.java index b711c8bc3f..e4190ebeff 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/Routing.java +++ b/network/src/main/java/io/bitsquare/p2p/routing/Routing.java @@ -1,9 +1,11 @@ package io.bitsquare.p2p.routing; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; -import io.bitsquare.p2p.Utils; import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.routing.messages.*; import io.bitsquare.p2p.storage.messages.BroadcastMessage; @@ -13,7 +15,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class Routing { @@ -39,10 +44,9 @@ public class Routing { private final Map nonceMap = new ConcurrentHashMap<>(); private final List routingListeners = new CopyOnWriteArrayList<>(); private final Map authenticatedPeers = new ConcurrentHashMap<>(); - private final Set
reportedPeerAddresses = Collections.synchronizedSet(new HashSet<>()); + private final Set
reportedPeerAddresses = new CopyOnWriteArraySet<>(); private final Map authenticationCompleteHandlers = new ConcurrentHashMap<>(); private final Timer maintenanceTimer = new Timer(); - private final ExecutorService executorService; private volatile boolean shutDownInProgress; @@ -56,13 +60,6 @@ public class Routing { // We copy it as we remove ourselves later from the list if we are a seed node this.seedNodes = new CopyOnWriteArrayList<>(seeds); - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("Routing-%d") - .setDaemon(true) - .build(); - - executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); - init(networkNode); } @@ -116,7 +113,7 @@ public class Routing { maintenanceTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { - Thread.currentThread().setName("RoutingMaintenanceTimer-" + new Random().nextInt(1000)); + Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000)); try { UserThread.execute(() -> { disconnectOldConnections(); @@ -139,8 +136,11 @@ public class Routing { log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size()); Connection connection = authenticatedConnections.remove(0); log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); - connection.shutDown(() -> disconnectOldConnections()); - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + + connection.shutDown(() -> Utilities.runTimerTask(() -> { + Thread.currentThread().setName("DelayDisconnectOldConnectionsTimer-" + new Random().nextInt(1000)); + disconnectOldConnections(); + }, 1, TimeUnit.MILLISECONDS)); } } @@ -149,7 +149,8 @@ public class Routing { List connectedPeersList = new ArrayList<>(authenticatedPeers.values()); connectedPeersList.stream() .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) - .forEach(e -> { + .forEach(e -> Utilities.runTimerTaskWithRandomDelay(() -> { + Thread.currentThread().setName("DelayPingPeersTimer-" + new Random().nextInt(1000)); SettableFuture future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce())); Futures.addCallback(future, new FutureCallback() { @Override @@ -163,8 +164,7 @@ public class Routing { removePeer(e.address); } }); - Uninterruptibles.sleepUninterruptibly(new Random().nextInt(5000) + 5000, TimeUnit.MILLISECONDS); - }); + }, 5, 10)); } @@ -177,8 +177,6 @@ public class Routing { shutDownInProgress = true; if (maintenanceTimer != null) maintenanceTimer.cancel(); - - Utils.shutDownExecutorService(executorService); } } @@ -253,16 +251,7 @@ public class Routing { public void startAuthentication(Set
connectedSeedNodes) { connectedSeedNodes.forEach(connectedSeedNode -> { - executorService.submit(() -> { - try { - sendRequestAuthenticationMessage(seedNodes, connectedSeedNode); - // give a random pause of 3-5 sec. before using the next - Uninterruptibles.sleepUninterruptibly(new Random().nextInt(2000) + 3000, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } - }); + sendRequestAuthenticationMessage(seedNodes, connectedSeedNode); }); } @@ -319,44 +308,40 @@ public class Routing { RequestAuthenticationMessage requestAuthenticationMessage = (RequestAuthenticationMessage) message; Address peerAddress = requestAuthenticationMessage.address; log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); - connection.shutDown(() -> { - // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to - // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - if (simulateAuthTorNode > 0) - Uninterruptibles.sleepUninterruptibly(simulateAuthTorNode, TimeUnit.MILLISECONDS); - - log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); - long nonce = addToMapAndGetNonce(peerAddress); - SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.debug("onSuccess "); - - // TODO check nr. of connections, remove older connections (?) - } - - @Override - public void onFailure(Throwable throwable) { - log.debug("onFailure "); - // TODO skip to next node or retry? + connection.shutDown(() -> Utilities.runTimerTask(() -> { + Thread.currentThread().setName("DelaySendChallengeMessageTimer-" + new Random().nextInt(1000)); + // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to + // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) + log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); + long nonce = addToMapAndGetNonce(peerAddress); SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce)); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.debug("onSuccess "); + log.debug("onSuccess sending ChallengeMessage"); } @Override public void onFailure(Throwable throwable) { - log.debug("onFailure "); + log.warn("onFailure sending ChallengeMessage. We try again."); + SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.debug("onSuccess sending 2. ChallengeMessage"); + } + + @Override + public void onFailure(Throwable throwable) { + log.warn("onFailure sending ChallengeMessage. We give up."); + } + }); } }); - } - }); - }); + }, + 100 + simulateAuthTorNode, + TimeUnit.MILLISECONDS)); } else if (message instanceof ChallengeMessage) { ChallengeMessage challengeMessage = (ChallengeMessage) message; Address peerAddress = challengeMessage.address; @@ -364,16 +349,13 @@ public class Routing { HashMap tempNonceMap = new HashMap<>(nonceMap); boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress); if (verified) { + connection.setPeerAddress(peerAddress); SettableFuture future = networkNode.sendMessage(peerAddress, new GetPeersMessage(getAddress(), challengeMessage.challengerNonce, getAllPeerAddresses())); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { log.trace("GetPeersMessage sent successfully from " + getAddress() + " to " + peerAddress); - - /* // we wait to get the success to reduce the time span of the moment of - // authentication at both sides of the connection - setAuthenticated(connection, peerAddress);*/ } @Override @@ -480,25 +462,20 @@ public class Routing { } private void authenticateToNextRandomPeer() { - executorService.submit(() -> { - try { - Uninterruptibles.sleepUninterruptibly(new Random().nextInt(200) + 200, TimeUnit.MILLISECONDS); - if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { - Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress(); - if (randomNotConnectedPeerAddress != null) { - log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress); - authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer()); - } else { - log.info("No more peers available for connecting."); - } + Utilities.runTimerTaskWithRandomDelay(() -> { + Thread.currentThread().setName("DelayAuthenticateToNextRandomPeerTimer-" + new Random().nextInt(1000)); + if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { + Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress(); + if (randomNotConnectedPeerAddress != null) { + log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress); + authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer()); } else { - log.info("We have already enough connections."); + log.info("No more peers available for connecting."); } - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); + } else { + log.info("We have already enough connections."); } - }); + }, 200, 400, TimeUnit.MILLISECONDS); } public void authenticateToPeer(Address address, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) { diff --git a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java index cdeae353cc..84e8d159f1 100644 --- a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java +++ b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java @@ -1,5 +1,6 @@ package io.bitsquare.p2p.seed; +import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.crypto.EncryptionService; import io.bitsquare.p2p.Address; @@ -10,7 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.util.*; +import java.util.ArrayList; +import java.util.List; public class SeedNode { private static final Logger log = LoggerFactory.getLogger(SeedNode.class); @@ -31,7 +33,6 @@ public class SeedNode { // args: port useLocalhost seedNodes // eg. 4444 true localhost:7777 localhost:8888 - // To stop enter: q public void processArgs(String[] args) { if (args.length > 0) { port = Integer.parseInt(args[0]); @@ -49,29 +50,6 @@ public class SeedNode { } } - public void listenForExitCommand() { - Scanner scan = new Scanner(System.in); - String line; - while (!stopped && ((line = scan.nextLine()) != null)) { - if (line.equals("q")) { - Timer timeout = new Timer(); - timeout.schedule(new TimerTask() { - @Override - public void run() { - log.error("Timeout occurred at shutDown request"); - System.exit(1); - } - }, 10 * 1000); - - shutDown(() -> { - timeout.cancel(); - log.debug("Shutdown seed node complete."); - System.exit(0); - }); - } - } - } - public void createAndStartP2PService() { createAndStartP2PService(null, null, port, useLocalhost, seedNodes, null); } @@ -103,7 +81,7 @@ public class SeedNode { stopped = true; p2PService.shutDown(() -> { - if (shutDownCompleteHandler != null) new Thread(shutDownCompleteHandler).start(); + if (shutDownCompleteHandler != null) UserThread.execute(shutDownCompleteHandler); }); } } diff --git a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java index 9d21bc21ca..e691ef5f89 100644 --- a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java +++ b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java @@ -56,7 +56,7 @@ public class ProtectedDataStorageTest { dir2.delete(); dir2.mkdir(); - UserThread.executor = Executors.newSingleThreadExecutor(); + UserThread.setExecutor(Executors.newSingleThreadExecutor()); ProtectedExpirableDataStorage.CHECK_TTL_INTERVAL = 10 * 60 * 1000; keyRing1 = new KeyRing(new KeyStorage(dir1)); diff --git a/pom.xml b/pom.xml index 4507f996f8..f4bb4a7e23 100755 --- a/pom.xml +++ b/pom.xml @@ -128,7 +128,7 @@ com.google.guava guava - 16.0.1 + 18.0