diff --git a/common/src/main/java/io/bitsquare/storage/FileManager.java b/common/src/main/java/io/bitsquare/storage/FileManager.java index 415a13d44f..95db71b132 100644 --- a/common/src/main/java/io/bitsquare/storage/FileManager.java +++ b/common/src/main/java/io/bitsquare/storage/FileManager.java @@ -39,7 +39,6 @@ import com.google.common.io.Files; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.bitsquare.common.UserThread; import org.bitcoinj.core.Utils; -import org.bitcoinj.utils.Threading; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +48,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; import static com.google.common.base.Preconditions.checkNotNull; @@ -62,8 +60,6 @@ import static com.google.common.base.Preconditions.checkNotNull; */ public class FileManager { private static final Logger log = LoggerFactory.getLogger(FileManager.class); - private static final ReentrantLock lock = Threading.lock("FileManager"); - private static Thread.UncaughtExceptionHandler uncaughtExceptionHandler; private final File dir; private final File storageFile; @@ -85,7 +81,7 @@ public class FileManager { ThreadFactoryBuilder builder = new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("FileManager thread") + .setNameFormat("FileManager-%d") .setPriority(Thread.MIN_PRIORITY); // Avoid competing with the GUI thread. // An executor that starts up threads when needed and shuts them down later. @@ -144,40 +140,32 @@ public class FileManager { executor.schedule(saver, delay, delayTimeUnit); } - public T read(File file) { + public synchronized T read(File file) { log.debug("read" + file); - lock.lock(); try (final FileInputStream fileInputStream = new FileInputStream(file); final ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream)) { return (T) objectInputStream.readObject(); } catch (Throwable t) { log.error("Exception at read: " + t.getMessage()); return null; - } finally { - lock.unlock(); } } - public void removeFile(String fileName) { + public synchronized void removeFile(String fileName) { log.debug("removeFile" + fileName); File file = new File(dir, fileName); - lock.lock(); - try { - boolean result = file.delete(); - if (!result) - log.warn("Could not delete file: " + file.toString()); + boolean result = file.delete(); + if (!result) + log.warn("Could not delete file: " + file.toString()); - File backupDir = new File(Paths.get(dir.getAbsolutePath(), "backup").toString()); - if (backupDir.exists()) { - File backupFile = new File(Paths.get(dir.getAbsolutePath(), "backup", fileName).toString()); - if (backupFile.exists()) { - result = backupFile.delete(); - if (!result) - log.warn("Could not delete backupFile: " + file.toString()); - } + File backupDir = new File(Paths.get(dir.getAbsolutePath(), "backup").toString()); + if (backupDir.exists()) { + File backupFile = new File(Paths.get(dir.getAbsolutePath(), "backup", fileName).toString()); + if (backupFile.exists()) { + result = backupFile.delete(); + if (!result) + log.warn("Could not delete backupFile: " + file.toString()); } - } finally { - lock.unlock(); } } @@ -200,34 +188,24 @@ public class FileManager { } } - public void removeAndBackupFile(String fileName) throws IOException { - lock.lock(); - try { - File corruptedBackupDir = new File(Paths.get(dir.getAbsolutePath(), "corrupted").toString()); - if (!corruptedBackupDir.exists()) - if (!corruptedBackupDir.mkdir()) - log.warn("make dir failed"); + public synchronized void removeAndBackupFile(String fileName) throws IOException { + File corruptedBackupDir = new File(Paths.get(dir.getAbsolutePath(), "corrupted").toString()); + if (!corruptedBackupDir.exists()) + if (!corruptedBackupDir.mkdir()) + log.warn("make dir failed"); - File corruptedFile = new File(Paths.get(dir.getAbsolutePath(), "corrupted", fileName).toString()); - renameTempFileToFile(storageFile, corruptedFile); - } finally { - lock.unlock(); - } + File corruptedFile = new File(Paths.get(dir.getAbsolutePath(), "corrupted", fileName).toString()); + renameTempFileToFile(storageFile, corruptedFile); } - public void backupFile(String fileName) throws IOException { - lock.lock(); - try { - File backupDir = new File(Paths.get(dir.getAbsolutePath(), "backup").toString()); - if (!backupDir.exists()) - if (!backupDir.mkdir()) - log.warn("make dir failed"); + public synchronized void backupFile(String fileName) throws IOException { + File backupDir = new File(Paths.get(dir.getAbsolutePath(), "backup").toString()); + if (!backupDir.exists()) + if (!backupDir.mkdir()) + log.warn("make dir failed"); - File backupFile = new File(Paths.get(dir.getAbsolutePath(), "backup", fileName).toString()); - Files.copy(storageFile, backupFile); - } finally { - lock.unlock(); - } + File backupFile = new File(Paths.get(dir.getAbsolutePath(), "backup", fileName).toString()); + Files.copy(storageFile, backupFile); } /////////////////////////////////////////////////////////////////////////////////////////// @@ -240,8 +218,7 @@ public class FileManager { UserThread.execute(() -> log.info("Save {} completed in {}msec", storageFile, System.currentTimeMillis() - now)); } - private void saveToFile(T serializable, File dir, File storageFile) { - lock.lock(); + private synchronized void saveToFile(T serializable, File dir, File storageFile) { File tempFile = null; FileOutputStream fileOutputStream = null; ObjectOutputStream objectOutputStream = null; @@ -292,27 +269,21 @@ public class FileManager { e.printStackTrace(); log.error("Cannot close resources." + e.getMessage()); } - lock.unlock(); } } - private void renameTempFileToFile(File tempFile, File file) throws IOException { - lock.lock(); - try { - if (Utils.isWindows()) { - // Work around an issue on Windows whereby you can't rename over existing files. - final File canonical = file.getCanonicalFile(); - if (canonical.exists() && !canonical.delete()) { - throw new IOException("Failed to delete canonical file for replacement with save"); - } - if (!tempFile.renameTo(canonical)) { - throw new IOException("Failed to rename " + tempFile + " to " + canonical); - } - } else if (!tempFile.renameTo(file)) { - throw new IOException("Failed to rename " + tempFile + " to " + file); + private synchronized void renameTempFileToFile(File tempFile, File file) throws IOException { + if (Utils.isWindows()) { + // Work around an issue on Windows whereby you can't rename over existing files. + final File canonical = file.getCanonicalFile(); + if (canonical.exists() && !canonical.delete()) { + throw new IOException("Failed to delete canonical file for replacement with save"); } - } finally { - lock.unlock(); + if (!tempFile.renameTo(canonical)) { + throw new IOException("Failed to rename " + tempFile + " to " + canonical); + } + } else if (!tempFile.renameTo(file)) { + throw new IOException("Failed to rename " + tempFile + " to " + file); } } } diff --git a/core/src/main/java/io/bitsquare/btc/WalletService.java b/core/src/main/java/io/bitsquare/btc/WalletService.java index 3b549e1244..9e4fa9f033 100644 --- a/core/src/main/java/io/bitsquare/btc/WalletService.java +++ b/core/src/main/java/io/bitsquare/btc/WalletService.java @@ -245,9 +245,18 @@ public class WalletService { } public void restoreSeedWords(DeterministicSeed seed, ResultHandler resultHandler, ExceptionHandler exceptionHandler) { - walletAppKit.stopAsync(); - walletAppKit.awaitTerminated(); - initialize(seed, resultHandler, exceptionHandler); + Context ctx = Context.get(); + new Thread(() -> { + try { + Context.propagate(ctx); + walletAppKit.stopAsync(); + walletAppKit.awaitTerminated(); + initialize(seed, resultHandler, exceptionHandler); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + }, "RestoreWallet-%d").start(); } diff --git a/core/src/main/java/io/bitsquare/crypto/ScryptUtil.java b/core/src/main/java/io/bitsquare/crypto/ScryptUtil.java index 21adecae61..a287d7271f 100644 --- a/core/src/main/java/io/bitsquare/crypto/ScryptUtil.java +++ b/core/src/main/java/io/bitsquare/crypto/ScryptUtil.java @@ -1,5 +1,6 @@ package io.bitsquare.crypto; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; import io.bitsquare.common.UserThread; import org.bitcoinj.crypto.KeyCrypterScrypt; @@ -8,6 +9,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.spongycastle.crypto.params.KeyParameter; +import java.util.concurrent.*; + //TODO: Borrowed form BitcoinJ/Lighthouse. Remove Protos dependency, check complete code logic. public class ScryptUtil { private static final Logger log = LoggerFactory.getLogger(ScryptUtil.class); @@ -27,14 +30,31 @@ public class ScryptUtil { } public static void deriveKeyWithScrypt(KeyCrypterScrypt keyCrypterScrypt, String password, DeriveKeyResultHandler resultHandler) { - new Thread(() -> { - log.info("Doing key derivation"); + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("Routing-%d") + .setDaemon(true) + .build(); - long start = System.currentTimeMillis(); - KeyParameter aesKey = keyCrypterScrypt.deriveKey(password); - long duration = System.currentTimeMillis() - start; - log.info("Key derivation took {} msec", duration); - UserThread.execute(() -> resultHandler.handleResult(aesKey)); - }).start(); + ExecutorService executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); + executorService.submit(() -> { + try { + log.info("Doing key derivation"); + long start = System.currentTimeMillis(); + KeyParameter aesKey = keyCrypterScrypt.deriveKey(password); + long duration = System.currentTimeMillis() - start; + log.info("Key derivation took {} msec", duration); + UserThread.execute(() -> { + try { + resultHandler.handleResult(aesKey); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + }); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + }); } } diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index f383b00bb1..342205da29 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -17,7 +17,6 @@ package io.bitsquare.trade.offer; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import io.bitsquare.btc.TradeWalletService; import io.bitsquare.btc.WalletService; @@ -47,10 +46,11 @@ import javax.inject.Named; import java.io.File; import java.time.Duration; import java.util.Optional; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.inject.internal.util.$Preconditions.checkNotNull; import static io.bitsquare.util.Validator.nonEmptyStringOf; @@ -70,7 +70,7 @@ public class OpenOfferManager { private boolean shutDownRequested; private ScheduledThreadPoolExecutor executor; private P2PServiceListener p2PServiceListener; - + private final Timer timer = new Timer(); /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -96,9 +96,13 @@ public class OpenOfferManager { openOffersStorage = new Storage<>(storageDir); this.openOffers = new TradableList<>(openOffersStorage, "OpenOffers"); + init(); + } + + private void init() { // In case the app did get killed the shutDown from the modules is not called, so we use a shutdown hook - Thread shutDownHookThread = new Thread(OpenOfferManager.this::shutDown, "OpenOfferManager.ShutDownHook"); - Runtime.getRuntime().addShutdownHook(shutDownHookThread); + Runtime.getRuntime().addShutdownHook(new Thread(OpenOfferManager.this::shutDown, + "OpenOfferManager.ShutDownHook")); // Handler for incoming offer availability requests p2PService.addDecryptedMailListener((decryptedMessageWithPubKey, peerAddress) -> { @@ -155,25 +159,27 @@ public class OpenOfferManager { } private void startRePublishThread() { - if (p2PServiceListener != null) p2PService.removeP2PServiceListener(p2PServiceListener); + if (p2PServiceListener != null) + p2PService.removeP2PServiceListener(p2PServiceListener); - ThreadFactoryBuilder builder = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Re-publish offers thread") - .setPriority(Thread.MIN_PRIORITY); // Avoid competing with the GUI thread. - - // An executor that starts up threads when needed and shuts them down later. - executor = new ScheduledThreadPoolExecutor(1, builder.build()); - executor.setKeepAliveTime(5, TimeUnit.SECONDS); - executor.allowCoreThreadTimeOut(true); - executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - - checkArgument(Offer.TTL > 120000, "Offer.TTL <= 120"); - long period = Offer.TTL - 120000; // 2 min before TTL expires - executor.scheduleAtFixedRate(this::rePublishOffers, 500, period, TimeUnit.MILLISECONDS); + long period = (long) (Offer.TTL * 0.8); + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + Thread.currentThread().setName("RepublishOffers-%d"); + rePublishOffers(); + try { + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + } + }; + timer.scheduleAtFixedRate(timerTask, 500, period); } private void rePublishOffers() { + log.trace("rePublishOffers"); for (OpenOffer openOffer : openOffers) { offerBookService.addOffer(openOffer.getOffer(), () -> log.debug("Successful added offer to P2P network"), diff --git a/core/src/main/java/io/bitsquare/trade/protocol/trade/tasks/shared/SetupPayoutTxLockTimeReachedListener.java b/core/src/main/java/io/bitsquare/trade/protocol/trade/tasks/shared/SetupPayoutTxLockTimeReachedListener.java index 42c0f390aa..fe1b03b83d 100644 --- a/core/src/main/java/io/bitsquare/trade/protocol/trade/tasks/shared/SetupPayoutTxLockTimeReachedListener.java +++ b/core/src/main/java/io/bitsquare/trade/protocol/trade/tasks/shared/SetupPayoutTxLockTimeReachedListener.java @@ -51,9 +51,10 @@ public class SetupPayoutTxLockTimeReachedListener extends TradeTask { () -> { try { log.debug("Block height reached " + blockHeightFuture.get().getHeight()); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + e.printStackTrace(); } broadcastTx(); }, diff --git a/gui/src/main/java/io/bitsquare/gui/main/account/content/seedwords/SeedWordsView.java b/gui/src/main/java/io/bitsquare/gui/main/account/content/seedwords/SeedWordsView.java index e9efa00070..be5c4d3203 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/account/content/seedwords/SeedWordsView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/account/content/seedwords/SeedWordsView.java @@ -33,7 +33,6 @@ import javafx.scene.control.Button; import javafx.scene.control.DatePicker; import javafx.scene.control.TextArea; import javafx.scene.layout.GridPane; -import org.bitcoinj.core.Context; import org.bitcoinj.core.Wallet; import org.bitcoinj.crypto.KeyCrypter; import org.bitcoinj.crypto.MnemonicCode; @@ -198,31 +197,27 @@ public class SeedWordsView extends ActivatableView { log.info("Attempting wallet restore using seed '{}' from date {}", seedWordsTextArea.getText(), datePicker.getValue()); long date = datePicker.getValue().atStartOfDay().toEpochSecond(ZoneOffset.UTC); DeterministicSeed seed = new DeterministicSeed(Splitter.on(" ").splitToList(seedWordsTextArea.getText()), null, "", date); - Context ctx = Context.get(); - new Thread(() -> { - Context.propagate(ctx); - walletService.restoreSeedWords(seed, - () -> UserThread.execute(() -> { - log.debug("Wallet restored with seed words"); + walletService.restoreSeedWords(seed, + () -> UserThread.execute(() -> { + log.debug("Wallet restored with seed words"); - new Popup() - .information("Wallet restored successfully with the new seed words.\n\n" + - "You need to restart now the application.") - .closeButtonText("Restart") - .onClose(() -> BitsquareApp.restartDownHandler.run()).show(); - }), - throwable -> UserThread.execute(() -> { - log.error(throwable.getMessage()); - new Popup() - .headLine("Wrong password") - .warning("Please try entering your password again, carefully checking for typos or spelling errors.") - .show(); + new Popup() + .information("Wallet restored successfully with the new seed words.\n\n" + + "You need to restart now the application.") + .closeButtonText("Restart") + .onClose(() -> BitsquareApp.restartDownHandler.run()).show(); + }), + throwable -> UserThread.execute(() -> { + log.error(throwable.getMessage()); + new Popup() + .headLine("Wrong password") + .warning("Please try entering your password again, carefully checking for typos or spelling errors.") + .show(); - new Popup() - .error("An error occurred when restoring the wallet with seed words.\n" + - "Error message: " + throwable.getMessage()) - .show(); - })); - }, "Restore wallet thread").start(); + new Popup() + .error("An error occurred when restoring the wallet with seed words.\n" + + "Error message: " + throwable.getMessage()) + .show(); + })); } } \ No newline at end of file diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 8df1e8d70f..b651d02fa0 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -3,6 +3,7 @@ package io.bitsquare.p2p; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.google.inject.name.Named; import io.bitsquare.app.ProgramArguments; @@ -35,8 +36,7 @@ import java.io.File; import java.math.BigInteger; import java.security.PublicKey; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.*; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -47,15 +47,20 @@ import static com.google.common.base.Preconditions.checkNotNull; public class P2PService { private static final Logger log = LoggerFactory.getLogger(P2PService.class); + private final SeedNodesRepository seedNodesRepository; + private final int port; + private final File torDir; + private final boolean useLocalhost; @Nullable private final EncryptionService encryptionService; - private final SetupListener setupListener; + private SetupListener setupListener; private KeyRing keyRing; + private final File storageDir; private final NetworkStatistics networkStatistics; - private final NetworkNode networkNode; - private final Routing routing; - private final ProtectedExpirableDataStorage dataStorage; + private NetworkNode networkNode; + private Routing routing; + private ProtectedExpirableDataStorage dataStorage; private final List decryptedMailListeners = new CopyOnWriteArrayList<>(); private final List decryptedMailboxListeners = new CopyOnWriteArrayList<>(); private final List p2pServiceListeners = new CopyOnWriteArrayList<>(); @@ -73,7 +78,7 @@ public class P2PService { private boolean allSeedNodesRequested; private Timer sendGetAllDataMessageTimer; private volatile boolean hiddenServiceReady; - + private final ExecutorService executorService; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -87,11 +92,27 @@ public class P2PService { @Nullable EncryptionService encryptionService, KeyRing keyRing, @Named("storage.dir") File storageDir) { + this.seedNodesRepository = seedNodesRepository; + this.port = port; + this.torDir = torDir; + this.useLocalhost = useLocalhost; this.encryptionService = encryptionService; this.keyRing = keyRing; + this.storageDir = storageDir; networkStatistics = new NetworkStatistics(); + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("P2PService-%d") + .setDaemon(true) + .build(); + + executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); + + init(); + } + + private void init() { // network layer if (useLocalhost) { networkNode = new LocalhostNetworkNode(port); @@ -578,7 +599,12 @@ public class P2PService { sendGetAllDataMessageTimer.schedule(new TimerTask() { @Override public void run() { - sendGetAllDataMessage(remainingSeedNodeAddresses); + try { + sendGetAllDataMessage(remainingSeedNodeAddresses); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } } }, new Random().nextInt(2000) + 1000); } else { diff --git a/network/src/main/java/io/bitsquare/p2p/Utils.java b/network/src/main/java/io/bitsquare/p2p/Utils.java index 7a8148dbf8..e8727ab50e 100644 --- a/network/src/main/java/io/bitsquare/p2p/Utils.java +++ b/network/src/main/java/io/bitsquare/p2p/Utils.java @@ -8,6 +8,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.net.ServerSocket; +import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -37,11 +38,13 @@ public class Utils { public static void shutDownExecutorService(ExecutorService executorService, long waitBeforeShutDown) { executorService.shutdown(); try { - executorService.awaitTermination(waitBeforeShutDown, TimeUnit.MILLISECONDS); + boolean done = executorService.awaitTermination(waitBeforeShutDown, TimeUnit.MILLISECONDS); + if (!done) log.trace("Not all tasks completed at shutdown."); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - executorService.shutdownNow(); + final List rejected = executorService.shutdownNow(); + log.debug("Rejected tasks: {}", rejected.size()); } public static byte[] compress(Serializable input) { diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 3f7ee5e6df..6b19a409b1 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -1,10 +1,13 @@ package io.bitsquare.p2p.network; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.util.concurrent.Uninterruptibles; import io.bitsquare.common.ByteArrayUtils; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Utils; import io.bitsquare.p2p.network.messages.CloseConnectionMessage; +import javafx.concurrent.Task; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,9 +19,7 @@ import java.net.SocketTimeoutException; import java.util.Date; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; public class Connection { private static final Logger log = LoggerFactory.getLogger(Connection.class); @@ -37,18 +38,18 @@ public class Connection { private final String uid; private final Map illegalRequests = new ConcurrentHashMap<>(); - private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final ExecutorService executorService; private ObjectOutputStream out; private ObjectInputStream in; + @Nullable + private Address peerAddress; + private boolean isAuthenticated; + private volatile boolean stopped; private volatile boolean shutDownInProgress; private volatile boolean inputHandlerStopped; - private volatile Date lastActivityDate; - @Nullable - private Address peerAddress; - private boolean isAuthenticated; //TODO got java.util.zip.DataFormatException: invalid distance too far back @@ -69,6 +70,17 @@ public class Connection { uid = UUID.randomUUID().toString(); + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("Connection-%d") + .setDaemon(true) + .build(); + + executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); + + init(); + } + + private void init() { try { socket.setSoTimeout(SOCKET_TIMEOUT); // Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block @@ -203,12 +215,8 @@ public class Connection { if (sendCloseConnectionMessage) { sendMessage(new CloseConnectionMessage()); - try { - // give a bit of time for closing gracefully - Thread.sleep(100); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } + // give a bit of time for closing gracefully + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } try { @@ -296,61 +304,79 @@ public class Connection { private class InputHandler implements Runnable { @Override public void run() { - Thread.currentThread().setName("InputHandler-" + socket.getLocalPort()); - while (!inputHandlerStopped) { - try { - log.trace("InputHandler waiting for incoming messages connection=" + Connection.this.getObjectId()); - Object rawInputObject = in.readObject(); - log.trace("New data arrived at inputHandler of connection=" + Connection.this.toString() - + " rawInputObject " + rawInputObject); + try { + Thread.currentThread().setName("InputHandler-" + socket.getLocalPort()); + while (!inputHandlerStopped) { + try { + log.trace("InputHandler waiting for incoming messages connection=" + Connection.this.getObjectId()); + Object rawInputObject = in.readObject(); + log.trace("New data arrived at inputHandler of connection=" + Connection.this.toString() + + " rawInputObject " + rawInputObject); - int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; - if (size <= MAX_MSG_SIZE) { - Serializable serializable = null; - if (useCompression) { - if (rawInputObject instanceof byte[]) { - byte[] compressedObjectAsBytes = (byte[]) rawInputObject; - size = compressedObjectAsBytes.length; - //log.trace("Read object compressed data size: " + size); - serializable = Utils.decompress(compressedObjectAsBytes); - } else { - reportIllegalRequest(IllegalRequest.InvalidDataType); - } - } else { - if (rawInputObject instanceof Serializable) { - serializable = (Serializable) rawInputObject; - } else { - reportIllegalRequest(IllegalRequest.InvalidDataType); - } - } - //log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length); - - // compressed size might be bigger theoretically so we check again after decompression + int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; if (size <= MAX_MSG_SIZE) { - if (serializable instanceof Message) { - lastActivityDate = new Date(); - Message message = (Message) serializable; - if (message instanceof CloseConnectionMessage) { - inputHandlerStopped = true; - shutDown(false); + Serializable serializable = null; + if (useCompression) { + if (rawInputObject instanceof byte[]) { + byte[] compressedObjectAsBytes = (byte[]) rawInputObject; + size = compressedObjectAsBytes.length; + //log.trace("Read object compressed data size: " + size); + serializable = Utils.decompress(compressedObjectAsBytes); } else { - executorService.submit(() -> messageListener.onMessage(message, Connection.this)); + reportIllegalRequest(IllegalRequest.InvalidDataType); } } else { - reportIllegalRequest(IllegalRequest.InvalidDataType); + if (rawInputObject instanceof Serializable) { + serializable = (Serializable) rawInputObject; + } else { + reportIllegalRequest(IllegalRequest.InvalidDataType); + } + } + //log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length); + + // compressed size might be bigger theoretically so we check again after decompression + if (size <= MAX_MSG_SIZE) { + if (serializable instanceof Message) { + lastActivityDate = new Date(); + Message message = (Message) serializable; + if (message instanceof CloseConnectionMessage) { + inputHandlerStopped = true; + shutDown(false); + } else { + Task task = new Task() { + @Override + protected Object call() throws Exception { + return null; + } + }; + executorService.submit(() -> { + try { + messageListener.onMessage(message, Connection.this); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + }); + } + } else { + reportIllegalRequest(IllegalRequest.InvalidDataType); + } + } else { + log.error("Received decompressed data exceeds max. msg size."); + reportIllegalRequest(IllegalRequest.MaxSizeExceeded); } } else { - log.error("Received decompressed data exceeds max. msg size."); + log.error("Received compressed data exceeds max. msg size."); reportIllegalRequest(IllegalRequest.MaxSizeExceeded); } - } else { - log.error("Received compressed data exceeds max. msg size."); - reportIllegalRequest(IllegalRequest.MaxSizeExceeded); + } catch (IOException | ClassNotFoundException e) { + inputHandlerStopped = true; + handleConnectionException(e); } - } catch (IOException | ClassNotFoundException e) { - inputHandlerStopped = true; - handleConnectionException(e); } + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); } } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index b2a22a1a7b..e45a89a36d 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -1,9 +1,6 @@ package io.bitsquare.p2p.network; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.*; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; import io.bitsquare.p2p.Address; @@ -19,6 +16,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; public class LocalhostNetworkNode extends NetworkNode { @@ -92,8 +90,9 @@ public class LocalhostNetworkNode extends NetworkNode { private void createTorNode(final Consumer resultHandler) { Callable> task = () -> { long ts = System.currentTimeMillis(); - log.trace("[simulation] Create TorNode"); - if (simulateTorDelayTorNode > 0) Thread.sleep(simulateTorDelayTorNode); + if (simulateTorDelayTorNode > 0) + Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS); + log.info("\n\n############################################################\n" + "TorNode created [simulation]:" + "\nTook " + (System.currentTimeMillis() - ts) + " ms" @@ -115,8 +114,9 @@ public class LocalhostNetworkNode extends NetworkNode { private void createHiddenService(final Consumer resultHandler) { Callable task = () -> { long ts = System.currentTimeMillis(); - log.debug("[simulation] Create hidden service"); - if (simulateTorDelayHiddenService > 0) Thread.sleep(simulateTorDelayHiddenService); + if (simulateTorDelayHiddenService > 0) + Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS); + log.info("\n\n############################################################\n" + "Hidden service created [simulation]:" + "\nTook " + (System.currentTimeMillis() - ts) + " ms" diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 8c0e5b2bf8..97515fcc10 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -58,71 +58,77 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener final SettableFuture resultFuture = SettableFuture.create(); Callable task = () -> { - Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress); + try { + Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress); - Optional outboundConnectionOptional = getOutboundConnection(peerAddress); - Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null; + Optional outboundConnectionOptional = getOutboundConnection(peerAddress); + Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null; - if (connection != null && connection.isStopped()) { - log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid()); - outBoundConnections.remove(connection); - connection = null; - } - - if (connection == null) { - Optional inboundConnectionOptional = getInboundConnection(peerAddress); - if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get(); - if (connection != null) - log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid()); - } - - if (connection == null) { - try { - Socket socket = getSocket(peerAddress); // can take a while when using tor - connection = new Connection(socket, - (message1, connection1) -> NetworkNode.this.onMessage(message1, connection1), - new ConnectionListener() { - @Override - public void onConnection(Connection connection) { - NetworkNode.this.onConnection(connection); - } - - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); - } - - @Override - public void onDisconnect(Reason reason, Connection connection) { - log.trace("onDisconnect at outgoing connection to peerAddress " + peerAddress); - NetworkNode.this.onDisconnect(reason, connection); - } - - @Override - public void onError(Throwable throwable) { - NetworkNode.this.onError(throwable); - } - }); - if (!outBoundConnections.contains(connection)) - outBoundConnections.add(connection); - else - log.error("We have already that connection in our list. That must not happen. " - + outBoundConnections + " / connection=" + connection); - - log.info("\n\nNetworkNode created new outbound connection:" - + "\npeerAddress=" + peerAddress.port - + "\nconnection.uid=" + connection.getUid() - + "\nmessage=" + message - + "\n\n"); - } catch (Throwable t) { - resultFuture.setException(t); - return null; + if (connection != null && connection.isStopped()) { + log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid()); + outBoundConnections.remove(connection); + connection = null; } + + if (connection == null) { + Optional inboundConnectionOptional = getInboundConnection(peerAddress); + if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get(); + if (connection != null) + log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid()); + } + + if (connection == null) { + try { + Socket socket = getSocket(peerAddress); // can take a while when using tor + connection = new Connection(socket, + (message1, connection1) -> NetworkNode.this.onMessage(message1, connection1), + new ConnectionListener() { + @Override + public void onConnection(Connection connection) { + NetworkNode.this.onConnection(connection); + } + + @Override + public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); + } + + @Override + public void onDisconnect(Reason reason, Connection connection) { + log.trace("onDisconnect at outgoing connection to peerAddress " + peerAddress); + NetworkNode.this.onDisconnect(reason, connection); + } + + @Override + public void onError(Throwable throwable) { + NetworkNode.this.onError(throwable); + } + }); + if (!outBoundConnections.contains(connection)) + outBoundConnections.add(connection); + else + log.error("We have already that connection in our list. That must not happen. " + + outBoundConnections + " / connection=" + connection); + + log.info("\n\nNetworkNode created new outbound connection:" + + "\npeerAddress=" + peerAddress.port + + "\nconnection.uid=" + connection.getUid() + + "\nmessage=" + message + + "\n\n"); + } catch (Throwable t) { + resultFuture.setException(t); + return null; + } + } + + connection.sendMessage(message); + + return connection; + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + throw t; } - - connection.sendMessage(message); - - return connection; }; ListenableFuture future = executorService.submit(task); diff --git a/network/src/main/java/io/bitsquare/p2p/network/Server.java b/network/src/main/java/io/bitsquare/p2p/network/Server.java index 947c3ca8fa..ed6e171cdb 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Server.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Server.java @@ -1,6 +1,5 @@ package io.bitsquare.p2p.network; -import io.bitsquare.p2p.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,8 +9,6 @@ import java.net.Socket; import java.net.SocketException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; public class Server implements Runnable { private static final Logger log = LoggerFactory.getLogger(Server.class); @@ -19,7 +16,6 @@ public class Server implements Runnable { private final ServerSocket serverSocket; private final MessageListener messageListener; private final ConnectionListener connectionListener; - private final ExecutorService executorService = Executors.newCachedThreadPool(); private final List connections = new CopyOnWriteArrayList<>(); private volatile boolean stopped; @@ -32,25 +28,30 @@ public class Server implements Runnable { @Override public void run() { - Thread.currentThread().setName("Server-" + serverSocket.getLocalPort()); - while (!stopped) { - try { - log.info("Ready to accept new clients on port " + serverSocket.getLocalPort()); - final Socket socket = serverSocket.accept(); - log.info("Accepted new client on port " + socket.getLocalPort()); - Connection connection = new Connection(socket, messageListener, connectionListener); - log.info("\n\nServer created new inbound connection:" - + "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort() - + "\nsocket.getPort()=" + socket.getPort() - + "\nconnection.uid=" + connection.getUid() - + "\n\n"); + try { + Thread.currentThread().setName("Server-" + serverSocket.getLocalPort()); + while (!stopped) { + try { + log.info("Ready to accept new clients on port " + serverSocket.getLocalPort()); + final Socket socket = serverSocket.accept(); + log.info("Accepted new client on port " + socket.getLocalPort()); + Connection connection = new Connection(socket, messageListener, connectionListener); + log.info("\n\nServer created new inbound connection:" + + "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort() + + "\nsocket.getPort()=" + socket.getPort() + + "\nconnection.uid=" + connection.getUid() + + "\n\n"); - log.info("Server created new socket with port " + socket.getPort()); - connections.add(connection); - } catch (IOException e) { - if (!stopped) - e.printStackTrace(); + log.info("Server created new socket with port " + socket.getPort()); + connections.add(connection); + } catch (IOException e) { + if (!stopped) + e.printStackTrace(); + } } + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); } } @@ -67,7 +68,6 @@ public class Server implements Runnable { } catch (IOException e) { e.printStackTrace(); } finally { - Utils.shutDownExecutorService(executorService); log.debug("Server shutdown complete"); } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java index 4cf076aef3..307601cd13 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java @@ -21,6 +21,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -60,48 +61,62 @@ public class TorNetworkNode extends NetworkNode { this.torDir = torDir; + init(); + } + + private void init() { selfTestTimeoutTask = new TimerTask() { @Override public void run() { - log.error("A timeout occurred at self test"); - stopSelfTestTimer(); - selfTestFailed(); + try { + log.error("A timeout occurred at self test"); + stopSelfTestTimer(); + selfTestFailed(); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } } }; selfTestTask = new TimerTask() { @Override public void run() { - stopTimeoutTimer(); - if (selfTestRunning.get()) { - log.debug("running self test"); - selfTestTimeoutTimer = new Timer(); - selfTestTimeoutTimer.schedule(selfTestTimeoutTask, TIMEOUT); - // might be interrupted by timeout task + try { + stopTimeoutTimer(); if (selfTestRunning.get()) { - nonce = random.nextLong(); - log.trace("send msg with nonce " + nonce); + log.debug("running self test"); + selfTestTimeoutTimer = new Timer(); + selfTestTimeoutTimer.schedule(selfTestTimeoutTask, TIMEOUT); + // might be interrupted by timeout task + if (selfTestRunning.get()) { + nonce = random.nextLong(); + log.trace("send msg with nonce " + nonce); - try { - SettableFuture future = sendMessage(new Address(hiddenServiceDescriptor.getFullAddress()), new SelfTestMessage(nonce)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Sending self test message succeeded"); - } + try { + SettableFuture future = sendMessage(new Address(hiddenServiceDescriptor.getFullAddress()), new SelfTestMessage(nonce)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Sending self test message succeeded"); + } - @Override - public void onFailure(Throwable throwable) { - log.error("Error at sending self test message. Exception = " + throwable); - stopTimeoutTimer(); - throwable.printStackTrace(); - selfTestFailed(); - } - }); - } catch (Exception e) { - e.printStackTrace(); + @Override + public void onFailure(Throwable throwable) { + log.error("Error at sending self test message. Exception = " + throwable); + stopTimeoutTimer(); + throwable.printStackTrace(); + selfTestFailed(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } } } + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); } } }; @@ -144,11 +159,7 @@ public class TorNetworkNode extends NetworkNode { TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor; startServer(hiddenServiceDescriptor.getServerSocket()); - try { - Thread.sleep(500); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); setupListeners.stream().forEach(e -> e.onHiddenServiceReady()); @@ -252,12 +263,7 @@ public class TorNetworkNode extends NetworkNode { restartCounter++; if (restartCounter <= MAX_RESTART_ATTEMPTS) { shutDown(() -> { - try { - Thread.sleep(WAIT_BEFORE_RESTART); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - } + Uninterruptibles.sleepUninterruptibly(WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS); log.warn("We restart tor as too many self tests failed."); start(null); }); diff --git a/network/src/main/java/io/bitsquare/p2p/routing/Routing.java b/network/src/main/java/io/bitsquare/p2p/routing/Routing.java index 40fd0a198a..f756299026 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/Routing.java +++ b/network/src/main/java/io/bitsquare/p2p/routing/Routing.java @@ -1,8 +1,6 @@ package io.bitsquare.p2p.routing; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.*; import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Utils; @@ -15,10 +13,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import java.util.stream.Collectors; public class Routing { @@ -47,7 +42,7 @@ public class Routing { private final List
reportedNeighborAddresses = new CopyOnWriteArrayList<>(); private final Map authenticationCompleteHandlers = new ConcurrentHashMap<>(); private final Timer maintenanceTimer = new Timer(); - private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final ExecutorService executorService; private volatile boolean shutDownInProgress; @@ -61,6 +56,17 @@ public class Routing { // We copy it as we remove ourselves later from the list if we are a seed node this.seedNodes = new CopyOnWriteArrayList<>(seeds); + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("Routing-%d") + .setDaemon(true) + .build(); + + executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); + + init(networkNode); + } + + private void init(NetworkNode networkNode) { networkNode.addMessageListener((message, connection) -> { if (message instanceof AuthenticationMessage) processAuthenticationMessage((AuthenticationMessage) message, connection); @@ -110,8 +116,13 @@ public class Routing { maintenanceTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { - disconnectOldConnections(); - pingNeighbors(); + try { + disconnectOldConnections(); + pingNeighbors(); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } } }, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL); } @@ -126,11 +137,7 @@ public class Routing { Connection connection = authenticatedConnections.remove(0); log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); connection.shutDown(() -> disconnectOldConnections()); - try { - Thread.sleep(200); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } } @@ -153,11 +160,7 @@ public class Routing { removeNeighbor(e.address); } }); - try { - Thread.sleep(new Random().nextInt(5000) + 5000); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } + Uninterruptibles.sleepUninterruptibly(new Random().nextInt(5000) + 5000, TimeUnit.MILLISECONDS); }); } @@ -247,16 +250,16 @@ public class Routing { public void startAuthentication(List
connectedSeedNodes) { connectedSeedNodes.forEach(connectedSeedNode -> { executorService.submit(() -> { - sendRequestAuthenticationMessage(seedNodes, connectedSeedNode); try { + sendRequestAuthenticationMessage(seedNodes, connectedSeedNode); // give a random pause of 3-5 sec. before using the next - Thread.sleep(new Random().nextInt(2000) + 3000); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); + Uninterruptibles.sleepUninterruptibly(new Random().nextInt(2000) + 3000, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); } }); }); - } private void sendRequestAuthenticationMessage(final List
remainingSeedNodes, final Address address) { @@ -315,17 +318,11 @@ public class Routing { connection.shutDown(() -> { // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + if (simulateAuthTorNode > 0) + Uninterruptibles.sleepUninterruptibly(simulateAuthTorNode, TimeUnit.MILLISECONDS); - try { - if (simulateAuthTorNode > 0) Thread.sleep(simulateAuthTorNode); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); long nonce = addToMapAndGetNonce(peerAddress); SettableFuture future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce)); @@ -413,25 +410,25 @@ public class Routing { ArrayList
neighborAddresses = ((GetNeighborsMessage) message).neighborAddresses; log.trace("Received neighbors: " + neighborAddresses); // remove ourselves - neighborAddresses.remove(getAddress()); addToReportedNeighbors(neighborAddresses, connection); } } else if (message instanceof NeighborsMessage) { - log.trace("NeighborsMessage from " + connection.getPeerAddress() + " at " + getAddress()); NeighborsMessage neighborsMessage = (NeighborsMessage) message; + Address peerAddress = neighborsMessage.address; + log.trace("NeighborsMessage from " + peerAddress + " at " + getAddress()); ArrayList
neighborAddresses = neighborsMessage.neighborAddresses; log.trace("Received neighbors: " + neighborAddresses); // remove ourselves - neighborAddresses.remove(getAddress()); addToReportedNeighbors(neighborAddresses, connection); - log.info("\n\nAuthenticationComplete\nPeer with address " + connection.getPeerAddress().toString() - + " authenticated (" + connection.getObjectId() + "). Took " - + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); - // we wait until the handshake is completed before setting the authenticate flag // authentication at both sides of the connection - setAuthenticated(connection, neighborsMessage.address); + + log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress + + " authenticated (" + connection.getObjectId() + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); + + setAuthenticated(connection, peerAddress); Runnable authenticationCompleteHandler = authenticationCompleteHandlers.remove(connection.getPeerAddress()); if (authenticationCompleteHandler != null) @@ -442,18 +439,21 @@ public class Routing { } private void addToReportedNeighbors(ArrayList
neighborAddresses, Connection connection) { + log.trace("addToReportedNeighbors"); // we disconnect misbehaving nodes trying to send too many neighbors // reported neighbors include the peers connected neighbors which is normally max. 8 but we give some headroom // for safety if (neighborAddresses.size() > 1100) { connection.shutDown(); } else { + neighborAddresses.remove(getAddress()); reportedNeighborAddresses.addAll(neighborAddresses); purgeReportedNeighbors(); } } private void purgeReportedNeighbors() { + log.trace("purgeReportedNeighbors"); int all = getAllNeighborAddresses().size(); if (all > 1000) { int diff = all - 100; @@ -478,20 +478,21 @@ public class Routing { private void authenticateToNextRandomNeighbor() { executorService.submit(() -> { try { - Thread.sleep(new Random().nextInt(200) + 200); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - if (getConnectedNeighbors().size() <= MAX_CONNECTIONS) { - Address randomNotConnectedNeighborAddress = getRandomNotConnectedNeighborAddress(); - if (randomNotConnectedNeighborAddress != null) { - log.info("We try to build an authenticated connection to a random neighbor. " + randomNotConnectedNeighborAddress); - authenticateToPeer(randomNotConnectedNeighborAddress, null, () -> authenticateToNextRandomNeighbor()); + Uninterruptibles.sleepUninterruptibly(new Random().nextInt(200) + 200, TimeUnit.MILLISECONDS); + if (getConnectedNeighbors().size() <= MAX_CONNECTIONS) { + Address randomNotConnectedNeighborAddress = getRandomNotConnectedNeighborAddress(); + if (randomNotConnectedNeighborAddress != null) { + log.info("We try to build an authenticated connection to a random neighbor. " + randomNotConnectedNeighborAddress); + authenticateToPeer(randomNotConnectedNeighborAddress, null, () -> authenticateToNextRandomNeighbor()); + } else { + log.info("No more neighbors available for connecting."); + } } else { - log.info("No more neighbors available for connecting."); + log.info("We have already enough connections."); } - } else { - log.info("We have already enough connections."); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); } }); } @@ -531,9 +532,8 @@ public class Routing { private boolean verifyNonceAndAuthenticatePeerAddress(long peersNonce, Address peerAddress) { log.trace("verifyNonceAndAuthenticatePeerAddress nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); - long nonce = nonceMap.remove(peerAddress); - boolean result = nonce == peersNonce; - return result; + Long nonce = nonceMap.remove(peerAddress); + return nonce != null && nonce == peersNonce; } private void setAuthenticated(Connection connection, Address peerAddress) { diff --git a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java index 0d386ae84d..4eefb95df1 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java @@ -52,6 +52,10 @@ public class ProtectedExpirableDataStorage { storage = new Storage<>(storageDir); + init(); + } + + private void init() { ConcurrentHashMap persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap"); if (persisted != null) { sequenceNumberMap = persisted; @@ -79,9 +83,14 @@ public class ProtectedExpirableDataStorage { timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { - log.info("removeExpiredEntries called "); - map.entrySet().stream().filter(entry -> entry.getValue().isExpired()) - .forEach(entry -> map.remove(entry.getKey())); + try { + log.info("removeExpiredEntries called "); + map.entrySet().stream().filter(entry -> entry.getValue().isExpired()) + .forEach(entry -> map.remove(entry.getKey())); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } } }, CHECK_TTL_INTERVAL,