add exception handling to runnables

This commit is contained in:
Manfred Karrer 2015-11-03 20:56:08 +01:00
parent 4d82467fa3
commit 0b7e45ca55
15 changed files with 463 additions and 385 deletions

View file

@ -3,6 +3,7 @@ package io.bitsquare.p2p;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.bitsquare.app.ProgramArguments;
@ -35,8 +36,7 @@ import java.io.File;
import java.math.BigInteger;
import java.security.PublicKey;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@ -47,15 +47,20 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class P2PService {
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
private final SeedNodesRepository seedNodesRepository;
private final int port;
private final File torDir;
private final boolean useLocalhost;
@Nullable
private final EncryptionService encryptionService;
private final SetupListener setupListener;
private SetupListener setupListener;
private KeyRing keyRing;
private final File storageDir;
private final NetworkStatistics networkStatistics;
private final NetworkNode networkNode;
private final Routing routing;
private final ProtectedExpirableDataStorage dataStorage;
private NetworkNode networkNode;
private Routing routing;
private ProtectedExpirableDataStorage dataStorage;
private final List<DecryptedMailListener> decryptedMailListeners = new CopyOnWriteArrayList<>();
private final List<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArrayList<>();
private final List<P2PServiceListener> p2pServiceListeners = new CopyOnWriteArrayList<>();
@ -73,7 +78,7 @@ public class P2PService {
private boolean allSeedNodesRequested;
private Timer sendGetAllDataMessageTimer;
private volatile boolean hiddenServiceReady;
private final ExecutorService executorService;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -87,11 +92,27 @@ public class P2PService {
@Nullable EncryptionService encryptionService,
KeyRing keyRing,
@Named("storage.dir") File storageDir) {
this.seedNodesRepository = seedNodesRepository;
this.port = port;
this.torDir = torDir;
this.useLocalhost = useLocalhost;
this.encryptionService = encryptionService;
this.keyRing = keyRing;
this.storageDir = storageDir;
networkStatistics = new NetworkStatistics();
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("P2PService-%d")
.setDaemon(true)
.build();
executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
init();
}
private void init() {
// network layer
if (useLocalhost) {
networkNode = new LocalhostNetworkNode(port);
@ -578,7 +599,12 @@ public class P2PService {
sendGetAllDataMessageTimer.schedule(new TimerTask() {
@Override
public void run() {
sendGetAllDataMessage(remainingSeedNodeAddresses);
try {
sendGetAllDataMessage(remainingSeedNodeAddresses);
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
}, new Random().nextInt(2000) + 1000);
} else {

View file

@ -8,6 +8,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -37,11 +38,13 @@ public class Utils {
public static void shutDownExecutorService(ExecutorService executorService, long waitBeforeShutDown) {
executorService.shutdown();
try {
executorService.awaitTermination(waitBeforeShutDown, TimeUnit.MILLISECONDS);
boolean done = executorService.awaitTermination(waitBeforeShutDown, TimeUnit.MILLISECONDS);
if (!done) log.trace("Not all tasks completed at shutdown.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executorService.shutdownNow();
final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());
}
public static byte[] compress(Serializable input) {

View file

@ -1,10 +1,13 @@
package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import io.bitsquare.common.ByteArrayUtils;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.Utils;
import io.bitsquare.p2p.network.messages.CloseConnectionMessage;
import javafx.concurrent.Task;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -16,9 +19,7 @@ import java.net.SocketTimeoutException;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
public class Connection {
private static final Logger log = LoggerFactory.getLogger(Connection.class);
@ -37,18 +38,18 @@ public class Connection {
private final String uid;
private final Map<IllegalRequest, Integer> illegalRequests = new ConcurrentHashMap<>();
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final ExecutorService executorService;
private ObjectOutputStream out;
private ObjectInputStream in;
@Nullable
private Address peerAddress;
private boolean isAuthenticated;
private volatile boolean stopped;
private volatile boolean shutDownInProgress;
private volatile boolean inputHandlerStopped;
private volatile Date lastActivityDate;
@Nullable
private Address peerAddress;
private boolean isAuthenticated;
//TODO got java.util.zip.DataFormatException: invalid distance too far back
@ -69,6 +70,17 @@ public class Connection {
uid = UUID.randomUUID().toString();
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Connection-%d")
.setDaemon(true)
.build();
executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
init();
}
private void init() {
try {
socket.setSoTimeout(SOCKET_TIMEOUT);
// Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block
@ -203,12 +215,8 @@ public class Connection {
if (sendCloseConnectionMessage) {
sendMessage(new CloseConnectionMessage());
try {
// give a bit of time for closing gracefully
Thread.sleep(100);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
// give a bit of time for closing gracefully
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
try {
@ -296,61 +304,79 @@ public class Connection {
private class InputHandler implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("InputHandler-" + socket.getLocalPort());
while (!inputHandlerStopped) {
try {
log.trace("InputHandler waiting for incoming messages connection=" + Connection.this.getObjectId());
Object rawInputObject = in.readObject();
log.trace("New data arrived at inputHandler of connection=" + Connection.this.toString()
+ " rawInputObject " + rawInputObject);
try {
Thread.currentThread().setName("InputHandler-" + socket.getLocalPort());
while (!inputHandlerStopped) {
try {
log.trace("InputHandler waiting for incoming messages connection=" + Connection.this.getObjectId());
Object rawInputObject = in.readObject();
log.trace("New data arrived at inputHandler of connection=" + Connection.this.toString()
+ " rawInputObject " + rawInputObject);
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size <= MAX_MSG_SIZE) {
Serializable serializable = null;
if (useCompression) {
if (rawInputObject instanceof byte[]) {
byte[] compressedObjectAsBytes = (byte[]) rawInputObject;
size = compressedObjectAsBytes.length;
//log.trace("Read object compressed data size: " + size);
serializable = Utils.decompress(compressedObjectAsBytes);
} else {
reportIllegalRequest(IllegalRequest.InvalidDataType);
}
} else {
if (rawInputObject instanceof Serializable) {
serializable = (Serializable) rawInputObject;
} else {
reportIllegalRequest(IllegalRequest.InvalidDataType);
}
}
//log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length);
// compressed size might be bigger theoretically so we check again after decompression
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size <= MAX_MSG_SIZE) {
if (serializable instanceof Message) {
lastActivityDate = new Date();
Message message = (Message) serializable;
if (message instanceof CloseConnectionMessage) {
inputHandlerStopped = true;
shutDown(false);
Serializable serializable = null;
if (useCompression) {
if (rawInputObject instanceof byte[]) {
byte[] compressedObjectAsBytes = (byte[]) rawInputObject;
size = compressedObjectAsBytes.length;
//log.trace("Read object compressed data size: " + size);
serializable = Utils.decompress(compressedObjectAsBytes);
} else {
executorService.submit(() -> messageListener.onMessage(message, Connection.this));
reportIllegalRequest(IllegalRequest.InvalidDataType);
}
} else {
reportIllegalRequest(IllegalRequest.InvalidDataType);
if (rawInputObject instanceof Serializable) {
serializable = (Serializable) rawInputObject;
} else {
reportIllegalRequest(IllegalRequest.InvalidDataType);
}
}
//log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length);
// compressed size might be bigger theoretically so we check again after decompression
if (size <= MAX_MSG_SIZE) {
if (serializable instanceof Message) {
lastActivityDate = new Date();
Message message = (Message) serializable;
if (message instanceof CloseConnectionMessage) {
inputHandlerStopped = true;
shutDown(false);
} else {
Task task = new Task() {
@Override
protected Object call() throws Exception {
return null;
}
};
executorService.submit(() -> {
try {
messageListener.onMessage(message, Connection.this);
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
});
}
} else {
reportIllegalRequest(IllegalRequest.InvalidDataType);
}
} else {
log.error("Received decompressed data exceeds max. msg size.");
reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
}
} else {
log.error("Received decompressed data exceeds max. msg size.");
log.error("Received compressed data exceeds max. msg size.");
reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
}
} else {
log.error("Received compressed data exceeds max. msg size.");
reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
} catch (IOException | ClassNotFoundException e) {
inputHandlerStopped = true;
handleConnectionException(e);
}
} catch (IOException | ClassNotFoundException e) {
inputHandlerStopped = true;
handleConnectionException(e);
}
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
}

View file

@ -1,9 +1,6 @@
package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.*;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
import io.bitsquare.p2p.Address;
@ -19,6 +16,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class LocalhostNetworkNode extends NetworkNode {
@ -92,8 +90,9 @@ public class LocalhostNetworkNode extends NetworkNode {
private void createTorNode(final Consumer<TorNode> resultHandler) {
Callable<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> task = () -> {
long ts = System.currentTimeMillis();
log.trace("[simulation] Create TorNode");
if (simulateTorDelayTorNode > 0) Thread.sleep(simulateTorDelayTorNode);
if (simulateTorDelayTorNode > 0)
Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS);
log.info("\n\n############################################################\n" +
"TorNode created [simulation]:" +
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
@ -115,8 +114,9 @@ public class LocalhostNetworkNode extends NetworkNode {
private void createHiddenService(final Consumer<HiddenServiceDescriptor> resultHandler) {
Callable<HiddenServiceDescriptor> task = () -> {
long ts = System.currentTimeMillis();
log.debug("[simulation] Create hidden service");
if (simulateTorDelayHiddenService > 0) Thread.sleep(simulateTorDelayHiddenService);
if (simulateTorDelayHiddenService > 0)
Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS);
log.info("\n\n############################################################\n" +
"Hidden service created [simulation]:" +
"\nTook " + (System.currentTimeMillis() - ts) + " ms"

View file

@ -58,71 +58,77 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
final SettableFuture<Connection> resultFuture = SettableFuture.create();
Callable<Connection> task = () -> {
Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress);
try {
Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress);
Optional<Connection> outboundConnectionOptional = getOutboundConnection(peerAddress);
Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null;
Optional<Connection> outboundConnectionOptional = getOutboundConnection(peerAddress);
Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null;
if (connection != null && connection.isStopped()) {
log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid());
outBoundConnections.remove(connection);
connection = null;
}
if (connection == null) {
Optional<Connection> inboundConnectionOptional = getInboundConnection(peerAddress);
if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get();
if (connection != null)
log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid());
}
if (connection == null) {
try {
Socket socket = getSocket(peerAddress); // can take a while when using tor
connection = new Connection(socket,
(message1, connection1) -> NetworkNode.this.onMessage(message1, connection1),
new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
NetworkNode.this.onConnection(connection);
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection);
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
log.trace("onDisconnect at outgoing connection to peerAddress " + peerAddress);
NetworkNode.this.onDisconnect(reason, connection);
}
@Override
public void onError(Throwable throwable) {
NetworkNode.this.onError(throwable);
}
});
if (!outBoundConnections.contains(connection))
outBoundConnections.add(connection);
else
log.error("We have already that connection in our list. That must not happen. "
+ outBoundConnections + " / connection=" + connection);
log.info("\n\nNetworkNode created new outbound connection:"
+ "\npeerAddress=" + peerAddress.port
+ "\nconnection.uid=" + connection.getUid()
+ "\nmessage=" + message
+ "\n\n");
} catch (Throwable t) {
resultFuture.setException(t);
return null;
if (connection != null && connection.isStopped()) {
log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid());
outBoundConnections.remove(connection);
connection = null;
}
if (connection == null) {
Optional<Connection> inboundConnectionOptional = getInboundConnection(peerAddress);
if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get();
if (connection != null)
log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid());
}
if (connection == null) {
try {
Socket socket = getSocket(peerAddress); // can take a while when using tor
connection = new Connection(socket,
(message1, connection1) -> NetworkNode.this.onMessage(message1, connection1),
new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
NetworkNode.this.onConnection(connection);
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection);
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
log.trace("onDisconnect at outgoing connection to peerAddress " + peerAddress);
NetworkNode.this.onDisconnect(reason, connection);
}
@Override
public void onError(Throwable throwable) {
NetworkNode.this.onError(throwable);
}
});
if (!outBoundConnections.contains(connection))
outBoundConnections.add(connection);
else
log.error("We have already that connection in our list. That must not happen. "
+ outBoundConnections + " / connection=" + connection);
log.info("\n\nNetworkNode created new outbound connection:"
+ "\npeerAddress=" + peerAddress.port
+ "\nconnection.uid=" + connection.getUid()
+ "\nmessage=" + message
+ "\n\n");
} catch (Throwable t) {
resultFuture.setException(t);
return null;
}
}
connection.sendMessage(message);
return connection;
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
throw t;
}
connection.sendMessage(message);
return connection;
};
ListenableFuture<Connection> future = executorService.submit(task);

View file

@ -1,6 +1,5 @@
package io.bitsquare.p2p.network;
import io.bitsquare.p2p.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -10,8 +9,6 @@ import java.net.Socket;
import java.net.SocketException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Server implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Server.class);
@ -19,7 +16,6 @@ public class Server implements Runnable {
private final ServerSocket serverSocket;
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final List<Connection> connections = new CopyOnWriteArrayList<>();
private volatile boolean stopped;
@ -32,25 +28,30 @@ public class Server implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("Server-" + serverSocket.getLocalPort());
while (!stopped) {
try {
log.info("Ready to accept new clients on port " + serverSocket.getLocalPort());
final Socket socket = serverSocket.accept();
log.info("Accepted new client on port " + socket.getLocalPort());
Connection connection = new Connection(socket, messageListener, connectionListener);
log.info("\n\nServer created new inbound connection:"
+ "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort()
+ "\nsocket.getPort()=" + socket.getPort()
+ "\nconnection.uid=" + connection.getUid()
+ "\n\n");
try {
Thread.currentThread().setName("Server-" + serverSocket.getLocalPort());
while (!stopped) {
try {
log.info("Ready to accept new clients on port " + serverSocket.getLocalPort());
final Socket socket = serverSocket.accept();
log.info("Accepted new client on port " + socket.getLocalPort());
Connection connection = new Connection(socket, messageListener, connectionListener);
log.info("\n\nServer created new inbound connection:"
+ "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort()
+ "\nsocket.getPort()=" + socket.getPort()
+ "\nconnection.uid=" + connection.getUid()
+ "\n\n");
log.info("Server created new socket with port " + socket.getPort());
connections.add(connection);
} catch (IOException e) {
if (!stopped)
e.printStackTrace();
log.info("Server created new socket with port " + socket.getPort());
connections.add(connection);
} catch (IOException e) {
if (!stopped)
e.printStackTrace();
}
}
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
@ -67,7 +68,6 @@ public class Server implements Runnable {
} catch (IOException e) {
e.printStackTrace();
} finally {
Utils.shutDownExecutorService(executorService);
log.debug("Server shutdown complete");
}
}

View file

@ -21,6 +21,7 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@ -60,48 +61,62 @@ public class TorNetworkNode extends NetworkNode {
this.torDir = torDir;
init();
}
private void init() {
selfTestTimeoutTask = new TimerTask() {
@Override
public void run() {
log.error("A timeout occurred at self test");
stopSelfTestTimer();
selfTestFailed();
try {
log.error("A timeout occurred at self test");
stopSelfTestTimer();
selfTestFailed();
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
};
selfTestTask = new TimerTask() {
@Override
public void run() {
stopTimeoutTimer();
if (selfTestRunning.get()) {
log.debug("running self test");
selfTestTimeoutTimer = new Timer();
selfTestTimeoutTimer.schedule(selfTestTimeoutTask, TIMEOUT);
// might be interrupted by timeout task
try {
stopTimeoutTimer();
if (selfTestRunning.get()) {
nonce = random.nextLong();
log.trace("send msg with nonce " + nonce);
log.debug("running self test");
selfTestTimeoutTimer = new Timer();
selfTestTimeoutTimer.schedule(selfTestTimeoutTask, TIMEOUT);
// might be interrupted by timeout task
if (selfTestRunning.get()) {
nonce = random.nextLong();
log.trace("send msg with nonce " + nonce);
try {
SettableFuture<Connection> future = sendMessage(new Address(hiddenServiceDescriptor.getFullAddress()), new SelfTestMessage(nonce));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Sending self test message succeeded");
}
try {
SettableFuture<Connection> future = sendMessage(new Address(hiddenServiceDescriptor.getFullAddress()), new SelfTestMessage(nonce));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Sending self test message succeeded");
}
@Override
public void onFailure(Throwable throwable) {
log.error("Error at sending self test message. Exception = " + throwable);
stopTimeoutTimer();
throwable.printStackTrace();
selfTestFailed();
}
});
} catch (Exception e) {
e.printStackTrace();
@Override
public void onFailure(Throwable throwable) {
log.error("Error at sending self test message. Exception = " + throwable);
stopTimeoutTimer();
throwable.printStackTrace();
selfTestFailed();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
};
@ -144,11 +159,7 @@ public class TorNetworkNode extends NetworkNode {
TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor;
startServer(hiddenServiceDescriptor.getServerSocket());
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
setupListeners.stream().forEach(e -> e.onHiddenServiceReady());
@ -252,12 +263,7 @@ public class TorNetworkNode extends NetworkNode {
restartCounter++;
if (restartCounter <= MAX_RESTART_ATTEMPTS) {
shutDown(() -> {
try {
Thread.sleep(WAIT_BEFORE_RESTART);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
Uninterruptibles.sleepUninterruptibly(WAIT_BEFORE_RESTART, TimeUnit.MILLISECONDS);
log.warn("We restart tor as too many self tests failed.");
start(null);
});

View file

@ -1,8 +1,6 @@
package io.bitsquare.p2p.routing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.*;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Utils;
@ -15,10 +13,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class Routing {
@ -47,7 +42,7 @@ public class Routing {
private final List<Address> reportedNeighborAddresses = new CopyOnWriteArrayList<>();
private final Map<Address, Runnable> authenticationCompleteHandlers = new ConcurrentHashMap<>();
private final Timer maintenanceTimer = new Timer();
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final ExecutorService executorService;
private volatile boolean shutDownInProgress;
@ -61,6 +56,17 @@ public class Routing {
// We copy it as we remove ourselves later from the list if we are a seed node
this.seedNodes = new CopyOnWriteArrayList<>(seeds);
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Routing-%d")
.setDaemon(true)
.build();
executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
init(networkNode);
}
private void init(NetworkNode networkNode) {
networkNode.addMessageListener((message, connection) -> {
if (message instanceof AuthenticationMessage)
processAuthenticationMessage((AuthenticationMessage) message, connection);
@ -110,8 +116,13 @@ public class Routing {
maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
disconnectOldConnections();
pingNeighbors();
try {
disconnectOldConnections();
pingNeighbors();
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
}, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL);
}
@ -126,11 +137,7 @@ public class Routing {
Connection connection = authenticatedConnections.remove(0);
log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> disconnectOldConnections());
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
}
}
@ -153,11 +160,7 @@ public class Routing {
removeNeighbor(e.address);
}
});
try {
Thread.sleep(new Random().nextInt(5000) + 5000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
Uninterruptibles.sleepUninterruptibly(new Random().nextInt(5000) + 5000, TimeUnit.MILLISECONDS);
});
}
@ -247,16 +250,16 @@ public class Routing {
public void startAuthentication(List<Address> connectedSeedNodes) {
connectedSeedNodes.forEach(connectedSeedNode -> {
executorService.submit(() -> {
sendRequestAuthenticationMessage(seedNodes, connectedSeedNode);
try {
sendRequestAuthenticationMessage(seedNodes, connectedSeedNode);
// give a random pause of 3-5 sec. before using the next
Thread.sleep(new Random().nextInt(2000) + 3000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
Uninterruptibles.sleepUninterruptibly(new Random().nextInt(2000) + 3000, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
});
});
}
private void sendRequestAuthenticationMessage(final List<Address> remainingSeedNodes, final Address address) {
@ -315,17 +318,11 @@ public class Routing {
connection.shutDown(() -> {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
if (simulateAuthTorNode > 0)
Uninterruptibles.sleepUninterruptibly(simulateAuthTorNode, TimeUnit.MILLISECONDS);
try {
if (simulateAuthTorNode > 0) Thread.sleep(simulateAuthTorNode);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + getAddress());
long nonce = addToMapAndGetNonce(peerAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new ChallengeMessage(getAddress(), requestAuthenticationMessage.nonce, nonce));
@ -413,25 +410,25 @@ public class Routing {
ArrayList<Address> neighborAddresses = ((GetNeighborsMessage) message).neighborAddresses;
log.trace("Received neighbors: " + neighborAddresses);
// remove ourselves
neighborAddresses.remove(getAddress());
addToReportedNeighbors(neighborAddresses, connection);
}
} else if (message instanceof NeighborsMessage) {
log.trace("NeighborsMessage from " + connection.getPeerAddress() + " at " + getAddress());
NeighborsMessage neighborsMessage = (NeighborsMessage) message;
Address peerAddress = neighborsMessage.address;
log.trace("NeighborsMessage from " + peerAddress + " at " + getAddress());
ArrayList<Address> neighborAddresses = neighborsMessage.neighborAddresses;
log.trace("Received neighbors: " + neighborAddresses);
// remove ourselves
neighborAddresses.remove(getAddress());
addToReportedNeighbors(neighborAddresses, connection);
log.info("\n\nAuthenticationComplete\nPeer with address " + connection.getPeerAddress().toString()
+ " authenticated (" + connection.getObjectId() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
// we wait until the handshake is completed before setting the authenticate flag
// authentication at both sides of the connection
setAuthenticated(connection, neighborsMessage.address);
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
+ " authenticated (" + connection.getObjectId() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
setAuthenticated(connection, peerAddress);
Runnable authenticationCompleteHandler = authenticationCompleteHandlers.remove(connection.getPeerAddress());
if (authenticationCompleteHandler != null)
@ -442,18 +439,21 @@ public class Routing {
}
private void addToReportedNeighbors(ArrayList<Address> neighborAddresses, Connection connection) {
log.trace("addToReportedNeighbors");
// we disconnect misbehaving nodes trying to send too many neighbors
// reported neighbors include the peers connected neighbors which is normally max. 8 but we give some headroom
// for safety
if (neighborAddresses.size() > 1100) {
connection.shutDown();
} else {
neighborAddresses.remove(getAddress());
reportedNeighborAddresses.addAll(neighborAddresses);
purgeReportedNeighbors();
}
}
private void purgeReportedNeighbors() {
log.trace("purgeReportedNeighbors");
int all = getAllNeighborAddresses().size();
if (all > 1000) {
int diff = all - 100;
@ -478,20 +478,21 @@ public class Routing {
private void authenticateToNextRandomNeighbor() {
executorService.submit(() -> {
try {
Thread.sleep(new Random().nextInt(200) + 200);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
if (getConnectedNeighbors().size() <= MAX_CONNECTIONS) {
Address randomNotConnectedNeighborAddress = getRandomNotConnectedNeighborAddress();
if (randomNotConnectedNeighborAddress != null) {
log.info("We try to build an authenticated connection to a random neighbor. " + randomNotConnectedNeighborAddress);
authenticateToPeer(randomNotConnectedNeighborAddress, null, () -> authenticateToNextRandomNeighbor());
Uninterruptibles.sleepUninterruptibly(new Random().nextInt(200) + 200, TimeUnit.MILLISECONDS);
if (getConnectedNeighbors().size() <= MAX_CONNECTIONS) {
Address randomNotConnectedNeighborAddress = getRandomNotConnectedNeighborAddress();
if (randomNotConnectedNeighborAddress != null) {
log.info("We try to build an authenticated connection to a random neighbor. " + randomNotConnectedNeighborAddress);
authenticateToPeer(randomNotConnectedNeighborAddress, null, () -> authenticateToNextRandomNeighbor());
} else {
log.info("No more neighbors available for connecting.");
}
} else {
log.info("No more neighbors available for connecting.");
log.info("We have already enough connections.");
}
} else {
log.info("We have already enough connections.");
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
});
}
@ -531,9 +532,8 @@ public class Routing {
private boolean verifyNonceAndAuthenticatePeerAddress(long peersNonce, Address peerAddress) {
log.trace("verifyNonceAndAuthenticatePeerAddress nonceMap=" + nonceMap + " / peerAddress=" + peerAddress);
long nonce = nonceMap.remove(peerAddress);
boolean result = nonce == peersNonce;
return result;
Long nonce = nonceMap.remove(peerAddress);
return nonce != null && nonce == peersNonce;
}
private void setAuthenticated(Connection connection, Address peerAddress) {

View file

@ -52,6 +52,10 @@ public class ProtectedExpirableDataStorage {
storage = new Storage<>(storageDir);
init();
}
private void init() {
ConcurrentHashMap<BigInteger, Integer> persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap");
if (persisted != null) {
sequenceNumberMap = persisted;
@ -79,9 +83,14 @@ public class ProtectedExpirableDataStorage {
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
log.info("removeExpiredEntries called ");
map.entrySet().stream().filter(entry -> entry.getValue().isExpired())
.forEach(entry -> map.remove(entry.getKey()));
try {
log.info("removeExpiredEntries called ");
map.entrySet().stream().filter(entry -> entry.getValue().isExpired())
.forEach(entry -> map.remove(entry.getKey()));
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
},
CHECK_TTL_INTERVAL,