diff --git a/common/src/main/java/io/bitsquare/app/Logging.java b/common/src/main/java/io/bitsquare/app/Log.java similarity index 69% rename from common/src/main/java/io/bitsquare/app/Logging.java rename to common/src/main/java/io/bitsquare/app/Log.java index 39bb17a4a2..a95ee9c77a 100644 --- a/common/src/main/java/io/bitsquare/app/Logging.java +++ b/common/src/main/java/io/bitsquare/app/Log.java @@ -24,7 +24,9 @@ import ch.qos.logback.core.rolling.RollingFileAppender; import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy; import org.slf4j.LoggerFactory; -public class Logging { +public class Log { + public static boolean PRINT_TRACE_METHOD = true; + public static void setup(String fileName) { LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); @@ -46,7 +48,7 @@ public class Logging { PatternLayoutEncoder encoder = new PatternLayoutEncoder(); encoder.setContext(loggerContext); - encoder.setPattern("%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %xEx%n"); + encoder.setPattern("%highlight(%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %xEx%n)"); encoder.start(); appender.setEncoder(encoder); @@ -57,4 +59,22 @@ public class Logging { ch.qos.logback.classic.Logger logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); logbackLogger.addAppender(appender); } + + public static void traceCall() { + StackTraceElement stackTraceElement = new Throwable().getStackTrace()[1]; + String methodName = stackTraceElement.getMethodName(); + if (methodName.equals("")) + methodName = "Constructor "; + String className = stackTraceElement.getClassName(); + LoggerFactory.getLogger(className).trace("Called: {}", methodName); + } + + public static void traceCall(String message) { + StackTraceElement stackTraceElement = new Throwable().getStackTrace()[1]; + String methodName = stackTraceElement.getMethodName(); + if (methodName.equals("")) + methodName = "Constructor "; + String className = stackTraceElement.getClassName(); + LoggerFactory.getLogger(className).trace("Called: {} [{}]", methodName, message); + } } diff --git a/common/src/main/java/io/bitsquare/common/util/Utilities.java b/common/src/main/java/io/bitsquare/common/util/Utilities.java index ce1ea7fa88..e0b37144c5 100644 --- a/common/src/main/java/io/bitsquare/common/util/Utilities.java +++ b/common/src/main/java/io/bitsquare/common/util/Utilities.java @@ -35,6 +35,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLConnection; import java.net.URLEncoder; +import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -372,6 +373,10 @@ public class Utilities { } } + public static void setThreadName(String name) { + Thread.currentThread().setName(name + "-" + new Random().nextInt(10000)); + } + private static class AnnotationExclusionStrategy implements ExclusionStrategy { @Override public boolean shouldSkipField(FieldAttributes f) { diff --git a/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java b/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java index 9f8ee5641b..a31e72ce19 100644 --- a/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java +++ b/core/src/main/java/io/bitsquare/arbitration/ArbitratorManager.java @@ -127,7 +127,7 @@ public class ArbitratorManager { } @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { } @Override diff --git a/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java b/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java index 2cb85d6ea6..7c780adb32 100644 --- a/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java +++ b/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java @@ -81,6 +81,7 @@ public class ArbitratorService { } public Map getArbitrators() { + // TODO java.lang.IllegalStateException: Duplicate key final Map arbitratorsMap = p2PService.getDataMap().values().stream() .filter(e -> e.expirablePayload instanceof Arbitrator) .map(e -> (Arbitrator) e.expirablePayload) diff --git a/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java b/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java index 7e508a30fa..d7b8f867b5 100644 --- a/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java +++ b/core/src/main/java/io/bitsquare/arbitration/DisputeManager.java @@ -129,7 +129,7 @@ public class DisputeManager { } @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { } @Override diff --git a/core/src/main/java/io/bitsquare/btc/WalletService.java b/core/src/main/java/io/bitsquare/btc/WalletService.java index fb6e4971a8..f79b4e738c 100644 --- a/core/src/main/java/io/bitsquare/btc/WalletService.java +++ b/core/src/main/java/io/bitsquare/btc/WalletService.java @@ -28,6 +28,7 @@ import io.bitsquare.common.UserThread; import io.bitsquare.common.handlers.ErrorMessageHandler; import io.bitsquare.common.handlers.ExceptionHandler; import io.bitsquare.common.handlers.ResultHandler; +import io.bitsquare.common.util.Utilities; import io.bitsquare.user.Preferences; import javafx.beans.property.*; import org.bitcoinj.core.*; @@ -122,7 +123,7 @@ public class WalletService { Timer timeoutTimer = FxTimer.runLater( Duration.ofMillis(STARTUP_TIMEOUT), () -> { - Thread.currentThread().setName("WalletService:StartupTimeout-" + new Random().nextInt(1000)); + Utilities.setThreadName("WalletService:StartupTimeout"); exceptionHandler.handleException(new TimeoutException("Wallet did not initialize in " + STARTUP_TIMEOUT / 1000 + " seconds.")); } ); diff --git a/core/src/main/java/io/bitsquare/trade/TradeManager.java b/core/src/main/java/io/bitsquare/trade/TradeManager.java index 6716606f24..15ab68f039 100644 --- a/core/src/main/java/io/bitsquare/trade/TradeManager.java +++ b/core/src/main/java/io/bitsquare/trade/TradeManager.java @@ -162,7 +162,7 @@ public class TradeManager { } @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { } @Override diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOffer.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOffer.java index 79b19d581c..8cca20868f 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOffer.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOffer.java @@ -18,6 +18,7 @@ package io.bitsquare.trade.offer; import io.bitsquare.app.Version; +import io.bitsquare.common.util.Utilities; import io.bitsquare.storage.Storage; import io.bitsquare.trade.Tradable; import io.bitsquare.trade.TradableList; @@ -29,7 +30,6 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.time.Duration; import java.util.Date; -import java.util.Random; public class OpenOffer implements Tradable, Serializable { // That object is saved to disc. We need to take care of changes to not break deserialization. @@ -103,8 +103,8 @@ public class OpenOffer implements Tradable, Serializable { timeoutTimer = FxTimer.runLater( Duration.ofMillis(TIMEOUT), () -> { - Thread.currentThread().setName("OpenOffer:Timeout-" + new Random().nextInt(1000)); - log.debug("Timeout reached"); + Utilities.setThreadName("OpenOffer:Timeout"); + log.info("Timeout reached"); if (state == State.RESERVED) setState(State.AVAILABLE); }); diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 1baa45d04c..1546d74026 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -24,6 +24,7 @@ import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.handlers.ErrorMessageHandler; import io.bitsquare.common.handlers.ResultHandler; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.P2PService; @@ -47,7 +48,6 @@ import javax.inject.Named; import java.io.File; import java.time.Duration; import java.util.Optional; -import java.util.Random; import java.util.Timer; import java.util.TimerTask; @@ -142,7 +142,7 @@ public class OpenOfferManager { } @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { } @Override @@ -165,7 +165,7 @@ public class OpenOfferManager { TimerTask timerTask = new TimerTask() { @Override public void run() { - Thread.currentThread().setName("RepublishOffers-" + new Random().nextInt(1000)); + Utilities.setThreadName("RepublishOffers"); UserThread.execute(() -> rePublishOffers()); try { } catch (Throwable t) { diff --git a/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java b/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java index b6eada87dd..18018c2fc2 100644 --- a/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java +++ b/core/src/main/java/io/bitsquare/trade/protocol/availability/OfferAvailabilityProtocol.java @@ -20,6 +20,7 @@ package io.bitsquare.trade.protocol.availability; import io.bitsquare.common.handlers.ErrorMessageHandler; import io.bitsquare.common.handlers.ResultHandler; import io.bitsquare.common.taskrunner.TaskRunner; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.messaging.DecryptedMailListener; import io.bitsquare.trade.offer.Offer; @@ -34,7 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.Random; import static io.bitsquare.util.Validator.nonEmptyStringOf; @@ -145,7 +145,7 @@ public class OfferAvailabilityProtocol { stopTimeout(); timeoutTimer = FxTimer.runLater(Duration.ofMillis(TIMEOUT), () -> { - Thread.currentThread().setName("OfferAvailabilityProtocol:Timeout-" + new Random().nextInt(1000)); + Utilities.setThreadName("OfferAvailabilityProtocol:Timeout"); log.warn("Timeout reached"); errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded."); }); diff --git a/core/src/main/java/io/bitsquare/trade/protocol/trade/TradeProtocol.java b/core/src/main/java/io/bitsquare/trade/protocol/trade/TradeProtocol.java index 0ffbb0a186..30de7e9ba0 100644 --- a/core/src/main/java/io/bitsquare/trade/protocol/trade/TradeProtocol.java +++ b/core/src/main/java/io/bitsquare/trade/protocol/trade/TradeProtocol.java @@ -19,6 +19,7 @@ package io.bitsquare.trade.protocol.trade; import io.bitsquare.arbitration.Arbitrator; import io.bitsquare.common.crypto.PubKeyRing; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.messaging.DecryptedMailListener; @@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.security.PublicKey; import java.time.Duration; import java.util.Optional; -import java.util.Random; import static io.bitsquare.util.Validator.nonEmptyStringOf; @@ -127,7 +127,7 @@ public abstract class TradeProtocol { stopTimeout(); timeoutTimer = FxTimer.runLater(Duration.ofMillis(TIMEOUT), () -> { - Thread.currentThread().setName("TradeProtocol:Timeout-" + new Random().nextInt(1000)); + Utilities.setThreadName("TradeProtocol:Timeout"); log.error("Timeout reached"); trade.setErrorMessage("A timeout occurred."); cleanupTradable(); diff --git a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java index 6f4e4125fe..921960ddd8 100644 --- a/gui/src/main/java/io/bitsquare/app/BitsquareApp.java +++ b/gui/src/main/java/io/bitsquare/app/BitsquareApp.java @@ -100,8 +100,9 @@ public class BitsquareApp extends Application { public void start(Stage primaryStage) throws IOException { BitsquareApp.primaryStage = primaryStage; - Logging.setup(Paths.get(env.getProperty(BitsquareEnvironment.APP_DATA_DIR_KEY), "bitsquare").toString()); - + Log.setup(Paths.get(env.getProperty(BitsquareEnvironment.APP_DATA_DIR_KEY), "bitsquare").toString()); + Log.PRINT_TRACE_METHOD = DEV_MODE; + UserThread.setExecutor(Platform::runLater); shutDownHandler = this::stop; diff --git a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java index 8064b97be7..56f7ad2002 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java +++ b/gui/src/main/java/io/bitsquare/gui/main/MainViewModel.java @@ -214,7 +214,7 @@ class MainViewModel implements ViewModel { } @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { p2pNetworkInfoFooter.set("Data received from peer."); p2pNetworkReady.set(true); } diff --git a/gui/src/main/resources/logback.xml b/gui/src/main/resources/logback.xml index 22f6f08efe..c75af44d87 100644 --- a/gui/src/main/resources/logback.xml +++ b/gui/src/main/resources/logback.xml @@ -2,11 +2,11 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %xEx%n + %highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{15} - %msg %xEx%n) - + @@ -20,7 +20,8 @@ - + + diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 405adef8ec..77218f8ff8 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -5,8 +5,8 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.google.inject.name.Named; +import io.bitsquare.app.Log; import io.bitsquare.app.ProgramArguments; -import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.CryptoException; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.crypto.PubKeyRing; @@ -75,7 +75,7 @@ public class P2PService implements SetupListener { private boolean shutDownComplete; private final CopyOnWriteArraySet shutDownResultHandlers = new CopyOnWriteArraySet<>(); private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty(); - private final BooleanProperty allDataLoaded = new SimpleBooleanProperty(); + private final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty(); private final BooleanProperty authenticated = new SimpleBooleanProperty(); private MonadicBinding readyForAuthentication; @@ -92,6 +92,7 @@ public class P2PService implements SetupListener { @Nullable EncryptionService encryptionService, KeyRing keyRing, @Named("storage.dir") File storageDir) { + Log.traceCall(); this.seedNodesRepository = seedNodesRepository; this.port = port; this.torDir = torDir; @@ -106,6 +107,7 @@ public class P2PService implements SetupListener { } private void init() { + Log.traceCall(); // network Set
seedNodeAddresses; if (useLocalhost) { @@ -127,32 +129,37 @@ public class P2PService implements SetupListener { networkNode.addConnectionListener(new ConnectionListener() { @Override public void onConnection(Connection connection) { + Log.traceCall(); } @Override public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + Log.traceCall(); checkArgument(peerAddress.equals(connection.getPeerAddress()), "peerAddress must match connection.getPeerAddress()"); authenticatedPeerAddresses.add(peerAddress); authenticated.set(true); dataStorage.setAuthenticated(); - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAuthenticated())); + p2pServiceListeners.stream().forEach(e -> e.onAuthenticated()); } @Override public void onDisconnect(Reason reason, Connection connection) { + Log.traceCall(); if (connection.isAuthenticated()) authenticatedPeerAddresses.remove(connection.getPeerAddress()); } @Override public void onError(Throwable throwable) { + Log.traceCall(); log.error("onError self/ConnectionException " + networkNode.getAddress() + "/" + throwable); } }); networkNode.addMessageListener((message, connection) -> { + Log.traceCall(); if (message instanceof GetDataRequest) { log.trace("Received GetDataSetMessage: " + message); networkNode.sendMessage(connection, new GetDataResponse(getDataSet())); @@ -166,15 +173,15 @@ public class P2PService implements SetupListener { } else { log.trace("Received DataSetMessage: Empty data set"); } - allDataLoaded(); + setRequestingDataCompleted(); } else if (message instanceof SealedAndSignedMessage) { if (encryptionService != null) { try { SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message; DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify( sealedAndSignedMessage.sealedAndSigned); - UserThread.execute(() -> decryptedMailListeners.stream().forEach( - e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress()))); + decryptedMailListeners.stream().forEach( + e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress())); } catch (CryptoException e) { log.info("Decryption of SealedAndSignedMessage failed. " + "That is expected if the message is not intended for us."); @@ -186,6 +193,7 @@ public class P2PService implements SetupListener { peerGroup.addPeerListener(new PeerListener() { @Override public void onFirstAuthenticatePeer(Peer peer) { + Log.traceCall(); log.trace("onFirstAuthenticatePeer " + peer); sendGetAllDataMessageAfterAuthentication(peer); @@ -193,41 +201,46 @@ public class P2PService implements SetupListener { @Override public void onPeerAdded(Peer peer) { + Log.traceCall(); } @Override public void onPeerRemoved(Address address) { + Log.traceCall(); } @Override public void onConnectionAuthenticated(Connection connection) { + Log.traceCall(); } }); dataStorage.addHashMapChangedListener(new HashMapChangedListener() { @Override public void onAdded(ProtectedData entry) { + Log.traceCall(); if (entry instanceof ProtectedMailboxData) tryDecryptMailboxData((ProtectedMailboxData) entry); } @Override public void onRemoved(ProtectedData entry) { + Log.traceCall(); } }); - readyForAuthentication = EasyBind.combine(hiddenServicePublished, allDataLoaded, authenticated, + readyForAuthentication = EasyBind.combine(hiddenServicePublished, requestingDataCompleted, authenticated, (a, b, c) -> a && b && !c); readyForAuthentication.subscribe((observable, oldValue, newValue) -> { // we need to have both the initial data delivered and the hidden service published before we // bootstrap and authenticate to other nodes. if (newValue) - authenticateSeedNode(); + tryAuthenticateSeedNode(); }); - allDataLoaded.addListener((observable, oldValue, newValue) -> { + requestingDataCompleted.addListener((observable, oldValue, newValue) -> { if (newValue) - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAllDataReceived())); + p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted()); }); } @@ -238,18 +251,20 @@ public class P2PService implements SetupListener { @Override public void onTorNodeReady() { - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady())); + Log.traceCall(); + p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()); // 1. Step: As soon we have the tor node ready (hidden service still not available) we request the // data set from a random seed node. - sendGetAllDataMessage(peerGroup.getSeedNodeAddresses()); + sendGetDataRequest(peerGroup.getSeedNodeAddresses()); } @Override public void onHiddenServicePublished() { + Log.traceCall(); checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready"); - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished())); + p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished()); // 3. (or 2.). Step: Hidden service is published hiddenServicePublished.set(true); @@ -257,12 +272,13 @@ public class P2PService implements SetupListener { @Override public void onSetupFailed(Throwable throwable) { - UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable))); + Log.traceCall(); + p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable)); } - private void sendGetAllDataMessage(Collection
seedNodeAddresses) { + private void sendGetDataRequest(Collection
seedNodeAddresses) { + Log.traceCall(); if (!seedNodeAddresses.isEmpty()) { - log.trace("sendGetAllDataMessage"); List
remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses); Collections.shuffle(remainingSeedNodeAddresses); Address candidate = remainingSeedNodeAddresses.remove(0); @@ -281,34 +297,39 @@ public class P2PService implements SetupListener { log.info("Send GetAllDataMessage to " + candidate + " failed. " + "That is expected if other seed nodes are offline." + "\nException:" + throwable.getMessage()); - log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses); - sendGetAllDataMessage(remainingSeedNodeAddresses); + if (!remainingSeedNodeAddresses.isEmpty()) + log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses); + + sendGetDataRequest(remainingSeedNodeAddresses); } }); } else { log.info("There is no seed node available for requesting data. That is expected for the first seed node."); - allDataLoaded(); + setRequestingDataCompleted(); } } - private void allDataLoaded() { + private void setRequestingDataCompleted() { + Log.traceCall(); // 2. (or 3.) Step: We got all data loaded - if (!allDataLoaded.get()) { - log.trace("allDataLoaded"); - allDataLoaded.set(true); - } + if (!requestingDataCompleted.get()) + requestingDataCompleted.set(true); } // 4. Step: hiddenServicePublished and allDataLoaded. We start authenticate to the connected seed node. - private void authenticateSeedNode() { + private void tryAuthenticateSeedNode() { + Log.traceCall(); if (connectedSeedNode != null) { log.trace("authenticateSeedNode"); peerGroup.authenticateSeedNode(connectedSeedNode); + } else { + log.debug("No connected seedNode available."); } } // 5. Step: private void sendGetAllDataMessageAfterAuthentication(final Peer peer) { + Log.traceCall(); log.trace("sendGetDataSetMessageAfterAuthentication"); // After authentication we request again data as we might have missed pushed data in the meantime SettableFuture future = networkNode.sendMessage(peer.connection, new GetDataRequest()); @@ -333,14 +354,17 @@ public class P2PService implements SetupListener { // used by seed nodes to exclude themselves form list public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) { + Log.traceCall(); peerGroup.removeMySeedNodeAddressFromList(mySeedNodeAddress); } public void start() { + Log.traceCall(); start(null); } public void start(@Nullable P2PServiceListener listener) { + Log.traceCall(); if (listener != null) addP2PServiceListener(listener); @@ -348,6 +372,7 @@ public class P2PService implements SetupListener { } public void shutDown(Runnable shutDownCompleteHandler) { + Log.traceCall(); if (!shutDownInProgress) { shutDownInProgress = true; @@ -361,12 +386,12 @@ public class P2PService implements SetupListener { if (networkNode != null) networkNode.shutDown(() -> { - UserThread.execute(() -> shutDownResultHandlers.stream().forEach(e -> new Thread(e).start())); + shutDownResultHandlers.stream().forEach(e -> e.run()); shutDownComplete = true; }); } else { if (shutDownComplete) - new Thread(shutDownCompleteHandler).start(); + shutDownCompleteHandler.run(); else shutDownResultHandlers.add(shutDownCompleteHandler); log.warn("shutDown already in progress"); @@ -380,19 +405,21 @@ public class P2PService implements SetupListener { public void sendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message, SendMailMessageListener sendMailMessageListener) { + Log.traceCall(); checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailMessage)"); checkAuthentication(); if (!authenticatedPeerAddresses.contains(peerAddress)) peerGroup.authenticateToPeer(peerAddress, () -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener), - () -> UserThread.execute(() -> sendMailMessageListener.onFault())); + () -> sendMailMessageListener.onFault()); else doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener); } private void doSendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message, SendMailMessageListener sendMailMessageListener) { + Log.traceCall(); if (encryptionService != null) { try { SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( @@ -401,24 +428,25 @@ public class P2PService implements SetupListener { Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { - UserThread.execute(() -> sendMailMessageListener.onArrived()); + sendMailMessageListener.onArrived(); } @Override public void onFailure(@NotNull Throwable throwable) { throwable.printStackTrace(); - UserThread.execute(() -> sendMailMessageListener.onFault()); + sendMailMessageListener.onFault(); } }); } catch (CryptoException e) { e.printStackTrace(); - UserThread.execute(() -> sendMailMessageListener.onFault()); + sendMailMessageListener.onFault(); } } } public void sendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing, MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) { + Log.traceCall(); checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)"); checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer"); checkAuthentication(); @@ -437,6 +465,7 @@ public class P2PService implements SetupListener { private void trySendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing, MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) { + Log.traceCall(); if (encryptionService != null) { try { SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage( @@ -446,7 +475,7 @@ public class P2PService implements SetupListener { @Override public void onSuccess(@Nullable Connection connection) { log.trace("SendEncryptedMailboxMessage onSuccess"); - UserThread.execute(() -> sendMailboxMessageListener.onArrived()); + sendMailboxMessageListener.onArrived(); } @Override @@ -460,13 +489,13 @@ public class P2PService implements SetupListener { keyRing.getSignatureKeyPair().getPublic(), receiverStoragePublicKey), receiverStoragePublicKey); - UserThread.execute(() -> sendMailboxMessageListener.onStoredInMailbox()); + sendMailboxMessageListener.onStoredInMailbox(); } }); } catch (CryptoException e) { e.printStackTrace(); log.error("sendEncryptedMessage failed"); - UserThread.execute(() -> sendMailboxMessageListener.onFault()); + sendMailboxMessageListener.onFault(); } } } @@ -477,6 +506,7 @@ public class P2PService implements SetupListener { /////////////////////////////////////////////////////////////////////////////////////////// public boolean addData(ExpirablePayload expirablePayload) { + Log.traceCall(); checkAuthentication(); try { @@ -489,6 +519,7 @@ public class P2PService implements SetupListener { } private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) { + Log.traceCall(); checkAuthentication(); try { @@ -500,6 +531,7 @@ public class P2PService implements SetupListener { } public boolean removeData(ExpirablePayload expirablePayload) { + Log.traceCall(); checkAuthentication(); try { @@ -512,6 +544,7 @@ public class P2PService implements SetupListener { } public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) { + Log.traceCall(); checkAuthentication(); ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey); @@ -525,6 +558,7 @@ public class P2PService implements SetupListener { } private void removeMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) { + Log.traceCall(); checkAuthentication(); try { @@ -536,6 +570,7 @@ public class P2PService implements SetupListener { } public Map getDataMap() { + Log.traceCall(); return dataStorage.getMap(); } @@ -545,38 +580,47 @@ public class P2PService implements SetupListener { /////////////////////////////////////////////////////////////////////////////////////////// public void addMessageListener(MessageListener messageListener) { + Log.traceCall(); networkNode.addMessageListener(messageListener); } public void removeMessageListener(MessageListener messageListener) { + Log.traceCall(); networkNode.removeMessageListener(messageListener); } public void addDecryptedMailListener(DecryptedMailListener listener) { + Log.traceCall(); decryptedMailListeners.add(listener); } public void removeDecryptedMailListener(DecryptedMailListener listener) { + Log.traceCall(); decryptedMailListeners.remove(listener); } public void addDecryptedMailboxListener(DecryptedMailboxListener listener) { + Log.traceCall(); decryptedMailboxListeners.add(listener); } public void removeDecryptedMailboxListener(DecryptedMailboxListener listener) { + Log.traceCall(); decryptedMailboxListeners.remove(listener); } public void addP2PServiceListener(P2PServiceListener listener) { + Log.traceCall(); p2pServiceListeners.add(listener); } public void removeP2PServiceListener(P2PServiceListener listener) { + Log.traceCall(); p2pServiceListeners.remove(listener); } public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) { + Log.traceCall(); dataStorage.addHashMapChangedListener(hashMapChangedListener); } @@ -586,22 +630,27 @@ public class P2PService implements SetupListener { /////////////////////////////////////////////////////////////////////////////////////////// public boolean isAuthenticated() { + Log.traceCall(); return authenticated.get(); } public NetworkNode getNetworkNode() { + Log.traceCall(); return networkNode; } public PeerGroup getPeerGroup() { + Log.traceCall(); return peerGroup; } public Address getAddress() { + Log.traceCall(); return networkNode.getAddress(); } public NetworkStatistics getNetworkStatistics() { + Log.traceCall(); return networkStatistics; } @@ -611,10 +660,12 @@ public class P2PService implements SetupListener { /////////////////////////////////////////////////////////////////////////////////////////// private HashSet getDataSet() { + Log.traceCall(); return new HashSet<>(dataStorage.getMap().values()); } private void tryDecryptMailboxData(ProtectedMailboxData mailboxData) { + Log.traceCall(); if (encryptionService != null) { ExpirablePayload data = mailboxData.expirablePayload; if (data instanceof ExpirableMailboxPayload) { @@ -627,12 +678,11 @@ public class P2PService implements SetupListener { Address senderAddress = mailboxMessage.getSenderAddress(); checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages"); - mailboxMap.put(decryptedMsgWithPubKey, mailboxData); log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress=" + senderAddress + " / my address=" + getAddress()); - UserThread.execute(() -> decryptedMailboxListeners.stream().forEach( - e -> e.onMailboxMessageAdded(decryptedMsgWithPubKey, senderAddress))); + decryptedMailboxListeners.stream().forEach( + e -> e.onMailboxMessageAdded(decryptedMsgWithPubKey, senderAddress)); } } catch (CryptoException e) { log.trace("Decryption of SealedAndSignedMessage failed. That is expected if the message is not intended for us. " + e.getMessage()); @@ -642,6 +692,7 @@ public class P2PService implements SetupListener { } private void checkAuthentication() { + Log.traceCall(); if (authenticatedPeerAddresses.isEmpty()) throw new AuthenticationException("You must be authenticated before adding data to the P2P network."); } diff --git a/network/src/main/java/io/bitsquare/p2p/P2PServiceListener.java b/network/src/main/java/io/bitsquare/p2p/P2PServiceListener.java index 93e42c89bc..240ed71116 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PServiceListener.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PServiceListener.java @@ -5,7 +5,7 @@ import io.bitsquare.p2p.network.SetupListener; public interface P2PServiceListener extends SetupListener { - void onAllDataReceived(); + void onRequestingDataCompleted(); void onAuthenticated(); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 0e053ad4af..45b64002fc 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -2,6 +2,7 @@ package io.bitsquare.p2p.network; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; +import io.bitsquare.app.Log; import io.bitsquare.common.ByteArrayUtils; import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Address; @@ -24,7 +25,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** - * Connection is created by the server thread or by send message from NetworkNode. + * Connection is created by the server thread or by sendMessage from NetworkNode. * All handlers are called on User thread. * Shared data between InputHandler thread and that */ @@ -34,7 +35,7 @@ public class Connection { private static final int MAX_ILLEGAL_REQUESTS = 5; private static final int SOCKET_TIMEOUT = 10 * 60 * 1000; // 10 min. //TODO set shorter private InputHandler inputHandler; - private boolean isAuthenticated; + private volatile boolean isAuthenticated; public static int getMaxMsgSize() { return MAX_MSG_SIZE; @@ -43,6 +44,7 @@ public class Connection { private final String portInfo; private final String uid; private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); + public final String objectId = super.toString().split("@")[1]; // set in init private ObjectOutputStream objectOutputStream; @@ -66,6 +68,7 @@ public class Connection { /////////////////////////////////////////////////////////////////////////////////////////// public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { + Log.traceCall(); portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort(); uid = UUID.randomUUID().toString(); @@ -73,6 +76,7 @@ public class Connection { } private void init(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { + Log.traceCall(); sharedSpace = new SharedSpace(this, socket, messageListener, connectionListener, useCompression); try { socket.setSoTimeout(SOCKET_TIMEOUT); @@ -102,31 +106,38 @@ public class Connection { // API /////////////////////////////////////////////////////////////////////////////////////////// + // Called form UserThread public void setAuthenticated(Address peerAddress, Connection connection) { - this.peerAddress = peerAddress; + Log.traceCall(); + synchronized (peerAddress) { + this.peerAddress = peerAddress; + } isAuthenticated = true; - UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection)); + sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection); } + // Called form various threads public void sendMessage(Message message) { + Log.traceCall(); if (!stopped) { try { - log.trace("writeObject " + message + " on connection with port " + portInfo); + log.info("writeObject " + message + " on connection with port " + portInfo); + Object objectToWrite; + if (useCompression) { + byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message); + // log.trace("Write object uncompressed data size: " + messageAsBytes.length); + byte[] compressed = Utils.compress(message); + //log.trace("Write object compressed data size: " + compressed.length); + objectToWrite = compressed; + } else { + // log.trace("Write object data size: " + ByteArrayUtils.objectToByteArray(message).length); + objectToWrite = message; + } if (!stopped) { - Object objectToWrite; - if (useCompression) { - byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message); - // log.trace("Write object uncompressed data size: " + messageAsBytes.length); - byte[] compressed = Utils.compress(message); - //log.trace("Write object compressed data size: " + compressed.length); - objectToWrite = compressed; - } else { - // log.trace("Write object data size: " + ByteArrayUtils.objectToByteArray(message).length); - objectToWrite = message; + synchronized (objectOutputStream) { + objectOutputStream.writeObject(objectToWrite); + objectOutputStream.flush(); } - objectOutputStream.writeObject(objectToWrite); - objectOutputStream.flush(); - sharedSpace.updateLastActivityDate(); } } catch (IOException e) { @@ -134,37 +145,47 @@ public class Connection { sharedSpace.handleConnectionException(e); } } else { - log.debug("sendMessage after stopped"); + log.debug("called sendMessage but was already stopped"); } } public void reportIllegalRequest(IllegalRequest illegalRequest) { + Log.traceCall(); sharedSpace.reportIllegalRequest(illegalRequest); } + public synchronized void setPeerAddress(@Nullable Address peerAddress) { + Log.traceCall(); + this.peerAddress = peerAddress; + } /////////////////////////////////////////////////////////////////////////////////////////// // Getters /////////////////////////////////////////////////////////////////////////////////////////// @Nullable - public Address getPeerAddress() { + public synchronized Address getPeerAddress() { + //Log.traceCall(); return peerAddress; } public Date getLastActivityDate() { + //Log.traceCall(); return sharedSpace.getLastActivityDate(); } public boolean isAuthenticated() { + //Log.traceCall(); return isAuthenticated; } public String getUid() { + Log.traceCall(); return uid; } public boolean isStopped() { + Log.traceCall(); return stopped; } @@ -174,26 +195,30 @@ public class Connection { /////////////////////////////////////////////////////////////////////////////////////////// public void shutDown(Runnable completeHandler) { + // Log.traceCall(); shutDown(true, completeHandler); } public void shutDown() { + //Log.traceCall(); shutDown(true, null); } private void shutDown(boolean sendCloseConnectionMessage) { + //Log.traceCall(); shutDown(sendCloseConnectionMessage, null); } private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) { + Log.traceCall(this.toString()); if (!stopped) { log.info("\n\n############################################################\n" + "ShutDown connection:" + "\npeerAddress=" + peerAddress + "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort() + "/" + sharedSpace.getSocket().getPort() - + "\nobjectId=" + getObjectId() + " / uid=" + getUid() - + "\nisAuthenticated=" + isAuthenticated() + + "\nobjectId=" + objectId + " / uid=" + uid + + "\nisAuthenticated=" + isAuthenticated + "\n############################################################\n"); log.trace("ShutDown connection requested. Connection=" + this.toString()); @@ -205,10 +230,9 @@ public class Connection { if (sendCloseConnectionMessage) { new Thread(() -> { - Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.getObjectId()); + Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.objectId); try { sendMessage(new CloseConnectionMessage()); - // give a bit of time for closing gracefully Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } catch (Throwable t) { t.printStackTrace(); @@ -224,10 +248,12 @@ public class Connection { } private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) { + Log.traceCall(); ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason(); if (shutDownReason == null) shutDownReason = ConnectionListener.Reason.SHUT_DOWN; final ConnectionListener.Reason finalShutDownReason = shutDownReason; + // keep UserThread.execute as its not clear if that is called from a non-UserThread UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(finalShutDownReason, this)); try { @@ -236,12 +262,12 @@ public class Connection { log.trace("SocketException at shutdown might be expected " + e.getMessage()); } catch (IOException e) { e.printStackTrace(); + log.error("Exception at shutdown. " + e.getMessage()); } finally { MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS); log.debug("Connection shutdown complete " + this.toString()); - // dont use executorService as its shut down but call handler on own thread - // to not get interrupted by caller + // keep UserThread.execute as its not clear if that is called from a non-UserThread if (shutDownCompleteHandler != null) UserThread.execute(shutDownCompleteHandler); } @@ -273,7 +299,7 @@ public class Connection { return "Connection{" + "portInfo=" + portInfo + ", uid='" + uid + '\'' + - ", objectId='" + getObjectId() + '\'' + + ", objectId='" + objectId + '\'' + ", sharedSpace=" + sharedSpace.toString() + ", peerAddress=" + peerAddress + ", isAuthenticated=" + isAuthenticated + @@ -283,21 +309,13 @@ public class Connection { '}'; } - public String getObjectId() { - return super.toString().split("@")[1]; - } - - public void setPeerAddress(@Nullable Address peerAddress) { - this.peerAddress = peerAddress; - } - - /////////////////////////////////////////////////////////////////////////////////////////// // SharedSpace /////////////////////////////////////////////////////////////////////////////////////////// /** * Holds all shared data between Connection and InputHandler + * Runs in same thread as Connection */ private static class SharedSpace { private static final Logger log = LoggerFactory.getLogger(SharedSpace.class); @@ -316,6 +334,7 @@ public class Connection { public SharedSpace(Connection connection, Socket socket, MessageListener messageListener, ConnectionListener connectionListener, boolean useCompression) { + Log.traceCall(); this.connection = connection; this.socket = socket; this.messageListener = messageListener; @@ -323,15 +342,18 @@ public class Connection { this.useCompression = useCompression; } - public void updateLastActivityDate() { + public synchronized void updateLastActivityDate() { + Log.traceCall(); lastActivityDate = new Date(); } - public Date getLastActivityDate() { + public synchronized Date getLastActivityDate() { + // Log.traceCall(); return lastActivityDate; } public void reportIllegalRequest(IllegalRequest illegalRequest) { + Log.traceCall(); log.warn("We got reported an illegal request " + illegalRequest); int prevCounter = illegalRequests.get(illegalRequest); if (prevCounter > illegalRequest.maxTolerance) { @@ -343,6 +365,7 @@ public class Connection { } public void handleConnectionException(Exception e) { + Log.traceCall(); if (e instanceof SocketException) { if (socket.isClosed()) shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED; @@ -358,32 +381,50 @@ public class Connection { e.printStackTrace(); } - if (!stopped) + if (!stopped) { + stopped = true; connection.shutDown(false); + } } public void onMessage(Message message) { + Log.traceCall(); UserThread.execute(() -> messageListener.onMessage(message, connection)); } public boolean useCompression() { + Log.traceCall(); return useCompression; } public void shutDown(boolean sendCloseConnectionMessage) { + Log.traceCall(); connection.shutDown(sendCloseConnectionMessage); } - public ConnectionListener getConnectionListener() { + public synchronized ConnectionListener getConnectionListener() { + Log.traceCall(); return connectionListener; } - public Socket getSocket() { + public synchronized Socket getSocket() { + //Log.traceCall(); return socket; } public String getConnectionId() { - return connection.getObjectId(); + Log.traceCall(); + return connection.objectId; + } + + public void stop() { + Log.traceCall(); + this.stopped = true; + } + + public synchronized ConnectionListener.Reason getShutDownReason() { + Log.traceCall(); + return shutDownReason; } @Override @@ -396,13 +437,6 @@ public class Connection { '}'; } - public void stop() { - this.stopped = true; - } - - public ConnectionListener.Reason getShutDownReason() { - return shutDownReason; - } } @@ -410,6 +444,7 @@ public class Connection { // InputHandler /////////////////////////////////////////////////////////////////////////////////////////// + // Runs in same thread as Connection private static class InputHandler implements Runnable { private static final Logger log = LoggerFactory.getLogger(InputHandler.class); @@ -419,24 +454,27 @@ public class Connection { private volatile boolean stopped; public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo) { + Log.traceCall(); this.sharedSpace = sharedSpace; this.objectInputStream = objectInputStream; this.portInfo = portInfo; } public void stop() { + Log.traceCall(); stopped = true; } @Override public void run() { + Log.traceCall(); try { Thread.currentThread().setName("InputHandler-" + portInfo); while (!stopped && !Thread.currentThread().isInterrupted()) { try { log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionId()); Object rawInputObject = objectInputStream.readObject(); - log.trace("New data arrived at inputHandler of connection=" + sharedSpace.getConnectionId() + log.info("New data arrived at inputHandler of connection=" + sharedSpace.getConnectionId() + " rawInputObject " + rawInputObject); int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; @@ -468,18 +506,16 @@ public class Connection { if (message instanceof CloseConnectionMessage) { stopped = true; sharedSpace.shutDown(false); - } else { + } else if (!stopped) { sharedSpace.onMessage(message); } } else { sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); } } else { - log.error("Received decompressed data exceeds max. msg size."); sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded); } } else { - log.error("Received compressed data exceeds max. msg size."); sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded); } } catch (IOException | ClassNotFoundException e) { @@ -489,7 +525,8 @@ public class Connection { } } catch (Throwable t) { t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); + stopped = true; + sharedSpace.handleConnectionException(new Exception(t)); } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index 0377936866..aa277f5939 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -6,7 +6,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; +import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.nucleo.net.HiddenServiceDescriptor; import io.nucleo.net.TorNode; @@ -18,15 +20,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +// Run in UserThread public class LocalhostNetworkNode extends NetworkNode { private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class); - private static int simulateTorDelayTorNode = 1 * 100; - private static int simulateTorDelayHiddenService = 1 * 100; + private static volatile int simulateTorDelayTorNode = 1 * 100; + private static volatile int simulateTorDelayHiddenService = 1 * 100; private Address address; public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) { @@ -44,28 +46,32 @@ public class LocalhostNetworkNode extends NetworkNode { public LocalhostNetworkNode(int port) { super(port); + Log.traceCall(); } @Override public void start(@Nullable SetupListener setupListener) { + Log.traceCall(); if (setupListener != null) addSetupListener(setupListener); - createExecutor(); + createExecutorService(); //Tor delay simulation createTorNode(torNode -> { + Log.traceCall("torNode created"); setupListeners.stream().forEach(e -> e.onTorNodeReady()); // Create Hidden Service (takes about 40 sec.) createHiddenService(hiddenServiceDescriptor -> { + Log.traceCall("hiddenService created"); try { - startServer(new ServerSocket(port)); + startServer(new ServerSocket(servicePort)); } catch (IOException e) { e.printStackTrace(); + log.error("Exception at startServer: " + e.getMessage()); } - address = new Address("localhost", port); - + address = new Address("localhost", servicePort); setupListeners.stream().forEach(e -> e.onHiddenServicePublished()); }); @@ -76,21 +82,26 @@ public class LocalhostNetworkNode extends NetworkNode { @Override @Nullable public Address getAddress() { + Log.traceCall(); return address; } + // Called from NetworkNode thread @Override - protected Socket getSocket(Address peerAddress) throws IOException { + protected Socket createSocket(Address peerAddress) throws IOException { + Log.traceCall(); return new Socket(peerAddress.hostName, peerAddress.port); } + /////////////////////////////////////////////////////////////////////////////////////////// // Tor delay simulation /////////////////////////////////////////////////////////////////////////////////////////// private void createTorNode(final Consumer resultHandler) { + Log.traceCall(); ListenableFuture> future = executorService.submit(() -> { - Thread.currentThread().setName("NetworkNode:CreateTorNode-" + new Random().nextInt(1000)); + Utilities.setThreadName("NetworkNode:CreateTorNode"); try { long ts = System.currentTimeMillis(); if (simulateTorDelayTorNode > 0) @@ -100,6 +111,7 @@ public class LocalhostNetworkNode extends NetworkNode { "TorNode created [simulation]:" + "\nTook " + (System.currentTimeMillis() - ts) + " ms" + "\n############################################################\n"); + // as we are simulating we return null return null; } catch (Throwable t) { throw t; @@ -107,18 +119,25 @@ public class LocalhostNetworkNode extends NetworkNode { }); Futures.addCallback(future, new FutureCallback>() { public void onSuccess(TorNode torNode) { - UserThread.execute(() -> resultHandler.accept(torNode)); + UserThread.execute(() -> { + // as we are simulating we return null + resultHandler.accept(null); + }); } public void onFailure(@NotNull Throwable throwable) { - log.error("[simulation] TorNode creation failed"); + UserThread.execute(() -> { + log.error("[simulation] TorNode creation failed. " + throwable.getMessage()); + throwable.printStackTrace(); + }); } }); } private void createHiddenService(final Consumer resultHandler) { + Log.traceCall(); ListenableFuture future = executorService.submit(() -> { - Thread.currentThread().setName("NetworkNode:CreateHiddenService-" + new Random().nextInt(1000)); + Utilities.setThreadName("NetworkNode:CreateHiddenService"); try { long ts = System.currentTimeMillis(); if (simulateTorDelayHiddenService > 0) @@ -128,6 +147,7 @@ public class LocalhostNetworkNode extends NetworkNode { "Hidden service created [simulation]:" + "\nTook " + (System.currentTimeMillis() - ts) + " ms" + "\n############################################################\n"); + // as we are simulating we return null return null; } catch (Throwable t) { throw t; @@ -135,13 +155,18 @@ public class LocalhostNetworkNode extends NetworkNode { }); Futures.addCallback(future, new FutureCallback() { public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) { - UserThread.execute(() -> resultHandler.accept(hiddenServiceDescriptor)); + UserThread.execute(() -> { + // as we are simulating we return null + resultHandler.accept(null); + }); } public void onFailure(@NotNull Throwable throwable) { - log.error("[simulation] Hidden service creation failed"); + UserThread.execute(() -> { + log.error("[simulation] Hidden service creation failed. " + throwable.getMessage()); + throwable.printStackTrace(); + }); } }); } - } diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index f96c7b1436..554b2c3502 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -1,6 +1,7 @@ package io.bitsquare.p2p.network; import com.google.common.util.concurrent.*; +import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; @@ -21,11 +22,12 @@ import java.util.concurrent.CopyOnWriteArraySet; import static com.google.common.base.Preconditions.checkNotNull; +// Run in UserThread public abstract class NetworkNode implements MessageListener, ConnectionListener { private static final Logger log = LoggerFactory.getLogger(NetworkNode.class); - protected final int port; - private final CopyOnWriteArraySet outBoundConnections = new CopyOnWriteArraySet<>(); + protected final int servicePort; + private final CopyOnWriteArraySet inBoundConnections = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet messageListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet connectionListeners = new CopyOnWriteArraySet<>(); @@ -34,13 +36,17 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener private Server server; private volatile boolean shutDownInProgress; + // accessed from different threads + private final CopyOnWriteArraySet outBoundConnections = new CopyOnWriteArraySet<>(); + /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public NetworkNode(int port) { - this.port = port; + public NetworkNode(int servicePort) { + Log.traceCall(); + this.servicePort = servicePort; } /////////////////////////////////////////////////////////////////////////////////////////// @@ -48,16 +54,17 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// public void start() { + Log.traceCall(); start(null); } abstract public void start(@Nullable SetupListener setupListener); public SettableFuture sendMessage(@NotNull Address peerAddress, Message message) { - log.trace("sendMessage message=" + message); + Log.traceCall("message: " + message + " to peerAddress: " + peerAddress); checkNotNull(peerAddress, "peerAddress must not be null"); - Optional outboundConnectionOptional = findOutboundConnection(peerAddress); + Optional outboundConnectionOptional = lookupOutboundConnection(peerAddress); Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null; if (connection != null) log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid()); @@ -69,7 +76,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener } if (connection == null) { - Optional inboundConnectionOptional = findInboundConnection(peerAddress); + Optional inboundConnectionOptional = lookupInboundConnection(peerAddress); if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get(); if (connection != null) log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid()); @@ -78,28 +85,28 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener if (connection != null) { return sendMessage(connection, message); } else { + log.trace("We have not found any connection for that peerAddress. " + + "We will create a new outbound connection."); + final SettableFuture resultFuture = SettableFuture.create(); ListenableFuture future = executorService.submit(() -> { - Thread.currentThread().setName("NetworkNode:SendMessage-create-new-outbound-connection-to-" + peerAddress); + Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress); try { - Connection newConnection; - log.trace("We have not found any connection for that peerAddress. " + - "We will create a new outbound connection."); - Socket socket = getSocket(peerAddress); // can take a while when using tor - newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this); + Socket socket = createSocket(peerAddress); // can take a while when using tor + Connection newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this); newConnection.setPeerAddress(peerAddress); outBoundConnections.add(newConnection); log.info("\n\n############################################################\n" + "NetworkNode created new outbound connection:" - + "\npeerAddress=" + peerAddress.getFullAddress() + + "\npeerAddress=" + peerAddress + "\nconnection.uid=" + newConnection.getUid() + "\nmessage=" + message + "\n############################################################\n"); - + newConnection.sendMessage(message); - return newConnection; + return newConnection; // can take a while when using tor } catch (Throwable throwable) { if (!(throwable instanceof ConnectException || throwable instanceof IOException)) { throwable.printStackTrace(); @@ -110,11 +117,15 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener }); Futures.addCallback(future, new FutureCallback() { public void onSuccess(Connection connection) { - UserThread.execute(() -> resultFuture.set(connection)); + UserThread.execute(() -> { + resultFuture.set(connection); + }); } public void onFailure(@NotNull Throwable throwable) { - UserThread.execute(() -> resultFuture.setException(throwable)); + UserThread.execute(() -> { + resultFuture.setException(throwable); + }); } }); return resultFuture; @@ -122,9 +133,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener } public SettableFuture sendMessage(Connection connection, Message message) { + Log.traceCall(); // connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block ListenableFuture future = executorService.submit(() -> { - Thread.currentThread().setName("NetworkNode:SendMessage-to-connection-" + connection.getObjectId()); + Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.objectId); try { log.debug("## connection.sendMessage"); connection.sendMessage(message); @@ -136,25 +148,29 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener final SettableFuture resultFuture = SettableFuture.create(); Futures.addCallback(future, new FutureCallback() { public void onSuccess(Connection connection) { - log.debug("## connection.sendMessage onSuccess"); - UserThread.execute(() -> resultFuture.set(connection)); + UserThread.execute(() -> { + resultFuture.set(connection); + }); } public void onFailure(@NotNull Throwable throwable) { - log.debug("## connection.sendMessage onFailure"); - UserThread.execute(() -> resultFuture.setException(throwable)); + UserThread.execute(() -> { + resultFuture.setException(throwable); + }); } }); return resultFuture; } public Set getAllConnections() { + Log.traceCall(); Set set = new HashSet<>(inBoundConnections); set.addAll(outBoundConnections); return set; } public void shutDown(Runnable shutDownCompleteHandler) { + Log.traceCall(); log.info("Shutdown NetworkNode"); if (!shutDownInProgress) { shutDownInProgress = true; @@ -166,7 +182,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener getAllConnections().stream().forEach(e -> e.shutDown()); log.info("NetworkNode shutdown complete"); - if (shutDownCompleteHandler != null) UserThread.execute(() -> shutDownCompleteHandler.run()); + if (shutDownCompleteHandler != null) shutDownCompleteHandler.run(); } } @@ -176,6 +192,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// public void addSetupListener(SetupListener setupListener) { + Log.traceCall(); setupListeners.add(setupListener); } @@ -185,38 +202,43 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// public void addConnectionListener(ConnectionListener connectionListener) { + Log.traceCall(); connectionListeners.add(connectionListener); } public void removeConnectionListener(ConnectionListener connectionListener) { + Log.traceCall(); connectionListeners.remove(connectionListener); } @Override public void onConnection(Connection connection) { - connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onConnection(connection))); + Log.traceCall(); + connectionListeners.stream().forEach(e -> e.onConnection(connection)); } @Override public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - log.trace("onAuthenticationComplete peerAddress=" + peerAddress); - log.trace("onAuthenticationComplete connection=" + connection); + Log.traceCall(); + log.trace("onAuthenticationComplete peerAddress/connection: " + peerAddress + " / " + connection); - connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onPeerAddressAuthenticated(peerAddress, connection))); + connectionListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection)); } @Override public void onDisconnect(Reason reason, Connection connection) { + Log.traceCall(); Address peerAddress = connection.getPeerAddress(); log.trace("onDisconnect connection " + connection + ", peerAddress= " + peerAddress); outBoundConnections.remove(connection); inBoundConnections.remove(connection); - connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onDisconnect(reason, connection))); + connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection)); } @Override public void onError(Throwable throwable) { - connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onError(throwable))); + Log.traceCall(); + connectionListeners.stream().forEach(e -> e.onError(throwable)); } @@ -225,16 +247,19 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// public void addMessageListener(MessageListener messageListener) { + Log.traceCall(); messageListeners.add(messageListener); } public void removeMessageListener(MessageListener messageListener) { + Log.traceCall(); messageListeners.remove(messageListener); } @Override public void onMessage(Message message, Connection connection) { - messageListeners.stream().forEach(e -> UserThread.execute(() -> e.onMessage(message, connection))); + Log.traceCall(); + messageListeners.stream().forEach(e -> e.onMessage(message, connection)); } @@ -242,16 +267,19 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener // Protected /////////////////////////////////////////////////////////////////////////////////////////// - protected void createExecutor() { - executorService = Utilities.getListeningExecutorService("NetworkNode-" + port, 20, 50, 120L); + protected void createExecutorService() { + Log.traceCall(); + executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 20, 50, 120L); } protected void startServer(ServerSocket serverSocket) { + Log.traceCall(); server = new Server(serverSocket, (message, connection) -> NetworkNode.this.onMessage(message, connection), new ConnectionListener() { @Override public void onConnection(Connection connection) { + Log.traceCall(); // we still have not authenticated so put it to the temp list inBoundConnections.add(connection); NetworkNode.this.onConnection(connection); @@ -259,36 +287,42 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener @Override public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + Log.traceCall(); NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); } @Override public void onDisconnect(Reason reason, Connection connection) { + Log.traceCall(); Address peerAddress = connection.getPeerAddress(); - log.trace("onDisconnect at incoming connection to peerAddress " + peerAddress); + log.trace("onDisconnect at incoming connection to peerAddress (or connection) " + + ((peerAddress == null) ? connection : peerAddress)); inBoundConnections.remove(connection); NetworkNode.this.onDisconnect(reason, connection); } @Override public void onError(Throwable throwable) { + Log.traceCall(); NetworkNode.this.onError(throwable); } }); executorService.submit(server); } - private Optional findOutboundConnection(Address peerAddress) { + private Optional lookupOutboundConnection(Address peerAddress) { + Log.traceCall(); return outBoundConnections.stream() .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); } - private Optional findInboundConnection(Address peerAddress) { + private Optional lookupInboundConnection(Address peerAddress) { + Log.traceCall(); return inBoundConnections.stream() .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); } - abstract protected Socket getSocket(Address peerAddress) throws IOException; + abstract protected Socket createSocket(Address peerAddress) throws IOException; @Nullable abstract public Address getAddress(); diff --git a/network/src/main/java/io/bitsquare/p2p/network/Server.java b/network/src/main/java/io/bitsquare/p2p/network/Server.java index 506e0e2ad0..73acf79da1 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Server.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Server.java @@ -1,5 +1,6 @@ package io.bitsquare.p2p.network; +import io.bitsquare.app.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -7,20 +8,24 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +// Runs in UserThread class Server implements Runnable { private static final Logger log = LoggerFactory.getLogger(Server.class); - private final ServerSocket serverSocket; private final MessageListener messageListener; private final ConnectionListener connectionListener; - private final Set connections = new HashSet<>(); + + // accessed from different threads + private final ServerSocket serverSocket; + private final Set connections = new CopyOnWriteArraySet<>(); private volatile boolean stopped; public Server(ServerSocket serverSocket, MessageListener messageListener, ConnectionListener connectionListener) { + Log.traceCall(); this.serverSocket = serverSocket; this.messageListener = messageListener; this.connectionListener = connectionListener; @@ -28,14 +33,15 @@ class Server implements Runnable { @Override public void run() { + Log.traceCall(); try { // Thread created by NetworkNode - Thread.currentThread().setName("NetworkNode:Server-" + serverSocket.getLocalPort()); + Thread.currentThread().setName("Server-" + serverSocket.getLocalPort()); try { while (!stopped && !Thread.currentThread().isInterrupted()) { log.info("Ready to accept new clients on port " + serverSocket.getLocalPort()); final Socket socket = serverSocket.accept(); - if (!stopped) { + if (!stopped && !Thread.currentThread().isInterrupted()) { log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort()); Connection connection = new Connection(socket, messageListener, connectionListener); @@ -61,6 +67,7 @@ class Server implements Runnable { } public void shutDown() { + Log.traceCall(); if (!stopped) { stopped = true; diff --git a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java index e2b626f64b..1fb275d0f7 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java @@ -6,7 +6,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; +import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Utils; import io.nucleo.net.HiddenServiceDescriptor; @@ -19,29 +21,24 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.net.Socket; -import java.util.Random; import java.util.Timer; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkArgument; +// Run in UserThread public class TorNetworkNode extends NetworkNode { private static final Logger log = LoggerFactory.getLogger(TorNetworkNode.class); - private static final Random random = new Random(); - private static final long TIMEOUT = 5000; - private static final int MAX_ERRORS_BEFORE_RESTART = 3; private static final int MAX_RESTART_ATTEMPTS = 3; private static final int WAIT_BEFORE_RESTART = 2000; private static final long SHUT_DOWN_TIMEOUT = 5000; private final File torDir; - private TorNode torNode; + private TorNode torNetworkNode; private HiddenServiceDescriptor hiddenServiceDescriptor; private Timer shutDownTimeoutTimer; - private long nonce; - private int errorCounter; private int restartCounter; private Runnable shutDownCompleteHandler; private boolean torShutDownComplete, networkNodeShutDownDoneComplete; @@ -53,7 +50,7 @@ public class TorNetworkNode extends NetworkNode { public TorNetworkNode(int servicePort, File torDir) { super(servicePort); - + Log.traceCall(); this.torDir = torDir; } @@ -64,39 +61,56 @@ public class TorNetworkNode extends NetworkNode { @Override public void start(@Nullable SetupListener setupListener) { + Log.traceCall(); if (setupListener != null) addSetupListener(setupListener); - createExecutor(); + createExecutorService(); // Create the tor node (takes about 6 sec.) createTorNode(torDir, torNode -> { - TorNetworkNode.this.torNode = torNode; + Log.traceCall("torNode created"); + TorNetworkNode.this.torNetworkNode = torNode; - setupListeners.stream().forEach(e -> UserThread.execute(() -> e.onTorNodeReady())); + setupListeners.stream().forEach(e -> e.onTorNodeReady()); // Create Hidden Service (takes about 40 sec.) - createHiddenService(torNode, Utils.findFreeSystemPort(), port, hiddenServiceDescriptor -> { - TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor; + createHiddenService(torNode, + Utils.findFreeSystemPort(), + servicePort, + hiddenServiceDescriptor -> { + Log.traceCall("hiddenService created"); + TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor; - startServer(hiddenServiceDescriptor.getServerSocket()); - UserThread.runAfter(() -> setupListeners.stream().forEach(e -> e.onHiddenServicePublished()), - 500, TimeUnit.MILLISECONDS); - }); + startServer(hiddenServiceDescriptor.getServerSocket()); + setupListeners.stream().forEach(e -> e.onHiddenServicePublished()); + /* UserThread.runAfter(() -> setupListeners.stream().forEach(e -> e.onHiddenServicePublished()), + 500, TimeUnit.MILLISECONDS);*/ + }); }); } @Override @Nullable public Address getAddress() { + Log.traceCall(); if (hiddenServiceDescriptor != null) return new Address(hiddenServiceDescriptor.getFullAddress()); else return null; } + @Override + protected Socket createSocket(Address peerAddress) throws IOException { + Log.traceCall(); + checkArgument(peerAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address"); + + return torNetworkNode.connectToHiddenService(peerAddress.hostName, peerAddress.port); + } + + //TODO simplify public void shutDown(Runnable shutDownCompleteHandler) { - log.info("Shutdown TorNetworkNode"); + Log.traceCall(); this.shutDownCompleteHandler = shutDownCompleteHandler; shutDownTimeoutTimer = UserThread.runAfter(() -> { @@ -105,41 +119,41 @@ public class TorNetworkNode extends NetworkNode { }, SHUT_DOWN_TIMEOUT, TimeUnit.MILLISECONDS); if (executorService != null) { - executorService.submit(() -> super.shutDown(() -> { + executorService.submit(() -> { + Utilities.setThreadName("TorNetworkNodeShutDownSuperClass"); + UserThread.execute(() -> { + // We want to stay in UserThread + super.shutDown(() -> { networkNodeShutDownDoneComplete = true; if (torShutDownComplete) shutDownExecutorService(); - } - )); + }); + }); + }); } else { log.error("executorService must not be null at shutDown"); } - ListenableFuture future2 = executorService.submit(() -> { + executorService.submit(() -> { + Utilities.setThreadName("NetworkNode:torNodeShutdown"); try { long ts = System.currentTimeMillis(); log.info("Shutdown torNode"); - if (torNode != null) - torNode.shutdown(); + // Might take a bit so we use a thread + if (torNetworkNode != null) + torNetworkNode.shutdown(); log.info("Shutdown torNode done after " + (System.currentTimeMillis() - ts) + " ms."); + UserThread.execute(() -> { + torShutDownComplete = true; + if (networkNodeShutDownDoneComplete) + shutDownExecutorService(); + }); } catch (Throwable e) { - e.printStackTrace(); - log.error("Shutdown torNode failed with exception: " + e.getMessage()); - shutDownExecutorService(); - } - }); - Futures.addCallback(future2, new FutureCallback() { - @Override - public void onSuccess(Object o) { - torShutDownComplete = true; - if (networkNodeShutDownDoneComplete) + UserThread.execute(() -> { + e.printStackTrace(); + log.error("Shutdown torNode failed with exception: " + e.getMessage()); + // We want to switch to UserThread shutDownExecutorService(); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - throwable.printStackTrace(); - log.error("Shutdown torNode failed with exception: " + throwable.getMessage()); - shutDownExecutorService(); + }); } }); } @@ -149,26 +163,27 @@ public class TorNetworkNode extends NetworkNode { /////////////////////////////////////////////////////////////////////////////////////////// private void shutDownExecutorService() { + Log.traceCall(); shutDownTimeoutTimer.cancel(); new Thread(() -> { - Thread.currentThread().setName("NetworkNode:shutDownExecutorService-" + new Random().nextInt(1000)); + Utilities.setThreadName("NetworkNode:shutDownExecutorService"); try { long ts = System.currentTimeMillis(); - log.info("Shutdown executorService"); + log.debug("Shutdown executorService"); MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS); - log.info("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms."); - + log.debug("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms."); log.info("Shutdown completed"); - UserThread.execute(() -> shutDownCompleteHandler.run()); + shutDownCompleteHandler.run(); } catch (Throwable t) { t.printStackTrace(); log.error("Shutdown executorService failed with exception: " + t.getMessage()); - UserThread.execute(() -> shutDownCompleteHandler.run()); + shutDownCompleteHandler.run(); } }).start(); } private void restartTor() { + Log.traceCall(); restartCounter++; if (restartCounter <= MAX_RESTART_ATTEMPTS) { shutDown(() -> UserThread.runAfter(() -> { @@ -189,8 +204,9 @@ public class TorNetworkNode extends NetworkNode { /////////////////////////////////////////////////////////////////////////////////////////// private void createTorNode(final File torDir, final Consumer resultHandler) { + Log.traceCall(); ListenableFuture> future = executorService.submit(() -> { - Thread.currentThread().setName("NetworkNode:CreateTorNode-" + new Random().nextInt(1000)); + Utilities.setThreadName("TorNetworkNode:CreateTorNode"); try { long ts = System.currentTimeMillis(); if (torDir.mkdirs()) @@ -198,34 +214,41 @@ public class TorNetworkNode extends NetworkNode { log.info("TorDir = " + torDir.getAbsolutePath()); log.trace("Create TorNode"); - TorNode torNode1 = new TorNode( - torDir) { - }; + TorNode torNode = + new TorNode(torDir) { + }; log.info("\n\n############################################################\n" + "TorNode created:" + "\nTook " + (System.currentTimeMillis() - ts) + " ms" + "\n############################################################\n"); - return torNode1; + return torNode; } catch (Throwable t) { throw t; } }); Futures.addCallback(future, new FutureCallback>() { public void onSuccess(TorNode torNode) { - resultHandler.accept(torNode); + Log.traceCall(); + UserThread.execute(() -> { + resultHandler.accept(torNode); + }); } public void onFailure(@NotNull Throwable throwable) { - log.error("TorNode creation failed with exception: " + throwable.getMessage()); - restartTor(); + Log.traceCall(); + UserThread.execute(() -> { + log.error("TorNode creation failed with exception: " + throwable.getMessage()); + restartTor(); + }); } }); } private void createHiddenService(TorNode torNode, int localPort, int servicePort, Consumer resultHandler) { + Log.traceCall(); ListenableFuture future = executorService.submit(() -> { - Thread.currentThread().setName("NetworkNode:CreateHiddenService-" + new Random().nextInt(1000)); + Utilities.setThreadName("TorNetworkNode:CreateHiddenService"); try { long ts = System.currentTimeMillis(); log.debug("Create hidden service"); @@ -243,23 +266,17 @@ public class TorNetworkNode extends NetworkNode { }); Futures.addCallback(future, new FutureCallback() { public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) { - resultHandler.accept(hiddenServiceDescriptor); + UserThread.execute(() -> { + resultHandler.accept(hiddenServiceDescriptor); + }); } public void onFailure(@NotNull Throwable throwable) { - log.error("Hidden service creation failed"); - restartTor(); + UserThread.execute(() -> { + log.error("Hidden service creation failed"); + restartTor(); + }); } }); } - - - @Override - protected Socket getSocket(Address peerAddress) throws IOException { - checkArgument(peerAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address"); - - return torNode.connectToHiddenService(peerAddress.hostName, peerAddress.port); - } - - } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java index 0ee5f60a10..b5192d773f 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java @@ -4,7 +4,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; -import io.bitsquare.common.UserThread; +import io.bitsquare.app.Log; import io.bitsquare.common.util.Tuple2; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; @@ -17,7 +17,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.TimeUnit; // authentication example: @@ -43,6 +42,7 @@ public class AuthenticationHandshake { private MessageListener messageListener; public AuthenticationHandshake(NetworkNode networkNode, PeerGroup peerGroup, Address myAddress) { + Log.traceCall(); this.networkNode = networkNode; this.peerGroup = peerGroup; this.myAddress = myAddress; @@ -51,21 +51,25 @@ public class AuthenticationHandshake { } private void onFault(@NotNull Throwable throwable) { + Log.traceCall(); cleanup(); - UserThread.execute(() -> resultFuture.setException(throwable)); + resultFuture.setException(throwable); } private void onSuccess(Connection connection) { + Log.traceCall(); cleanup(); - UserThread.execute(() -> resultFuture.set(connection)); + resultFuture.set(connection); } private void cleanup() { + Log.traceCall(); stopped = true; networkNode.removeMessageListener(messageListener); } public SettableFuture requestAuthenticationToPeer(Address peerAddress) { + Log.traceCall(); // Requesting peer resultFuture = SettableFuture.create(); startAuthTs = System.currentTimeMillis(); @@ -88,6 +92,7 @@ public class AuthenticationHandshake { } public SettableFuture requestAuthentication(Set
remainingAddresses, Address peerAddress) { + Log.traceCall(); // Requesting peer resultFuture = SettableFuture.create(); startAuthTs = System.currentTimeMillis(); @@ -113,6 +118,7 @@ public class AuthenticationHandshake { } public SettableFuture processAuthenticationRequest(AuthenticationRequest authenticationRequest, Connection connection) { + Log.traceCall(); // Responding peer resultFuture = SettableFuture.create(); startAuthTs = System.currentTimeMillis(); @@ -121,7 +127,31 @@ public class AuthenticationHandshake { log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + myAddress); log.info("We shut down inbound connection from peer {} to establish a new " + "connection with his reported address.", peerAddress); - connection.shutDown(() -> UserThread.runAfter(() -> { + + //TODO check if causes problems without delay + connection.shutDown(() -> { + Log.traceCall(); + if (!stopped) { + // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to + // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) + log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress); + + SettableFuture future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce())); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("onSuccess sending ChallengeMessage"); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.warn("onFailure sending ChallengeMessage."); + onFault(throwable); + } + }); + } + }); + /* connection.shutDown(() -> UserThread.runAfter(() -> { Log.traceCall(); if (!stopped) { // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to // inconsistent state (removal of connection from NetworkNode.authenticatedConnections) @@ -130,12 +160,12 @@ public class AuthenticationHandshake { SettableFuture future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce())); Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(Connection connection) { + public void onSuccess(Connection connection) { Log.traceCall(); log.trace("onSuccess sending ChallengeMessage"); } @Override - public void onFailure(@NotNull Throwable throwable) { + public void onFailure(@NotNull Throwable throwable) { Log.traceCall(); log.warn("onFailure sending ChallengeMessage."); onFault(throwable); } @@ -143,13 +173,15 @@ public class AuthenticationHandshake { } }, 100 + PeerGroup.simulateAuthTorNode, - TimeUnit.MILLISECONDS)); + TimeUnit.MILLISECONDS));*/ return resultFuture; } private void setupMessageListener() { + Log.traceCall(); messageListener = (message, connection) -> { + Log.traceCall(); if (message instanceof AuthenticationMessage) { if (message instanceof AuthenticationResponse) { // Requesting peer @@ -207,7 +239,7 @@ public class AuthenticationHandshake { }); log.info("\n\nAuthenticationComplete: Peer with address " + peerAddress - + " authenticated (" + connection.getObjectId() + "). Took " + + " authenticated (" + connection.objectId + "). Took " + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); onSuccess(connection); @@ -227,7 +259,7 @@ public class AuthenticationHandshake { // we wait until the handshake is completed before setting the authenticate flag // authentication at both sides of the connection log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress - + " authenticated (" + connection.getObjectId() + "). Took " + + " authenticated (" + connection.objectId + "). Took " + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); onSuccess(connection); @@ -239,6 +271,7 @@ public class AuthenticationHandshake { } private void authenticateToNextRandomPeer(Set
remainingAddresses) { + Log.traceCall(); Optional>> tupleOptional = getRandomAddressAndRemainingSet(remainingAddresses); if (tupleOptional.isPresent()) { Tuple2> tuple = tupleOptional.get(); @@ -250,6 +283,7 @@ public class AuthenticationHandshake { } private Optional>> getRandomAddressAndRemainingSet(Set
addresses) { + Log.traceCall(); if (!addresses.isEmpty()) { List
list = new ArrayList<>(addresses); Collections.shuffle(list); @@ -261,6 +295,7 @@ public class AuthenticationHandshake { } private long getAndSetNonce() { + Log.traceCall(); nonce = new Random().nextLong(); while (nonce == 0) nonce = getAndSetNonce(); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java index 5dce12d4b0..7f2a255d67 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java @@ -3,8 +3,10 @@ package io.bitsquare.p2p.peers; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; import io.bitsquare.common.util.Tuple2; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.ConnectionListener; @@ -19,14 +21,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; - +// Run in UserThread public class PeerGroup { private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); @@ -42,21 +42,21 @@ public class PeerGroup { MAX_CONNECTIONS = maxConnections; } - private static final int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min. - private static final int GET_PEERS_INTERVAL = new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min. + private static final int SEND_PING_INTERVAL = 100000000;// new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min. + private static final int GET_PEERS_INTERVAL = 100000000;//new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min. private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000; - + private static final int MAX_REPORTED_PEERS = 1000; private final NetworkNode networkNode; private final Set
seedNodeAddresses; - private final CopyOnWriteArraySet peerListeners = new CopyOnWriteArraySet<>(); - private final ConcurrentHashMap authenticatedPeers = new ConcurrentHashMap<>(); - private final CopyOnWriteArraySet
reportedPeerAddresses = new CopyOnWriteArraySet<>(); - private final Timer maintenanceTimer = new Timer(); + private final Set peerListeners = new HashSet<>(); + private final Map authenticatedPeers = new HashMap<>(); + private final Set
reportedPeerAddresses = new HashSet<>(); + private final Timer sendPingTimer = new Timer(); private final Timer getPeersTimer = new Timer(); - private volatile boolean shutDownInProgress; + private boolean shutDownInProgress; private boolean firstPeerAdded = false; @@ -65,6 +65,8 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// public PeerGroup(NetworkNode networkNode, Set
seeds) { + Log.traceCall(); + this.networkNode = networkNode; this.seedNodeAddresses = seeds; @@ -72,6 +74,7 @@ public class PeerGroup { } private void init(NetworkNode networkNode) { + Log.traceCall(); networkNode.addMessageListener((message, connection) -> { if (message instanceof MaintenanceMessage) processMaintenanceMessage((MaintenanceMessage) message, connection); @@ -83,16 +86,18 @@ public class PeerGroup { networkNode.addConnectionListener(new ConnectionListener() { @Override public void onConnection(Connection connection) { + Log.traceCall(); } @Override public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + Log.traceCall(); } @Override public void onDisconnect(Reason reason, Connection connection) { + Log.traceCall(); log.debug("onDisconnect connection=" + connection + " / reason=" + reason); - log.debug("##### onDisconnect connection.isAuthenticated()=" + connection.isAuthenticated()); // only removes authenticated nodes if (connection.isAuthenticated()) removePeer(connection.getPeerAddress()); @@ -100,6 +105,7 @@ public class PeerGroup { @Override public void onError(Throwable throwable) { + Log.traceCall(); } }); @@ -112,21 +118,23 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) { + Log.traceCall(); seedNodeAddresses.remove(mySeedNodeAddress); } public void shutDown() { + Log.traceCall(); if (!shutDownInProgress) { shutDownInProgress = true; - if (maintenanceTimer != null) - maintenanceTimer.cancel(); + if (sendPingTimer != null) + sendPingTimer.cancel(); } } public void broadcast(BroadcastMessage message, @Nullable Address sender) { + Log.traceCall(); log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers."); log.trace("message = " + message); - printAuthenticatedPeers(); // TODO add randomized timing? authenticatedPeers.values().stream() @@ -155,16 +163,15 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// private void processAuthenticationRequest(NetworkNode networkNode, AuthenticationRequest message, final Connection connection) { + Log.traceCall(); AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress()); SettableFuture future = authenticationHandshake.processAuthenticationRequest(message, connection); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable Connection connection) { if (connection != null) { - UserThread.execute(() -> { - setAuthenticated(connection, connection.getPeerAddress()); - purgeReportedPeers(); - }); + setAuthenticated(connection, connection.getPeerAddress()); + purgeReportedPeersIfExceeds(); } } @@ -172,12 +179,13 @@ public class PeerGroup { public void onFailure(@NotNull Throwable throwable) { throwable.printStackTrace(); log.error("AuthenticationHandshake failed. " + throwable.getMessage()); - UserThread.execute(() -> removePeer(connection.getPeerAddress())); + removePeer(connection.getPeerAddress()); } }); } public void authenticateSeedNode(Address peerAddress) { + Log.traceCall(); authenticateToSeedNode(new HashSet<>(seedNodeAddresses), peerAddress, true); } @@ -185,6 +193,7 @@ public class PeerGroup { // After connection is authenticated, we try to connect to any reported peer as long we have not // reached our max connection size. private void authenticateToSeedNode(Set
remainingAddresses, Address peerAddress, boolean continueOnSuccess) { + Log.traceCall(); checkArgument(!authenticatedPeers.containsKey(peerAddress), "We have that peer already authenticated. That must never happen."); @@ -232,6 +241,7 @@ public class PeerGroup { } private void authenticateToRemainingReportedPeers() { + Log.traceCall(); Optional>> tupleOptional = getRandomItemAndRemainingSet(reportedPeerAddresses); if (tupleOptional.isPresent()) { log.info("We try to authenticate to a random peer. " + tupleOptional.get().first); @@ -245,6 +255,7 @@ public class PeerGroup { // We try to connect to a reported peer. If we fail we repeat after the failed peer has been removed. // If we succeed we repeat until we are ut of addresses. private void authenticateToReportedPeer(Set
remainingAddresses, Address peerAddress) { + Log.traceCall(); checkArgument(!authenticatedPeers.containsKey(peerAddress), "We have that peer already authenticated. That must never happen."); @@ -283,6 +294,7 @@ public class PeerGroup { } private void authenticateToRemainingSeedNodes() { + Log.traceCall(); Optional>> tupleOptional = getRandomItemAndRemainingSet(seedNodeAddresses); if (tupleOptional.isPresent()) { log.info("We try to authenticate to a random seed node. " + tupleOptional.get().first); @@ -300,6 +312,7 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// public void authenticateToPeer(Address peerAddress, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) { + Log.traceCall(); checkArgument(!authenticatedPeers.containsKey(peerAddress), "We have that seed node already authenticated. That must never happen."); @@ -327,9 +340,10 @@ public class PeerGroup { } private void setAuthenticated(Connection connection, Address peerAddress) { + Log.traceCall(); log.info("\n\n############################################################\n" + "We are authenticated to:" + - "\nconnection=" + connection + "\nconnection=" + connection.getUid() + "\nmyAddress=" + getMyAddress() + "\npeerAddress= " + peerAddress + "\n############################################################\n"); @@ -342,19 +356,18 @@ public class PeerGroup { } private void addAuthenticatedPeer(Peer peer) { + Log.traceCall(); authenticatedPeers.put(peer.address, peer); reportedPeerAddresses.remove(peer.address); firstPeerAdded = !firstPeerAdded && authenticatedPeers.size() == 1; - UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer))); + peerListeners.stream().forEach(e -> e.onPeerAdded(peer)); if (firstPeerAdded) - UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onFirstAuthenticatePeer(peer))); + peerListeners.stream().forEach(e -> e.onFirstAuthenticatePeer(peer)); - if (authenticatedPeers.size() > MAX_CONNECTIONS) - disconnectOldConnections(); - - printAuthenticatedPeers(); + if (!checkIfConnectedPeersExceeds()) + printAuthenticatedPeers(); } /////////////////////////////////////////////////////////////////////////////////////////// @@ -362,13 +375,14 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// private void setupMaintenanceTimer() { - maintenanceTimer.scheduleAtFixedRate(new TimerTask() { + Log.traceCall(); + sendPingTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { - Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000)); + Utilities.setThreadName("MaintenanceTimer"); try { UserThread.execute(() -> { - disconnectOldConnections(); + checkIfConnectedPeersExceeds(); pingPeers(); }); } catch (Throwable t) { @@ -376,14 +390,14 @@ public class PeerGroup { log.error("Executing task failed. " + t.getMessage()); } } - }, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL); + }, SEND_PING_INTERVAL, SEND_PING_INTERVAL); getPeersTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { - Thread.currentThread().setName("GetPeersTimer-" + new Random().nextInt(1000)); + Utilities.setThreadName("GetPeersTimer"); try { - UserThread.execute(() -> sendGetPeersRequest()); + UserThread.execute(() -> trySendGetPeersRequest()); } catch (Throwable t) { t.printStackTrace(); log.error("Executing task failed. " + t.getMessage()); @@ -393,21 +407,28 @@ public class PeerGroup { } - private void disconnectOldConnections() { - List authenticatedConnections = networkNode.getAllConnections().stream() - .filter(e -> e.isAuthenticated()) - .collect(Collectors.toList()); - if (authenticatedConnections.size() > MAX_CONNECTIONS) { + private boolean checkIfConnectedPeersExceeds() { + Log.traceCall(); + if (authenticatedPeers.size() > MAX_CONNECTIONS) { + log.trace("We have too many connections open. Lets remove the one which was not active recently."); + List authenticatedConnections = networkNode.getAllConnections().stream() + .filter(e -> e.isAuthenticated()) + .collect(Collectors.toList()); authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate())); log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size()); Connection connection = authenticatedConnections.remove(0); - log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection); - connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> disconnectOldConnections(), 100, 500, TimeUnit.MILLISECONDS)); + log.info("We had shut down the oldest connection with last activity date=" + + connection.getLastActivityDate() + " / connection=" + connection); + connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 100, 500, TimeUnit.MILLISECONDS)); + return true; + } else { + log.trace("We don't have too many connections open."); + return false; } } private void pingPeers() { - log.trace("pingPeers"); + Log.traceCall(); Set connectedPeersList = new HashSet<>(authenticatedPeers.values()); connectedPeersList.stream() .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) @@ -428,29 +449,35 @@ public class PeerGroup { }, 5, 10)); } - private void sendGetPeersRequest() { - log.trace("sendGetPeersRequest"); - Set connectedPeersList = new HashSet<>(authenticatedPeers.values()); - connectedPeersList.stream() - .forEach(e -> UserThread.runAfterRandomDelay(() -> { - SettableFuture future = networkNode.sendMessage(e.connection, - new GetPeersRequest(getMyAddress(), new HashSet<>(getAllPeerAddresses()))); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("sendGetPeersRequest sent successfully"); - } + private void trySendGetPeersRequest() { + Log.traceCall(); + Collection peers = authenticatedPeers.values(); + if (!peers.isEmpty()) { + Set connectedPeersList = new HashSet<>(peers); + connectedPeersList.stream() + .forEach(e -> UserThread.runAfterRandomDelay(() -> { + SettableFuture future = networkNode.sendMessage(e.connection, + new GetPeersRequest(getMyAddress(), new HashSet<>(getAllPeerAddresses()))); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("sendGetPeersRequest sent successfully"); + } - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("sendGetPeersRequest sending failed " + throwable.getMessage()); - removePeer(e.address); - } - }); - }, 5, 10)); + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("sendGetPeersRequest sending failed " + throwable.getMessage()); + removePeer(e.address); + } + }); + }, 5, 10)); + } else { + log.debug("No peers available for requesting."); + } } private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) { + Log.traceCall(); log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress()); if (message instanceof PingMessage) { SettableFuture future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce)); @@ -471,8 +498,8 @@ public class PeerGroup { Peer peer = authenticatedPeers.get(connection.getPeerAddress()); if (peer != null) { if (((PongMessage) message).nonce != peer.getPingNonce()) { - removePeer(peer.address); log.warn("PongMessage invalid: self/peer " + getMyAddress() + "/" + connection.getPeerAddress()); + removePeer(peer.address); } } } @@ -510,18 +537,22 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// public void addMessageListener(MessageListener messageListener) { + Log.traceCall(); networkNode.addMessageListener(messageListener); } public void removeMessageListener(MessageListener messageListener) { + Log.traceCall(); networkNode.removeMessageListener(messageListener); } public void addPeerListener(PeerListener peerListener) { + Log.traceCall(); peerListeners.add(peerListener); } public void removePeerListener(PeerListener peerListener) { + Log.traceCall(); peerListeners.remove(peerListener); } @@ -531,17 +562,20 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// private Map getAuthenticatedPeers() { + Log.traceCall(); return authenticatedPeers; } public Set
getAllPeerAddresses() { - CopyOnWriteArraySet
allPeerAddresses = new CopyOnWriteArraySet<>(reportedPeerAddresses); + Log.traceCall(); + Set
allPeerAddresses = new HashSet<>(reportedPeerAddresses); allPeerAddresses.addAll(authenticatedPeers.values().stream() - .map(e -> e.address).collect(Collectors.toList())); + .map(e -> e.address).collect(Collectors.toSet())); return allPeerAddresses; } public Set
getSeedNodeAddresses() { + Log.traceCall(); return seedNodeAddresses; } @@ -551,7 +585,7 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// void addToReportedPeers(HashSet
peerAddresses, Connection connection) { - log.trace("addToReportedPeers"); + Log.traceCall(); // we disconnect misbehaving nodes trying to send too many peers // reported peers include the peers connected peers which is normally max. 8 but we give some headroom // for safety @@ -560,24 +594,29 @@ public class PeerGroup { } else { peerAddresses.remove(getMyAddress()); reportedPeerAddresses.addAll(peerAddresses); - purgeReportedPeers(); + purgeReportedPeersIfExceeds(); } } - private void purgeReportedPeers() { - log.trace("purgeReportedPeers"); - int all = getAllPeerAddresses().size(); - if (all > 1000) { - int diff = all - 100; + private void purgeReportedPeersIfExceeds() { + Log.traceCall(); + int size = reportedPeerAddresses.size(); + if (size > MAX_REPORTED_PEERS) { + log.trace("We have more then {} reported peers. size={}. " + + "We remove random peers from the reported peers list.", MAX_REPORTED_PEERS, size); + int diff = size - MAX_REPORTED_PEERS; List
list = new LinkedList<>(getReportedNotConnectedPeerAddresses()); for (int i = 0; i < diff; i++) { Address toRemove = getAndRemoveRandomItem(list); reportedPeerAddresses.remove(toRemove); } + } else { + log.trace("We don't have more then {} reported peers yet.", MAX_REPORTED_PEERS); } } private Set
getReportedNotConnectedPeerAddresses() { + Log.traceCall(); Set
set = new HashSet<>(reportedPeerAddresses); authenticatedPeers.values().stream().forEach(e -> set.remove(e.address)); return set; @@ -589,18 +628,20 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// private void removePeer(@Nullable Address peerAddress) { - reportedPeerAddresses.remove(peerAddress); - + Log.traceCall("peerAddress=" + peerAddress); if (peerAddress != null) { + boolean contained = reportedPeerAddresses.remove(peerAddress); Peer disconnectedPeer = authenticatedPeers.remove(peerAddress); if (disconnectedPeer != null) - UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress))); + peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)); + + if (contained || disconnectedPeer != null) + printAllPeers(); } - printAuthenticatedPeers(); - printReportedPeers(); } private Address getMyAddress() { + Log.traceCall(); return networkNode.getAddress(); } @@ -610,10 +651,12 @@ public class PeerGroup { /////////////////////////////////////////////////////////////////////////////////////////// private Address getAndRemoveRandomItem(List
list) { + Log.traceCall(); return list.remove(new Random().nextInt(list.size())); } private Optional>> getRandomItemAndRemainingSet(Set
remainingAddresses) { + Log.traceCall(); List
list = new ArrayList<>(remainingAddresses); authenticatedPeers.values().stream().forEach(e -> list.remove(e.address)); if (!list.isEmpty()) { @@ -625,11 +668,13 @@ public class PeerGroup { } public void printAllPeers() { + Log.traceCall(); printAuthenticatedPeers(); printReportedPeers(); } public void printAuthenticatedPeers() { + Log.traceCall(); StringBuilder result = new StringBuilder("\n\n############################################################\n" + "Authenticated peers for node " + getMyAddress() + ":"); authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address)); @@ -638,6 +683,7 @@ public class PeerGroup { } public void printReportedPeers() { + Log.traceCall(); StringBuilder result = new StringBuilder("\n\n############################################################\n" + "Reported peers for node " + getMyAddress() + ":"); reportedPeerAddresses.stream().forEach(e -> result.append("\n").append(e)); diff --git a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java index e0b37fc407..7a51d1611e 100644 --- a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java +++ b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java @@ -1,5 +1,6 @@ package io.bitsquare.p2p.seed; +import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.crypto.EncryptionService; @@ -29,6 +30,7 @@ public class SeedNode { private boolean stopped; public SeedNode() { + Log.traceCall(); } @@ -41,6 +43,7 @@ public class SeedNode { // eg. lmvdenjkyvx2ovga.onion:8001 20 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003 // or when using localhost: localhost:8001 20 true localhost:8002|localhost:8003 public void processArgs(String[] args) { + Log.traceCall(); if (args.length > 0) { String arg0 = args[0]; checkArgument(arg0.contains(":") && arg0.split(":").length == 2 && arg0.split(":")[1].length() == 4, "Wrong program argument"); @@ -78,6 +81,7 @@ public class SeedNode { } public void createAndStartP2PService(EncryptionService encryptionService, KeyRing keyRing, Address mySeedNodeAddress, boolean useLocalhost, @Nullable Set
seedNodes, @Nullable P2PServiceListener listener) { + Log.traceCall(); SeedNodesRepository seedNodesRepository = new SeedNodesRepository(); if (seedNodes != null && !seedNodes.isEmpty()) { if (useLocalhost) @@ -92,14 +96,17 @@ public class SeedNode { } public P2PService getP2PService() { + Log.traceCall(); return p2PService; } public void shutDown() { + Log.traceCall(); shutDown(null); } public void shutDown(@Nullable Runnable shutDownCompleteHandler) { + Log.traceCall(); log.debug("Request shutdown seed node"); if (!stopped) { stopped = true; diff --git a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java index ba51f95d64..cba08cb37d 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java @@ -1,10 +1,12 @@ package io.bitsquare.p2p.storage; import com.google.common.annotations.VisibleForTesting; +import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.CryptoException; import io.bitsquare.common.crypto.Hash; import io.bitsquare.common.crypto.Sig; +import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.IllegalRequest; import io.bitsquare.p2p.network.MessageListener; @@ -21,12 +23,12 @@ import java.math.BigInteger; import java.security.KeyPair; import java.security.PublicKey; import java.util.Map; -import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +// Run in UserThread public class ProtectedExpirableDataStorage { private static final Logger log = LoggerFactory.getLogger(ProtectedExpirableDataStorage.class); @@ -48,6 +50,7 @@ public class ProtectedExpirableDataStorage { /////////////////////////////////////////////////////////////////////////////////////////// public ProtectedExpirableDataStorage(PeerGroup peerGroup, File storageDir) { + Log.traceCall(); this.peerGroup = peerGroup; storage = new Storage<>(storageDir); @@ -56,12 +59,14 @@ public class ProtectedExpirableDataStorage { } private void init() { + Log.traceCall(); ConcurrentHashMap persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap"); if (persisted != null) { sequenceNumberMap = persisted; } addMessageListener((message, connection) -> { + Log.traceCall("onMessage: Message=" + message); if (message instanceof DataMessage) { if (connection.isAuthenticated()) { log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection); @@ -80,21 +85,26 @@ public class ProtectedExpirableDataStorage { } }); - TimerTask task = new TimerTask() { - @Override - public void run() { - Thread.currentThread().setName("RemoveExpiredEntriesTimer-" + new Random().nextInt(1000)); - try { - log.info("removeExpiredEntries called "); - map.entrySet().stream().filter(entry -> entry.getValue().isExpired()) - .forEach(entry -> map.remove(entry.getKey())); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } - } - }; - timer.scheduleAtFixedRate(task, CHECK_TTL_INTERVAL, CHECK_TTL_INTERVAL); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + Utilities.setThreadName("RemoveExpiredEntriesTimer"); + UserThread.execute(() -> removeExpiredEntries()); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + } + }, + CHECK_TTL_INTERVAL, CHECK_TTL_INTERVAL); + } + + private void removeExpiredEntries() { + Log.traceCall(); + map.entrySet().stream() + .filter(entry -> entry.getValue().isExpired()) + .forEach(entry -> map.remove(entry.getKey())); } @@ -103,6 +113,7 @@ public class ProtectedExpirableDataStorage { /////////////////////////////////////////////////////////////////////////////////////////// public void shutDown() { + Log.traceCall(); if (!shutDownInProgress) { shutDownInProgress = true; timer.cancel(); @@ -111,10 +122,12 @@ public class ProtectedExpirableDataStorage { } public void setAuthenticated() { + Log.traceCall(); this.authenticated = true; } public boolean add(ProtectedData protectedData, @Nullable Address sender) { + Log.traceCall(); BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload); boolean containsKey = map.containsKey(hashOfPayload); boolean result = checkPublicKeys(protectedData, true) @@ -128,13 +141,13 @@ public class ProtectedExpirableDataStorage { if (result) { map.put(hashOfPayload, protectedData); log.trace("Data added to our map and it will be broadcasted to our peers."); - UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData))); + hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData)); - StringBuilder sb = new StringBuilder("\n\n############################################################\n" + - "Data set after addProtectedExpirableData:"); - map.values().stream().forEach(e -> sb.append("\n").append(e.toString())); + StringBuilder sb = new StringBuilder("\n\n############################################################\n"); + sb.append("Data set after addProtectedExpirableData:"); + map.values().stream().forEach(e -> sb.append("\n").append(e.toString()).append("\n")); sb.append("\n############################################################\n"); - log.trace(sb.toString()); + log.info(sb.toString()); if (!containsKey) broadcast(new AddDataMessage(protectedData), sender); @@ -148,6 +161,7 @@ public class ProtectedExpirableDataStorage { } public boolean remove(ProtectedData protectedData, @Nullable Address sender) { + Log.traceCall(); BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload); boolean containsKey = map.containsKey(hashOfPayload); if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data."); @@ -172,6 +186,7 @@ public class ProtectedExpirableDataStorage { } public boolean removeMailboxData(ProtectedMailboxData protectedMailboxData, @Nullable Address sender) { + Log.traceCall(); BigInteger hashOfData = getHashAsBigInteger(protectedMailboxData.expirablePayload); boolean containsKey = map.containsKey(hashOfData); if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data."); @@ -196,11 +211,13 @@ public class ProtectedExpirableDataStorage { } public Map getMap() { + Log.traceCall(); return map; } public ProtectedData getDataWithSignedSeqNr(ExpirablePayload payload, KeyPair ownerStoragePubKey) throws CryptoException { + Log.traceCall(); BigInteger hashOfData = getHashAsBigInteger(payload); int sequenceNumber; if (sequenceNumberMap.containsKey(hashOfData)) @@ -216,6 +233,7 @@ public class ProtectedExpirableDataStorage { public ProtectedMailboxData getMailboxDataWithSignedSeqNr(ExpirableMailboxPayload expirableMailboxPayload, KeyPair storageSignaturePubKey, PublicKey receiversPublicKey) throws CryptoException { + Log.traceCall(); BigInteger hashOfData = getHashAsBigInteger(expirableMailboxPayload); int sequenceNumber; if (sequenceNumberMap.containsKey(hashOfData)) @@ -230,10 +248,12 @@ public class ProtectedExpirableDataStorage { } public void addHashMapChangedListener(HashMapChangedListener hashMapChangedListener) { + Log.traceCall(); hashMapChangedListeners.add(hashMapChangedListener); } private void addMessageListener(MessageListener messageListener) { + Log.traceCall(); peerGroup.addMessageListener(messageListener); } @@ -243,9 +263,10 @@ public class ProtectedExpirableDataStorage { /////////////////////////////////////////////////////////////////////////////////////////// private void doRemoveProtectedExpirableData(ProtectedData protectedData, BigInteger hashOfPayload) { + Log.traceCall(); map.remove(hashOfPayload); log.trace("Data removed from our map. We broadcast the message to our peers."); - UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData))); + hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData)); StringBuilder sb = new StringBuilder("\n\n############################################################\n" + "Data set after removeProtectedExpirableData:"); @@ -255,6 +276,7 @@ public class ProtectedExpirableDataStorage { } private boolean isSequenceNrValid(ProtectedData data, BigInteger hashOfData) { + Log.traceCall(); int newSequenceNumber = data.sequenceNumber; Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData); if (sequenceNumberMap.containsKey(hashOfData) && newSequenceNumber <= storedSequenceNumber) { @@ -267,6 +289,7 @@ public class ProtectedExpirableDataStorage { } private boolean checkSignature(ProtectedData data) { + Log.traceCall(); byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNr(data.expirablePayload, data.sequenceNumber)); try { boolean result = Sig.verify(data.ownerStoragePubKey, hashOfDataAndSeqNr, data.signature); @@ -282,6 +305,7 @@ public class ProtectedExpirableDataStorage { } private boolean checkPublicKeys(ProtectedData data, boolean isAddOperation) { + Log.traceCall(); boolean result = false; if (data.expirablePayload instanceof ExpirableMailboxPayload) { ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) data.expirablePayload; @@ -299,6 +323,7 @@ public class ProtectedExpirableDataStorage { } private boolean checkIfStoredDataMatchesNewData(ProtectedData data, BigInteger hashOfData) { + Log.traceCall(); ProtectedData storedData = map.get(hashOfData); boolean result = getHashAsBigInteger(storedData.expirablePayload).equals(hashOfData) && storedData.ownerStoragePubKey.equals(data.ownerStoragePubKey); @@ -309,6 +334,7 @@ public class ProtectedExpirableDataStorage { } private boolean checkIfStoredMailboxDataMatchesNewMailboxData(ProtectedMailboxData data, BigInteger hashOfData) { + Log.traceCall(); ProtectedData storedData = map.get(hashOfData); if (storedData instanceof ProtectedMailboxData) { ProtectedMailboxData storedMailboxData = (ProtectedMailboxData) storedData; @@ -325,8 +351,8 @@ public class ProtectedExpirableDataStorage { } } - private void broadcast(BroadcastMessage message, @Nullable Address sender) { + Log.traceCall(); if (authenticated) { peerGroup.broadcast(message, sender); log.trace("Broadcast message " + message); @@ -336,6 +362,7 @@ public class ProtectedExpirableDataStorage { } private BigInteger getHashAsBigInteger(ExpirablePayload payload) { + Log.traceCall(); return new BigInteger(Hash.getHash(payload)); } } diff --git a/network/src/test/java/io/bitsquare/p2p/TestUtils.java b/network/src/test/java/io/bitsquare/p2p/TestUtils.java index 25b7adfd70..b32d4ae3f3 100644 --- a/network/src/test/java/io/bitsquare/p2p/TestUtils.java +++ b/network/src/test/java/io/bitsquare/p2p/TestUtils.java @@ -82,7 +82,7 @@ public class TestUtils { CountDownLatch latch = new CountDownLatch(1); seedNode.createAndStartP2PService(encryptionService, keyRing, new Address("localhost", port), useLocalhost, seedNodes, new P2PServiceListener() { @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { } @Override @@ -120,7 +120,7 @@ public class TestUtils { P2PService p2PService = new P2PService(seedNodesRepository, port, new File("seed_node_" + port), useLocalhost, encryptionService, keyRing, new File("dummy")); p2PService.start(new P2PServiceListener() { @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { } @Override diff --git a/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java b/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java index 34e015536a..bc8efbf5ce 100644 --- a/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java +++ b/network/src/test/java/io/bitsquare/p2p/routing/PeerGroupTest.java @@ -85,7 +85,7 @@ public class PeerGroupTest { latch = new CountDownLatch(2); seedNode1.createAndStartP2PService(null, null, address, useLocalhost, seedNodes, new P2PServiceListener() { @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { latch.countDown(); } @@ -129,7 +129,7 @@ public class PeerGroupTest { seedNode1 = new SeedNode(); seedNode1.createAndStartP2PService(null, null, address1, useLocalhost, seedNodes, new P2PServiceListener() { @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { latch.countDown(); } @@ -160,7 +160,7 @@ public class PeerGroupTest { seedNode2 = new SeedNode(); seedNode2.createAndStartP2PService(null, null, address2, useLocalhost, seedNodes, new P2PServiceListener() { @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { latch.countDown(); } @@ -387,7 +387,7 @@ public class PeerGroupTest { latch = new CountDownLatch(1); seedNode.createAndStartP2PService(null, null, new Address("localhost", port), useLocalhost, seedNodes, new P2PServiceListener() { @Override - public void onAllDataReceived() { + public void onRequestingDataCompleted() { latch.countDown(); } diff --git a/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java b/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java index 6a77678617..47ccff456d 100644 --- a/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java +++ b/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java @@ -1,7 +1,7 @@ package io.bitsquare.p2p.seed; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.bitsquare.app.Logging; +import io.bitsquare.app.Log; import io.bitsquare.common.UserThread; import org.bitcoinj.crypto.DRMWorkaround; import org.bouncycastle.jce.provider.BouncyCastleProvider; @@ -28,11 +28,12 @@ public class SeedNodeMain { // To stop enter: q public static void main(String[] args) throws NoSuchAlgorithmException { Path path = Paths.get("seed_node_log"); - Logging.setup(path.toString()); + Log.setup(path.toString()); + Log.PRINT_TRACE_METHOD = true; log.info("Log files under: " + path.toAbsolutePath().toString()); DRMWorkaround.maybeDisableExportControls(); - + new SeedNodeMain(args); } @@ -60,7 +61,7 @@ public class SeedNodeMain { public void listenForExitCommand() { Scanner scan = new Scanner(System.in); String line; - while (!stopped && ((line = scan.nextLine()) != null)) { + while (!stopped && !Thread.currentThread().isInterrupted() && ((line = scan.nextLine()) != null)) { if (line.equals("q")) { if (!stopped) { stopped = true; @@ -70,11 +71,11 @@ public class SeedNodeMain { }, 5); if (seedNode != null) { - seedNode.shutDown(() -> { + UserThread.execute(() -> seedNode.shutDown(() -> { timeout.cancel(); log.debug("Shutdown seed node complete."); System.exit(0); - }); + })); } } } diff --git a/seednode/src/main/resources/logback.xml b/seednode/src/main/resources/logback.xml index 94d5277b8e..2d66ba9abd 100644 --- a/seednode/src/main/resources/logback.xml +++ b/seednode/src/main/resources/logback.xml @@ -19,11 +19,11 @@ - %d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %xEx%n + %highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{15} - %msg %xEx%n) - + +