Improve close connection and shutdown handling

This commit is contained in:
Manfred Karrer 2016-02-02 21:07:45 +01:00
parent 16f014adb6
commit 85b2cb1d44
19 changed files with 220 additions and 188 deletions

View file

@ -28,13 +28,14 @@ public class Version {
// The version nr. for the objects sent over the network. A change will break the serialization of old objects. // The version nr. for the objects sent over the network. A change will break the serialization of old objects.
// If objects are used for both network and database the network version is applied. // If objects are used for both network and database the network version is applied.
public static final long NETWORK_PROTOCOL_VERSION = 1; public static final long NETWORK_PROTOCOL_VERSION = 2;
// The version nr. of the serialized data stored to disc. A change will break the serialization of old objects. // The version nr. of the serialized data stored to disc. A change will break the serialization of old objects.
public static final long LOCAL_DB_VERSION = 1; public static final long LOCAL_DB_VERSION = 2;
// The version nr. of the current protocol. The offer holds that version. A taker will check the version of the offers to see if he his version is // The version nr. of the current protocol. The offer holds that version.
// compatible. // A taker will check the version of the offers to see if his version is compatible.
// TODO not used yet
public static final long PROTOCOL_VERSION = 1; public static final long PROTOCOL_VERSION = 1;
// The version for the bitcoin network (Mainnet = 0, TestNet = 1, Regtest = 2) // The version for the bitcoin network (Mainnet = 0, TestNet = 1, Regtest = 2)

View file

@ -73,7 +73,9 @@ public class Utilities {
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTimeInSec, ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTimeInSec,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(maximumPoolSize), threadFactory); TimeUnit.SECONDS, new ArrayBlockingQueue<>(maximumPoolSize), threadFactory);
executor.allowCoreThreadTimeOut(true); executor.allowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler((r, e) -> log.warn("RejectedExecutionHandler called")); executor.setRejectedExecutionHandler((r, e) -> {
log.warn("RejectedExecutionHandler called");
});
return executor; return executor;
} }
@ -92,7 +94,9 @@ public class Utilities {
executor.allowCoreThreadTimeOut(true); executor.allowCoreThreadTimeOut(true);
executor.setMaximumPoolSize(maximumPoolSize); executor.setMaximumPoolSize(maximumPoolSize);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRejectedExecutionHandler((r, e) -> log.debug("RejectedExecutionHandler called")); executor.setRejectedExecutionHandler((r, e) -> {
log.warn("RejectedExecutionHandler called");
});
return executor; return executor;
} }

View file

@ -63,6 +63,7 @@ public class FeePolicy {
// 0.001 BTC 0.1% of 1 BTC about 0.4 EUR @ 400 EUR/BTC // 0.001 BTC 0.1% of 1 BTC about 0.4 EUR @ 400 EUR/BTC
public static Coin getCreateOfferFee() { public static Coin getCreateOfferFee() {
// We cannot reduce it more for alpha testing as we need to pay the quite high miner fee of 30_000
return Coin.valueOf(100_000); return Coin.valueOf(100_000);
} }

View file

@ -148,7 +148,8 @@ public class MainViewModel implements ViewModel {
this.walletPasswordPopup = walletPasswordPopup; this.walletPasswordPopup = walletPasswordPopup;
this.formatter = formatter; this.formatter = formatter;
btcNetworkAsString = formatter.formatBitcoinNetwork(preferences.getBitcoinNetwork()); btcNetworkAsString = formatter.formatBitcoinNetwork(preferences.getBitcoinNetwork()) +
(preferences.getUseTorForBitcoinJ() ? " (using Tor)" : "");
TxIdTextField.setPreferences(preferences); TxIdTextField.setPreferences(preferences);
TxIdTextField.setWalletService(walletService); TxIdTextField.setWalletService(walletService);

View file

@ -27,6 +27,7 @@ import io.bitsquare.gui.util.validation.InputValidator;
import io.bitsquare.locale.BSResources; import io.bitsquare.locale.BSResources;
import io.bitsquare.locale.TradeCurrency; import io.bitsquare.locale.TradeCurrency;
import io.bitsquare.p2p.P2PService; import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener; import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.payment.PaymentAccount; import io.bitsquare.payment.PaymentAccount;
@ -398,7 +399,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
offerStateListener = (ov, oldValue, newValue) -> applyOfferState(newValue); offerStateListener = (ov, oldValue, newValue) -> applyOfferState(newValue);
connectionListener = new ConnectionListener() { connectionListener = new ConnectionListener() {
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent() && if (connection.getPeersNodeAddressOptional().isPresent() &&
connection.getPeersNodeAddressOptional().get().equals(offer.getOffererNodeAddress())) connection.getPeersNodeAddressOptional().get().equals(offer.getOffererNodeAddress()))
offerWarning.set("You lost connection to the offerer.\n" + offerWarning.set("You lost connection to the offerer.\n" +

View file

@ -298,7 +298,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} }
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall(); Log.traceCall();
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener); connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
// We removed the listener after a delay to be sure the connection has been removed // We removed the listener after a delay to be sure the connection has been removed

View file

@ -0,0 +1,32 @@
package io.bitsquare.p2p.network;
public enum CloseConnectionReason {
// First block are from different exceptions
SOCKET_CLOSED(false),
RESET(false),
SOCKET_TIMEOUT(false),
TERMINATED(false), // EOFException
INCOMPATIBLE_DATA(false),
UNKNOWN_EXCEPTION(false),
// Planned
APP_SHUT_DOWN(true),
CLOSE_REQUESTED_BY_PEER(false),
// send msg
SEND_MSG_FAILURE(false),
SEND_MSG_TIMEOUT(false),
// maintenance
TOO_MANY_CONNECTIONS_OPEN(true),
TOO_MANY_SEED_NODES_CONNECTED(true),
// illegal requests
RULE_VIOLATION(true);
public final boolean sendCloseMessage;
CloseConnectionReason(boolean sendCloseMessage) {
this.sendCloseMessage = sendCloseMessage;
}
}

View file

@ -193,8 +193,8 @@ public class Connection implements MessageListener {
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
public void reportIllegalRequest(CorruptRequest corruptRequest) { public void reportIllegalRequest(RuleViolation ruleViolation) {
sharedModel.reportInvalidRequest(corruptRequest); sharedModel.reportInvalidRequest(ruleViolation);
} }
public boolean violatesThrottleLimit() { public boolean violatesThrottleLimit() {
@ -297,19 +297,11 @@ public class Connection implements MessageListener {
// ShutDown // ShutDown
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown(Runnable completeHandler) { public void shutDown(CloseConnectionReason closeConnectionReason) {
shutDown(true, completeHandler); shutDown(closeConnectionReason, null);
} }
public void shutDown() { public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
shutDown(true, null);
}
public void shutDown(boolean sendCloseConnectionMessage) {
shutDown(sendCloseConnectionMessage, null);
}
private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) {
Log.traceCall(this.toString()); Log.traceCall(this.toString());
if (!stopped) { if (!stopped) {
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null"; String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
@ -319,12 +311,15 @@ public class Connection implements MessageListener {
+ "\nuid=" + uid + "\nuid=" + uid
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
if (sendCloseConnectionMessage) { if (closeConnectionReason.sendCloseMessage) {
new Thread(() -> { new Thread(() -> {
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid); Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.uid);
Log.traceCall("sendCloseConnectionMessage"); Log.traceCall("sendCloseConnectionMessage");
try { try {
sendMessage(new CloseConnectionMessage()); String reason = closeConnectionReason == CloseConnectionReason.RULE_VIOLATION ?
sharedModel.getRuleViolation().name() : closeConnectionReason.name();
sendMessage(new CloseConnectionMessage(reason));
setStopFlags(); setStopFlags();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@ -332,12 +327,12 @@ public class Connection implements MessageListener {
log.error(t.getMessage()); log.error(t.getMessage());
t.printStackTrace(); t.printStackTrace();
} finally { } finally {
UserThread.execute(() -> doShutDown(shutDownCompleteHandler)); UserThread.execute(() -> doShutDown(closeConnectionReason, shutDownCompleteHandler));
} }
}).start(); }).start();
} else { } else {
setStopFlags(); setStopFlags();
doShutDown(shutDownCompleteHandler); doShutDown(closeConnectionReason, shutDownCompleteHandler);
} }
} }
} }
@ -349,14 +344,9 @@ public class Connection implements MessageListener {
inputHandler.stop(); inputHandler.stop();
} }
private void doShutDown(@Nullable Runnable shutDownCompleteHandler) { private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
ConnectionListener.Reason shutDownReason = sharedModel.getShutDownReason(); // Use UserThread.execute as its not clear if that is called from a non-UserThread
if (shutDownReason == null) UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
shutDownReason = ConnectionListener.Reason.SHUT_DOWN;
final ConnectionListener.Reason finalShutDownReason = shutDownReason;
// keep UserThread.execute as its not clear if that is called from a non-UserThread
UserThread.execute(() -> connectionListener.onDisconnect(finalShutDownReason, this));
try { try {
sharedModel.getSocket().close(); sharedModel.getSocket().close();
} catch (SocketException e) { } catch (SocketException e) {
@ -433,12 +423,13 @@ public class Connection implements MessageListener {
private final Connection connection; private final Connection connection;
private final Socket socket; private final Socket socket;
private final ConcurrentHashMap<CorruptRequest, Integer> corruptRequests = new ConcurrentHashMap<>(); private final ConcurrentHashMap<RuleViolation, Integer> ruleViolations = new ConcurrentHashMap<>();
// mutable // mutable
private Date lastActivityDate; private Date lastActivityDate;
private volatile boolean stopped; private volatile boolean stopped;
private ConnectionListener.Reason shutDownReason; private CloseConnectionReason closeConnectionReason;
private RuleViolation ruleViolation;
public SharedModel(Connection connection, Socket socket) { public SharedModel(Connection connection, Socket socket) {
this.connection = connection; this.connection = connection;
@ -453,26 +444,27 @@ public class Connection implements MessageListener {
return lastActivityDate; return lastActivityDate;
} }
public void reportInvalidRequest(CorruptRequest corruptRequest) { public void reportInvalidRequest(RuleViolation ruleViolation) {
log.warn("We got reported an corrupt request " + corruptRequest + "\n\tconnection=" + this); log.warn("We got reported an corrupt request " + ruleViolation + "\n\tconnection=" + this);
int numCorruptRequests; int numRuleViolations;
if (corruptRequests.contains(corruptRequest)) if (ruleViolations.contains(ruleViolation))
numCorruptRequests = corruptRequests.get(corruptRequest); numRuleViolations = ruleViolations.get(ruleViolation);
else else
numCorruptRequests = 0; numRuleViolations = 0;
numCorruptRequests++; numRuleViolations++;
corruptRequests.put(corruptRequest, numCorruptRequests); ruleViolations.put(ruleViolation, numRuleViolations);
if (numCorruptRequests >= corruptRequest.maxTolerance) { if (numRuleViolations >= ruleViolation.maxTolerance) {
log.warn("We close connection as we received too many corrupt requests.\n" + log.warn("We close connection as we received too many corrupt requests.\n" +
"numCorruptRequests={}\n\t" + "numRuleViolations={}\n\t" +
"corruptRequest={}\n\t" + "corruptRequest={}\n\t" +
"corruptRequests={}\n\t" + "corruptRequests={}\n\t" +
"connection={}", numCorruptRequests, corruptRequest, corruptRequests.toString(), connection); "connection={}", numRuleViolations, ruleViolation, ruleViolations.toString(), connection);
shutDown(); this.ruleViolation = ruleViolation;
shutDown(CloseConnectionReason.RULE_VIOLATION);
} else { } else {
corruptRequests.put(corruptRequest, ++numCorruptRequests); ruleViolations.put(ruleViolation, ++numRuleViolations);
} }
} }
@ -480,30 +472,30 @@ public class Connection implements MessageListener {
Log.traceCall(e.toString()); Log.traceCall(e.toString());
if (e instanceof SocketException) { if (e instanceof SocketException) {
if (socket.isClosed()) if (socket.isClosed())
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED; closeConnectionReason = CloseConnectionReason.SOCKET_CLOSED;
else else
shutDownReason = ConnectionListener.Reason.RESET; closeConnectionReason = CloseConnectionReason.RESET;
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) { } else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
shutDownReason = ConnectionListener.Reason.TIMEOUT; closeConnectionReason = CloseConnectionReason.SOCKET_TIMEOUT;
log.debug("TimeoutException at socket " + socket.toString() + "\n\tconnection={}" + this); log.debug("TimeoutException at socket " + socket.toString() + "\n\tconnection={}" + this);
} else if (e instanceof EOFException) { } else if (e instanceof EOFException) {
shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED; closeConnectionReason = CloseConnectionReason.TERMINATED;
} else if (e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException) { } else if (e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException) {
shutDownReason = ConnectionListener.Reason.INCOMPATIBLE_DATA; closeConnectionReason = CloseConnectionReason.INCOMPATIBLE_DATA;
} else { } else {
shutDownReason = ConnectionListener.Reason.UNKNOWN; closeConnectionReason = CloseConnectionReason.UNKNOWN_EXCEPTION;
log.warn("Unknown reason for exception at socket {}\n\tconnection={}\n\tException=", log.warn("Unknown reason for exception at socket {}\n\tconnection={}\n\tException=",
socket.toString(), this, e.getMessage()); socket.toString(), this, e.getMessage());
e.printStackTrace(); e.printStackTrace();
} }
shutDown(); shutDown(closeConnectionReason);
} }
public void shutDown() { public void shutDown(CloseConnectionReason closeConnectionReason) {
if (!stopped) { if (!stopped) {
stopped = true; stopped = true;
connection.shutDown(false); connection.shutDown(closeConnectionReason);
} }
} }
@ -515,15 +507,15 @@ public class Connection implements MessageListener {
this.stopped = true; this.stopped = true;
} }
public synchronized ConnectionListener.Reason getShutDownReason() { public RuleViolation getRuleViolation() {
return shutDownReason; return ruleViolation;
} }
@Override @Override
public String toString() { public String toString() {
return "SharedSpace{" + return "SharedSpace{" +
", socket=" + socket + ", socket=" + socket +
", illegalRequests=" + corruptRequests + ", ruleViolations=" + ruleViolations +
", lastActivityDate=" + lastActivityDate + ", lastActivityDate=" + lastActivityDate +
'}'; '}';
} }
@ -581,7 +573,7 @@ public class Connection implements MessageListener {
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size > getMaxMsgSize()) { if (size > getMaxMsgSize()) {
sharedModel.reportInvalidRequest(CorruptRequest.MaxSizeExceeded); sharedModel.reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED);
return; return;
} }
@ -593,45 +585,47 @@ public class Connection implements MessageListener {
//log.trace("Read object compressed data size: " + size); //log.trace("Read object compressed data size: " + size);
serializable = Utils.decompress(compressedObjectAsBytes); serializable = Utils.decompress(compressedObjectAsBytes);
} else { } else {
sharedModel.reportInvalidRequest(CorruptRequest.InvalidDataType); sharedModel.reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
} }
} else { } else {
if (rawInputObject instanceof Serializable) { if (rawInputObject instanceof Serializable) {
serializable = (Serializable) rawInputObject; serializable = (Serializable) rawInputObject;
} else { } else {
sharedModel.reportInvalidRequest(CorruptRequest.InvalidDataType); sharedModel.reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
} }
} }
//log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length); //log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length);
// compressed size might be bigger theoretically so we check again after decompression // compressed size might be bigger theoretically so we check again after decompression
if (size > getMaxMsgSize()) { if (size > getMaxMsgSize()) {
sharedModel.reportInvalidRequest(CorruptRequest.MaxSizeExceeded); sharedModel.reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED);
return; return;
} }
if (sharedModel.connection.violatesThrottleLimit()) { if (sharedModel.connection.violatesThrottleLimit()) {
sharedModel.reportInvalidRequest(CorruptRequest.ViolatedThrottleLimit); sharedModel.reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED);
return; return;
} }
if (!(serializable instanceof Message)) { if (!(serializable instanceof Message)) {
sharedModel.reportInvalidRequest(CorruptRequest.InvalidDataType); sharedModel.reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
return; return;
} }
Message message = (Message) serializable; Message message = (Message) serializable;
if (message.networkId() != Version.getNetworkId()) { if (message.networkId() != Version.getNetworkId()) {
sharedModel.reportInvalidRequest(CorruptRequest.WrongNetworkId); sharedModel.reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID);
return; return;
} }
Connection connection = sharedModel.connection; Connection connection = sharedModel.connection;
sharedModel.updateLastActivityDate(); sharedModel.updateLastActivityDate();
if (message instanceof CloseConnectionMessage) { if (message instanceof CloseConnectionMessage) {
log.info("CloseConnectionMessage received on connection {}", connection); CloseConnectionReason[] values = CloseConnectionReason.values();
log.info("CloseConnectionMessage received. Reason={}\n\t" +
"connection={}", ((CloseConnectionMessage) message).reason, connection);
stopped = true; stopped = true;
sharedModel.shutDown(); sharedModel.shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
} else if (!stopped) { } else if (!stopped) {
// First a seed node gets a message form a peer (PreliminaryDataRequest using // First a seed node gets a message form a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not has its hidden service // AnonymousMessage interface) which does not has its hidden service

View file

@ -2,19 +2,9 @@ package io.bitsquare.p2p.network;
public interface ConnectionListener { public interface ConnectionListener {
enum Reason {
SOCKET_CLOSED,
RESET,
TIMEOUT,
SHUT_DOWN,
PEER_DISCONNECTED,
INCOMPATIBLE_DATA,
UNKNOWN
}
void onConnection(Connection connection); void onConnection(Connection connection);
void onDisconnect(Reason reason, Connection connection); void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection);
void onError(Throwable throwable); void onError(Throwable throwable);
} }

View file

@ -1,14 +0,0 @@
package io.bitsquare.p2p.network;
public enum CorruptRequest {
MaxSizeExceeded(1),
InvalidDataType(0),
WrongNetworkId(0),
ViolatedThrottleLimit(1);
public final int maxTolerance;
CorruptRequest(int maxTolerance) {
this.maxTolerance = maxTolerance;
}
}

View file

@ -180,11 +180,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
server = null; server = null;
} }
getAllConnections().stream().forEach(Connection::shutDown); getAllConnections().stream().forEach(c -> c.shutDown(CloseConnectionReason.APP_SHUT_DOWN));
log.info("NetworkNode shutdown complete"); log.info("NetworkNode shutdown complete");
if (shutDownCompleteHandler != null) shutDownCompleteHandler.run();
} }
if (shutDownCompleteHandler != null) shutDownCompleteHandler.run();
} }
@ -209,10 +208,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
} }
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
outBoundConnections.remove(connection); outBoundConnections.remove(connection);
inBoundConnections.remove(connection); inBoundConnections.remove(connection);
connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection)); connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection));
} }
@Override @Override
@ -280,9 +279,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
} }
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
inBoundConnections.remove(connection); inBoundConnections.remove(connection);
NetworkNode.this.onDisconnect(reason, connection); NetworkNode.this.onDisconnect(closeConnectionReason, connection);
} }
@Override @Override

View file

@ -0,0 +1,15 @@
package io.bitsquare.p2p.network;
public enum RuleViolation {
INVALID_DATA_TYPE(0),
WRONG_NETWORK_ID(0),
MAX_MSG_SIZE_EXCEEDED(1),
THROTTLE_LIMIT_EXCEEDED(1),
TOO_MANY_REPORTED_PEERS_SENT(1);
public final int maxTolerance;
RuleViolation(int maxTolerance) {
this.maxTolerance = maxTolerance;
}
}

View file

@ -71,7 +71,7 @@ class Server implements Runnable {
if (!stopped) { if (!stopped) {
stopped = true; stopped = true;
connections.stream().forEach(Connection::shutDown); connections.stream().forEach(c -> c.shutDown(CloseConnectionReason.APP_SHUT_DOWN));
try { try {
serverSocket.close(); serverSocket.close();

View file

@ -14,6 +14,10 @@ import io.bitsquare.p2p.Utils;
import io.nucleo.net.HiddenServiceDescriptor; import io.nucleo.net.HiddenServiceDescriptor;
import io.nucleo.net.JavaTorNode; import io.nucleo.net.JavaTorNode;
import io.nucleo.net.TorNode; import io.nucleo.net.TorNode;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.monadic.MonadicBinding;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -34,15 +38,14 @@ public class TorNetworkNode extends NetworkNode {
private static final int MAX_RESTART_ATTEMPTS = 3; private static final int MAX_RESTART_ATTEMPTS = 3;
private static final int WAIT_BEFORE_RESTART = 2000; private static final int WAIT_BEFORE_RESTART = 2000;
private static final long SHUT_DOWN_TIMEOUT = 5000; private static final long SHUT_DOWN_TIMEOUT_SEC = 5;
private final File torDir; private final File torDir;
private TorNode torNetworkNode; private TorNode torNetworkNode;
private HiddenServiceDescriptor hiddenServiceDescriptor; private HiddenServiceDescriptor hiddenServiceDescriptor;
private Timer shutDownTimeoutTimer; private Timer shutDownTimeoutTimer;
private int restartCounter; private int restartCounter;
private Runnable shutDownCompleteHandler; private MonadicBinding<Boolean> allShutDown;
private boolean torShutDownComplete, networkNodeShutDownDoneComplete;
// ///////////////////////////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////////////////////////
@ -103,77 +106,74 @@ public class TorNetworkNode extends NetworkNode {
return torNetworkNode.connectToHiddenService(peerNodeAddress.hostName, peerNodeAddress.port); return torNetworkNode.connectToHiddenService(peerNodeAddress.hostName, peerNodeAddress.port);
} }
//TODO simplify
public void shutDown(Runnable shutDownCompleteHandler) { public void shutDown(Runnable shutDownCompleteHandler) {
Log.traceCall(); Log.traceCall();
this.shutDownCompleteHandler = shutDownCompleteHandler; BooleanProperty torNetworkNodeShutDown = torNetworkNodeShutDown();
BooleanProperty networkNodeShutDown = networkNodeShutDown();
BooleanProperty shutDownTimerTriggered = shutDownTimerTriggered();
shutDownTimeoutTimer = UserThread.runAfter(() -> { // Need to store allShutDown to not get garbage collected
log.error("A timeout occurred at shutDown"); allShutDown = EasyBind.combine(torNetworkNodeShutDown, networkNodeShutDown, shutDownTimerTriggered, (a, b, c) -> (a && b) || c);
shutDownExecutorService(); allShutDown.subscribe((observable, oldValue, newValue) -> {
}, SHUT_DOWN_TIMEOUT, TimeUnit.MILLISECONDS); if (newValue) {
shutDownTimeoutTimer.cancel();
if (executorService != null) {
executorService.submit(() -> UserThread.execute(() -> {
// We want to stay in UserThread
super.shutDown(() -> {
networkNodeShutDownDoneComplete = true;
if (torShutDownComplete)
shutDownExecutorService();
});
}));
} else {
log.error("executorService must not be null at shutDown");
}
executorService.submit(() -> {
Utilities.setThreadName("NetworkNode:torNodeShutdown");
try {
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
log.info("Shutdown torNode"); log.debug("Shutdown executorService");
// Might take a bit so we use a thread try {
MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS);
log.debug("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms.");
log.info("Shutdown completed");
} catch (Throwable t) {
log.error("Shutdown executorService failed with exception: " + t.getMessage());
t.printStackTrace();
} finally {
shutDownCompleteHandler.run();
}
}
});
}
private BooleanProperty torNetworkNodeShutDown() {
final BooleanProperty done = new SimpleBooleanProperty();
executorService.submit(() -> {
Utilities.setThreadName("torNetworkNodeShutDown");
long ts = System.currentTimeMillis();
log.info("Shutdown torNetworkNode");
try {
if (torNetworkNode != null) if (torNetworkNode != null)
torNetworkNode.shutdown(); torNetworkNode.shutdown();
log.info("Shutdown torNode done after " + (System.currentTimeMillis() - ts) + " ms."); log.info("Shutdown torNetworkNode done after " + (System.currentTimeMillis() - ts) + " ms.");
UserThread.execute(() -> {
torShutDownComplete = true;
if (networkNodeShutDownDoneComplete)
shutDownExecutorService();
});
} catch (Throwable e) { } catch (Throwable e) {
UserThread.execute(() -> { log.error("Shutdown torNetworkNode failed with exception: " + e.getMessage());
log.error("Shutdown torNode failed with exception: " + e.getMessage());
e.printStackTrace(); e.printStackTrace();
// We want to switch to UserThread } finally {
shutDownExecutorService(); UserThread.execute(() -> done.set(true));
});
} }
}); });
return done;
} }
private BooleanProperty networkNodeShutDown() {
final BooleanProperty done = new SimpleBooleanProperty();
super.shutDown(() -> done.set(true));
return done;
}
private BooleanProperty shutDownTimerTriggered() {
final BooleanProperty done = new SimpleBooleanProperty();
shutDownTimeoutTimer = UserThread.runAfter(() -> {
log.error("A timeout occurred at shutDown");
done.set(true);
}, SHUT_DOWN_TIMEOUT_SEC);
return done;
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// shutdown, restart // shutdown, restart
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void shutDownExecutorService() { private void restartTor(String errorMessage) {
shutDownTimeoutTimer.cancel();
new Thread(() -> {
Utilities.setThreadName("NetworkNode:shutDownExecutorService");
try {
long ts = System.currentTimeMillis();
log.debug("Shutdown executorService");
MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS);
log.debug("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms.");
log.info("Shutdown completed");
shutDownCompleteHandler.run();
} catch (Throwable t) {
log.error("Shutdown executorService failed with exception: " + t.getMessage());
t.printStackTrace();
shutDownCompleteHandler.run();
}
}).start();
}
private void restartTor() {
Log.traceCall(); Log.traceCall();
restartCounter++; restartCounter++;
if (restartCounter <= MAX_RESTART_ATTEMPTS) { if (restartCounter <= MAX_RESTART_ATTEMPTS) {
@ -182,8 +182,10 @@ public class TorNetworkNode extends NetworkNode {
start(null); start(null);
}, WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS)); }, WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS));
} else { } else {
String msg = "We tried to restart Tor " + restartCounter String msg = "We tried to restart Tor " + restartCounter +
+ " times, but it failed to start up. We give up now."; " times, but it continued to fail with error message:\n" +
errorMessage + "\n\n" +
"Please check your internet connection and firewall and try to start again.";
log.error(msg); log.error(msg);
throw new RuntimeException(msg); throw new RuntimeException(msg);
} }
@ -215,7 +217,7 @@ public class TorNetworkNode extends NetworkNode {
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> { UserThread.execute(() -> {
log.error("TorNode creation failed with exception: " + throwable.getMessage()); log.error("TorNode creation failed with exception: " + throwable.getMessage());
restartTor(); restartTor(throwable.getMessage());
}); });
} }
}); });
@ -249,7 +251,7 @@ public class TorNetworkNode extends NetworkNode {
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> { UserThread.execute(() -> {
log.error("Hidden service creation failed"); log.error("Hidden service creation failed");
restartTor(); restartTor(throwable.getMessage());
}); });
} }
}); });

View file

@ -8,8 +8,10 @@ public final class CloseConnectionMessage implements Message {
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId(); private final int networkId = Version.getNetworkId();
public final String reason;
public CloseConnectionMessage() { public CloseConnectionMessage(String reason) {
this.reason = reason;
} }
@Override @Override
@ -20,6 +22,7 @@ public final class CloseConnectionMessage implements Message {
@Override @Override
public String toString() { public String toString() {
return "CloseConnectionMessage{" + return "CloseConnectionMessage{" +
", reason=" + reason +
", networkId=" + networkId + ", networkId=" + networkId +
'}'; '}';
} }

View file

@ -7,6 +7,7 @@ import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread; import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
@ -91,7 +92,7 @@ public class PeerExchangeHandshake implements MessageListener {
".\n\tException=" + throwable.getMessage(); ".\n\tException=" + throwable.getMessage();
log.info(errorMessage); log.info(errorMessage);
peerManager.shutDownConnection(nodeAddress); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage);
} }
@ -104,7 +105,7 @@ public class PeerExchangeHandshake implements MessageListener {
PeerExchangeHandshake.this); PeerExchangeHandshake.this);
log.info("timeoutTimer called on " + this); log.info("timeoutTimer called on " + this);
peerManager.shutDownConnection(nodeAddress); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage);
}, },
@ -142,7 +143,7 @@ public class PeerExchangeHandshake implements MessageListener {
"Exception: " + throwable.getMessage(); "Exception: " + throwable.getMessage();
log.info(errorMessage); log.info(errorMessage);
peerManager.shutDownConnection(connection); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage);
} }
@ -155,7 +156,7 @@ public class PeerExchangeHandshake implements MessageListener {
PeerExchangeHandshake.this); PeerExchangeHandshake.this);
log.info("timeoutTimer called. this=" + this); log.info("timeoutTimer called. this=" + this);
peerManager.shutDownConnection(connection); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage);
}, },

View file

@ -83,7 +83,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
} }
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
// We use a timer to throttle if we get a series of disconnects // We use a timer to throttle if we get a series of disconnects
// The more connections we have the more relaxed we are with a checkConnections // The more connections we have the more relaxed we are with a checkConnections
stopMaintainConnectionsTimer(); stopMaintainConnectionsTimer();

View file

@ -112,7 +112,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
} }
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener); connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> { connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> {
penalizeUnreachablePeer(nodeAddress); penalizeUnreachablePeer(nodeAddress);
@ -210,7 +210,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
log.info("Candidates.size() for shut down=" + candidates.size()); log.info("Candidates.size() for shut down=" + candidates.size());
Connection connection = candidates.remove(0); Connection connection = candidates.remove(0);
log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString()); log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString());
connection.shutDown(() -> checkMaxConnections(limit)); connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, () -> checkMaxConnections(limit));
return true; return true;
} else { } else {
log.warn("No candidates found to remove (That case should not be possible as we use in the " + log.warn("No candidates found to remove (That case should not be possible as we use in the " +
@ -252,7 +252,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
log.info("Number of connections exceeding MAX_CONNECTIONS_EXTENDED_1. Current size=" + candidates.size()); log.info("Number of connections exceeding MAX_CONNECTIONS_EXTENDED_1. Current size=" + candidates.size());
Connection connection = candidates.remove(0); Connection connection = candidates.remove(0);
log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString()); log.info("We are going to shut down the oldest connection.\n\tconnection=" + connection.toString());
connection.shutDown(this::removeSuperfluousSeedNodes); connection.shutDown(CloseConnectionReason.TOO_MANY_SEED_NODES_CONNECTED, this::removeSuperfluousSeedNodes);
} }
} }
} }
@ -276,7 +276,8 @@ public class PeerManager implements ConnectionListener, MessageListener {
// reported peers include the connected peers which is normally max. 10 but we give some headroom // reported peers include the connected peers which is normally max. 10 but we give some headroom
// for safety // for safety
if (reportedPeersToAdd.size() > (MAX_REPORTED_PEERS + PeerManager.MIN_CONNECTIONS * 3)) { if (reportedPeersToAdd.size() > (MAX_REPORTED_PEERS + PeerManager.MIN_CONNECTIONS * 3)) {
connection.shutDown(); // Will trigger a shutdown after 2nd time sending too much
connection.reportIllegalRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT);
} else { } else {
// In case we have one of the peers already we adjust the lastActivityDate by adjusting the date to the mid // In case we have one of the peers already we adjust the lastActivityDate by adjusting the date to the mid
// of the lastActivityDate of our already stored peer and the reported one // of the lastActivityDate of our already stored peer and the reported one
@ -434,18 +435,18 @@ public class PeerManager implements ConnectionListener, MessageListener {
return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress); return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress);
} }
public void shutDownConnection(Connection connection) { public void shutDownConnection(Connection connection, CloseConnectionReason closeConnectionReason) {
if (connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER) if (connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
connection.shutDown(); connection.shutDown(closeConnectionReason);
} }
public void shutDownConnection(NodeAddress peersNodeAddress) { public void shutDownConnection(NodeAddress peersNodeAddress, CloseConnectionReason closeConnectionReason) {
networkNode.getAllConnections().stream() networkNode.getAllConnections().stream()
.filter(connection -> connection.getPeersNodeAddressOptional().isPresent() && .filter(connection -> connection.getPeersNodeAddressOptional().isPresent() &&
connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress) && connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress) &&
connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER) connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
.findFirst() .findAny()
.ifPresent(connection -> connection.shutDown(true)); .ifPresent(connection -> connection.shutDown(closeConnectionReason));
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -7,6 +7,7 @@ import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread; import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
@ -102,7 +103,7 @@ public class RequestDataHandshake implements MessageListener {
"getDataRequest=" + getDataRequest + "." + "getDataRequest=" + getDataRequest + "." +
"\n\tException=" + throwable.getMessage(); "\n\tException=" + throwable.getMessage();
log.info(errorMessage); log.info(errorMessage);
peerManager.shutDownConnection(nodeAddress); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage);
} }
@ -114,7 +115,7 @@ public class RequestDataHandshake implements MessageListener {
" on nodeAddress:" + nodeAddress; " on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandshake=" + log.info(errorMessage + " / RequestDataHandshake=" +
RequestDataHandshake.this); RequestDataHandshake.this);
peerManager.shutDownConnection(nodeAddress); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage);
}, },
@ -143,7 +144,7 @@ public class RequestDataHandshake implements MessageListener {
"Exception: " + throwable.getMessage(); "Exception: " + throwable.getMessage();
log.info(errorMessage); log.info(errorMessage);
peerManager.shutDownConnection(connection); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage);
} }
@ -155,7 +156,7 @@ public class RequestDataHandshake implements MessageListener {
" on connection:" + connection; " on connection:" + connection;
log.info(errorMessage + " / RequestDataHandshake=" + log.info(errorMessage + " / RequestDataHandshake=" +
RequestDataHandshake.this); RequestDataHandshake.this);
peerManager.shutDownConnection(connection); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage);
}, },