From f788778f3ca552f66e79bb0f94485dc60d68b292 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Fri, 6 Nov 2015 03:27:45 +0100 Subject: [PATCH] fix disconnect handling --- jtorctl/README | 0 jtorctl/README.md | 1 + jtorproxy/README.md | 3 + .../java/io/bitsquare/p2p/P2PService.java | 2 +- .../io/bitsquare/p2p/network/Connection.java | 78 +++++++++++-------- .../p2p/network/ConnectionListener.java | 1 - .../p2p/network/LocalhostNetworkNode.java | 4 +- .../io/bitsquare/p2p/network/NetworkNode.java | 12 +-- .../bitsquare/p2p/network/TorNetworkNode.java | 1 + .../java/io/bitsquare/p2p/peer/PeerGroup.java | 5 +- .../java/io/bitsquare/p2p/seed/SeedNode.java | 2 +- .../p2p/seed/SeedNodesRepository.java | 4 +- .../io/bitsquare/p2p/seed/SeedNodeMain.java | 12 +-- 13 files changed, 75 insertions(+), 50 deletions(-) delete mode 100644 jtorctl/README create mode 100644 jtorctl/README.md diff --git a/jtorctl/README b/jtorctl/README deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/jtorctl/README.md b/jtorctl/README.md new file mode 100644 index 0000000000..abfcf834f1 --- /dev/null +++ b/jtorctl/README.md @@ -0,0 +1 @@ +Cloned from: https://github.com/JesusMcCloud/jtorctl \ No newline at end of file diff --git a/jtorproxy/README.md b/jtorproxy/README.md index 268c633dfd..08f9dfc66c 100644 --- a/jtorproxy/README.md +++ b/jtorproxy/README.md @@ -1,3 +1,6 @@ +Cloned from: https://github.com/ManfredKarrer/jtorproxy + + JTorProxy ========= JTorProxy aims at providing an easy-to-use API for interfacing with Tor from Java. JTorProxy also supports hidden services through standard Java ServerSockets. diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 66244e28f6..2021d759e0 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -118,7 +118,7 @@ public class P2PService { // peer group peerGroup = new PeerGroup(networkNode, seedNodeAddresses); - if (useLocalhost) PeerGroup.setSimulateAuthTorNode(2 * 1000); + if (useLocalhost) PeerGroup.setSimulateAuthTorNode(1 * 1000); // storage dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir); diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 08ce9f3c3e..ced575ec25 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -55,7 +55,6 @@ public class Connection { private Address peerAddress; private volatile boolean stopped; - private volatile boolean shutDownInProgress; //TODO got java.util.zip.DataFormatException: invalid distance too far back // java.util.zip.DataFormatException: invalid literal/lengths set @@ -136,7 +135,7 @@ public class Connection { sharedSpace.handleConnectionException(e); } } else { - UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(ConnectionListener.Reason.ALREADY_CLOSED, this)); + log.debug("sendMessage after stopped"); } } @@ -188,7 +187,7 @@ public class Connection { } private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) { - if (!shutDownInProgress) { + if (!stopped) { log.info("\n\nShutDown connection:" + "\npeerAddress=" + peerAddress + "\nobjectId=" + getObjectId() @@ -199,35 +198,38 @@ public class Connection { log.debug("ShutDown " + this.getObjectId()); log.debug("ShutDown connection requested. Connection=" + this.toString()); - if (!stopped) { - stopped = true; + stopped = true; + sharedSpace.stop(); + if (inputHandler != null) inputHandler.stop(); - shutDownInProgress = true; - UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(ConnectionListener.Reason.SHUT_DOWN, this)); - - if (sendCloseConnectionMessage) { - new Thread(() -> { - Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.getObjectId()); - try { - sendMessage(new CloseConnectionMessage()); - // give a bit of time for closing gracefully - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - t.printStackTrace(); - log.error(t.getMessage()); - } finally { - UserThread.execute(() -> continueShutDown(shutDownCompleteHandler)); - } - }).start(); - } else { - continueShutDown(shutDownCompleteHandler); - } + if (sendCloseConnectionMessage) { + new Thread(() -> { + Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.getObjectId()); + try { + sendMessage(new CloseConnectionMessage()); + // give a bit of time for closing gracefully + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + t.printStackTrace(); + log.error(t.getMessage()); + } finally { + UserThread.execute(() -> continueShutDown(shutDownCompleteHandler)); + } + }).start(); + } else { + continueShutDown(shutDownCompleteHandler); } } } private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) { + ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason(); + if (shutDownReason == null) + shutDownReason = ConnectionListener.Reason.SHUT_DOWN; + final ConnectionListener.Reason finalShutDownReason = shutDownReason; + UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(finalShutDownReason, this)); + try { sharedSpace.getSocket().close(); } catch (SocketException e) { @@ -272,8 +274,9 @@ public class Connection { ", objectId='" + getObjectId() + '\'' + ", sharedSpace=" + sharedSpace.toString() + ", peerAddress=" + peerAddress + + ", isAuthenticated=" + isAuthenticated + + ", stopped=" + stopped + ", stopped=" + stopped + - ", shutDownInProgress=" + shutDownInProgress + ", useCompression=" + useCompression + '}'; } @@ -306,6 +309,8 @@ public class Connection { // mutable private Date lastActivityDate; + private volatile boolean stopped; + private ConnectionListener.Reason shutDownReason; public SharedSpace(Connection connection, Socket socket, MessageListener messageListener, ConnectionListener connectionListener, boolean useCompression) { @@ -338,20 +343,21 @@ public class Connection { public void handleConnectionException(Exception e) { if (e instanceof SocketException) { if (socket.isClosed()) - UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.SOCKET_CLOSED, connection)); + shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED; else - UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.RESET, connection)); + shutDownReason = ConnectionListener.Reason.RESET; } else if (e instanceof SocketTimeoutException) { - UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.TIMEOUT, connection)); + shutDownReason = ConnectionListener.Reason.TIMEOUT; } else if (e instanceof EOFException) { - UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.PEER_DISCONNECTED, connection)); + shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED; } else { + shutDownReason = ConnectionListener.Reason.UNKNOWN; log.info("Exception at connection with port " + socket.getLocalPort()); e.printStackTrace(); - UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.UNKNOWN, connection)); } - connection.shutDown(false); + if (!stopped) + connection.shutDown(false); } public void onMessage(Message message) { @@ -387,6 +393,14 @@ public class Connection { ", lastActivityDate=" + lastActivityDate + '}'; } + + public void stop() { + this.stopped = stopped; + } + + public ConnectionListener.Reason getShutDownReason() { + return shutDownReason; + } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java b/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java index e0b7504dbe..9a89e6bca3 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java +++ b/network/src/main/java/io/bitsquare/p2p/network/ConnectionListener.java @@ -11,7 +11,6 @@ public interface ConnectionListener { TIMEOUT, SHUT_DOWN, PEER_DISCONNECTED, - ALREADY_CLOSED, UNKNOWN } diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index 536496793d..af36547228 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -25,8 +25,8 @@ import java.util.function.Consumer; public class LocalhostNetworkNode extends NetworkNode { private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class); - private static int simulateTorDelayTorNode = 2 * 100; - private static int simulateTorDelayHiddenService = 2 * 100; + private static int simulateTorDelayTorNode = 1 * 1000; + private static int simulateTorDelayHiddenService = 2 * 1000; private Address address; public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) { diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 998d167755..67dcf8320b 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -99,8 +99,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener newConnection.sendMessage(message); return newConnection; - } catch (Throwable t) { - throw t; + } catch (Throwable throwable) { + if (!(throwable instanceof ConnectException || throwable instanceof IOException)) { + throwable.printStackTrace(); + log.error("Executing task failed. " + throwable.getMessage()); + } + throw throwable; } }); Futures.addCallback(future, new FutureCallback() { @@ -109,10 +113,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener } public void onFailure(@NotNull Throwable throwable) { - if (!(throwable instanceof ConnectException)) { - throwable.printStackTrace(); - log.error("Executing task failed. " + throwable.getMessage()); - } UserThread.execute(() -> resultFuture.setException(throwable)); } }); diff --git a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java index 4b4a15f1f2..d8220555e7 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java @@ -199,6 +199,7 @@ public class TorNetworkNode extends NetworkNode { if (torDir.mkdirs()) log.trace("Created directory for tor"); + log.info("TorDir = " + torDir.getAbsolutePath()); log.trace("Create TorNode"); TorNode torNode1 = new TorNode( torDir) { diff --git a/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java index 64a814715f..231ecd448e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java +++ b/network/src/main/java/io/bitsquare/p2p/peer/PeerGroup.java @@ -46,6 +46,8 @@ public class PeerGroup { private final ConcurrentHashMap authenticatedPeers = new ConcurrentHashMap<>(); private final CopyOnWriteArraySet
reportedPeerAddresses = new CopyOnWriteArraySet<>(); private final ConcurrentHashMap authenticationCompleteHandlers = new ConcurrentHashMap<>(); + ; + private final Timer maintenanceTimer = new Timer(); private volatile boolean shutDownInProgress; @@ -82,6 +84,8 @@ public class PeerGroup { @Override public void onDisconnect(Reason reason, Connection connection) { + log.debug("onDisconnect connection=" + connection + " / reason=" + reason); + log.debug("##### onDisconnect connection.isAuthenticated()=" + connection.isAuthenticated()); // only removes authenticated nodes if (connection.isAuthenticated()) removePeer(connection.getPeerAddress()); @@ -307,7 +311,6 @@ public class PeerGroup { RequestAuthenticationMessage requestAuthenticationMessage = (RequestAuthenticationMessage) message; Address peerAddress = requestAuthenticationMessage.address; log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + getAddress()); - connection.shutDown(() -> Utilities.runTimerTask(() -> { Thread.currentThread().setName("DelaySendChallengeMessageTimer-" + new Random().nextInt(1000)); // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to diff --git a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java index c1b647a1be..7cbbf6a0d3 100644 --- a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java +++ b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java @@ -18,7 +18,7 @@ public class SeedNode { private static final Logger log = LoggerFactory.getLogger(SeedNode.class); private int port = 8001; - private boolean useLocalhost = true; + private boolean useLocalhost = false; private Set
seedNodes; private P2PService p2PService; protected boolean stopped; diff --git a/network/src/main/java/io/bitsquare/p2p/seed/SeedNodesRepository.java b/network/src/main/java/io/bitsquare/p2p/seed/SeedNodesRepository.java index 9d2b017583..89decf08b7 100644 --- a/network/src/main/java/io/bitsquare/p2p/seed/SeedNodesRepository.java +++ b/network/src/main/java/io/bitsquare/p2p/seed/SeedNodesRepository.java @@ -9,7 +9,9 @@ public class SeedNodesRepository { protected Set
torSeedNodeAddresses = Sets.newHashSet( - new Address("lmvdenjkyvx2ovga.onion:8001") + new Address("lmvdenjkyvx2ovga.onion:8001"), + new Address("eo5ay2lyzrfvx2nr.onion:8002"), + new Address("si3uu56adkyqkldl.onion:8003") ); diff --git a/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java b/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java index 4686920397..4144e57645 100644 --- a/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java +++ b/seednode/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java @@ -1,13 +1,15 @@ package io.bitsquare.p2p.seed; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.bitsquare.app.Logging; import io.bitsquare.common.UserThread; import io.bitsquare.common.util.Utilities; -import org.bitcoinj.crypto.DRMWorkaround; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.file.Path; +import java.nio.file.Paths; import java.security.NoSuchAlgorithmException; import java.security.Security; import java.util.Random; @@ -18,7 +20,6 @@ import java.util.concurrent.ThreadFactory; public class SeedNodeMain { private static final Logger log = LoggerFactory.getLogger(SeedNodeMain.class); - private static SeedNodeMain seedNodeMain; private SeedNode seedNode; private boolean stopped; @@ -27,9 +28,10 @@ public class SeedNodeMain { // eg. 4444 true localhost:7777 localhost:8888 // To stop enter: q public static void main(String[] args) throws NoSuchAlgorithmException { - - DRMWorkaround.maybeDisableExportControls(); - seedNodeMain = new SeedNodeMain(args); + Path path = Paths.get("seed_node_log"); + Logging.setup(path.toString()); + log.info("Log files under: " + path.toAbsolutePath().toString()); + new SeedNodeMain(args); } public SeedNodeMain(String[] args) {