diff --git a/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java b/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java index 68feef2666..5644101fa8 100644 --- a/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java +++ b/bootstrap/src/main/java/io/bitsquare/p2p/seed/SeedNodeMain.java @@ -1,9 +1,13 @@ package io.bitsquare.p2p.seed; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.bitsquare.common.UserThread; import org.bouncycastle.jce.provider.BouncyCastleProvider; import java.security.NoSuchAlgorithmException; import java.security.Security; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; public class SeedNodeMain { @@ -12,7 +16,13 @@ public class SeedNodeMain { // To stop enter: q public static void main(String[] args) throws NoSuchAlgorithmException { Security.addProvider(new BouncyCastleProvider()); - + + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("SeedNodeMain") + .setDaemon(true) + .build(); + UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory)); + SeedNode seedNode = new SeedNode(); seedNode.processArgs(args); seedNode.createAndStartP2PService(); diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index 93a152bcd8..8c62a39d10 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -20,6 +20,7 @@ package io.bitsquare.trade.offer; import com.google.inject.Inject; import io.bitsquare.btc.TradeWalletService; import io.bitsquare.btc.WalletService; +import io.bitsquare.common.UserThread; import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.handlers.ErrorMessageHandler; import io.bitsquare.common.handlers.ResultHandler; @@ -167,8 +168,8 @@ public class OpenOfferManager { TimerTask timerTask = new TimerTask() { @Override public void run() { - Thread.currentThread().setName("RepublishOffers-" + String.valueOf(new Random().nextInt(1000))); - rePublishOffers(); + Thread.currentThread().setName("RepublishOffers-" + new Random().nextInt(1000)); + UserThread.execute(() -> rePublishOffers()); try { } catch (Throwable t) { t.printStackTrace(); diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index f78308fd59..829facc2b6 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -16,7 +16,7 @@ import io.bitsquare.crypto.EncryptionService; import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.p2p.messaging.*; import io.bitsquare.p2p.network.*; -import io.bitsquare.p2p.routing.Neighbor; +import io.bitsquare.p2p.routing.Peer; import io.bitsquare.p2p.routing.Routing; import io.bitsquare.p2p.routing.RoutingListener; import io.bitsquare.p2p.seed.SeedNodesRepository; @@ -67,7 +67,7 @@ public class P2PService { private final Map mailboxMap = new ConcurrentHashMap<>(); private volatile boolean shutDownInProgress; private List
seedNodeAddresses; - private List
connectedSeedNodes = new CopyOnWriteArrayList<>(); + private Set
connectedSeedNodes = new HashSet<>(); private Set
authenticatedPeerAddresses = new HashSet<>(); private boolean authenticatedToFirstPeer; private boolean allDataReceived; @@ -245,17 +245,17 @@ public class P2PService { routing.addRoutingListener(new RoutingListener() { @Override - public void onFirstNeighborAdded(Neighbor neighbor) { - log.trace("onFirstNeighbor " + neighbor.toString()); + public void onFirstPeerAdded(Peer peer) { + log.trace("onFirstPeer " + peer.toString()); } @Override - public void onNeighborAdded(Neighbor neighbor) { + public void onPeerAdded(Peer peer) { } @Override - public void onNeighborRemoved(Address address) { + public void onPeerRemoved(Address address) { } @@ -599,8 +599,9 @@ public class P2PService { sendGetAllDataMessageTimer.schedule(new TimerTask() { @Override public void run() { + Thread.currentThread().setName("SendGetAllDataMessageTimer-" + new Random().nextInt(1000)); try { - sendGetAllDataMessage(remainingSeedNodeAddresses); + UserThread.execute(() -> sendGetAllDataMessage(remainingSeedNodeAddresses)); } catch (Throwable t) { t.printStackTrace(); log.error("Executing task failed. " + t.getMessage()); diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 792eadbf01..ae20eda03c 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -3,6 +3,7 @@ package io.bitsquare.p2p.network; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.Uninterruptibles; import io.bitsquare.common.ByteArrayUtils; +import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Utils; @@ -21,41 +22,42 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.*; +/** + * Connection is created by the server thread or by send message from NetworkNode. + * All handlers are called on User thread. + * Shared data between InputHandler thread and that + */ public class Connection { private static final Logger log = LoggerFactory.getLogger(Connection.class); private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data private static final int MAX_ILLEGAL_REQUESTS = 5; private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min. + private InputHandler inputHandler; public static int getMaxMsgSize() { return MAX_MSG_SIZE; } - private final Socket socket; private final int port; - private final MessageListener messageListener; - private final ConnectionListener connectionListener; private final String uid; + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); - private final Map illegalRequests = new ConcurrentHashMap<>(); - private final ExecutorService executorService; - private ObjectOutputStream out; - private ObjectInputStream in; + // set in init + private ObjectOutputStream objectOutputStream; + // holder of state shared between InputHandler and Connection + private SharedSpace sharedSpace; + // mutable data, set from other threads but not changed internally. @Nullable private Address peerAddress; - private boolean isAuthenticated; private volatile boolean stopped; private volatile boolean shutDownInProgress; - private volatile boolean inputHandlerStopped; - private volatile Date lastActivityDate; - //TODO got java.util.zip.DataFormatException: invalid distance too far back // java.util.zip.DataFormatException: invalid literal/lengths set // use GZIPInputStream but problems with blocking - private boolean useCompression = false; + boolean useCompression = false; /////////////////////////////////////////////////////////////////////////////////////////// @@ -63,24 +65,14 @@ public class Connection { /////////////////////////////////////////////////////////////////////////////////////////// public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { - this.socket = socket; port = socket.getLocalPort(); - this.messageListener = messageListener; - this.connectionListener = connectionListener; - uid = UUID.randomUUID().toString(); - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("Connection-" + socket.getLocalPort()) - .setDaemon(true) - .build(); - - executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); - - init(); + init(socket, messageListener, connectionListener); } - private void init() { + private void init(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { + sharedSpace = new SharedSpace(this, socket, messageListener, connectionListener, useCompression); try { socket.setSoTimeout(SOCKET_TIMEOUT); // Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block @@ -88,35 +80,34 @@ public class Connection { // When you construct an ObjectInputStream, in the constructor the class attempts to read a header that // the associated ObjectOutputStream on the other end of the connection has written. // It will not return until that header has been read. - if (useCompression) { - out = new ObjectOutputStream(socket.getOutputStream()); - in = new ObjectInputStream(socket.getInputStream()); - } else { - out = new ObjectOutputStream(socket.getOutputStream()); - in = new ObjectInputStream(socket.getInputStream()); - } - executorService.submit(new InputHandler()); + objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); + + // We create a thread for handling inputStream data + inputHandler = new InputHandler(sharedSpace, objectInputStream, port); + executorService.submit(inputHandler); } catch (IOException e) { - handleConnectionException(e); + sharedSpace.handleConnectionException(e); } - lastActivityDate = new Date(); + sharedSpace.updateLastActivityDate(); log.trace("\nNew connection created " + this.toString()); - connectionListener.onConnection(this); + UserThread.execute(() -> connectionListener.onConnection(this)); } - public void onAuthenticationComplete(Address peerAddress, Connection connection) { - isAuthenticated = true; + + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// + + public synchronized void setAuthenticated(Address peerAddress, Connection connection) { this.peerAddress = peerAddress; - connectionListener.onPeerAddressAuthenticated(peerAddress, connection); - } - - public boolean isStopped() { - return stopped; + UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection)); } public void sendMessage(Message message) { + // That method we get called form user thread if (!stopped) { try { log.trace("writeObject " + message + " on connection with port " + port); @@ -132,28 +123,22 @@ public class Connection { // log.trace("Write object data size: " + ByteArrayUtils.objectToByteArray(message).length); objectToWrite = message; } - out.writeObject(objectToWrite); - out.flush(); + objectOutputStream.writeObject(objectToWrite); + objectOutputStream.flush(); - lastActivityDate = new Date(); + sharedSpace.updateLastActivityDate(); } } catch (IOException e) { - handleConnectionException(e); + // an exception lead to a shutdown + sharedSpace.handleConnectionException(e); } } else { - connectionListener.onDisconnect(ConnectionListener.Reason.ALREADY_CLOSED, Connection.this); + UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(ConnectionListener.Reason.ALREADY_CLOSED, this)); } } public void reportIllegalRequest(IllegalRequest illegalRequest) { - log.warn("We got reported an illegal request " + illegalRequest); - int prevCounter = illegalRequests.get(illegalRequest); - if (prevCounter > illegalRequest.limit) { - log.warn("We close connection as we received too many illegal requests.\n" + illegalRequests.toString()); - shutDown(); - } else { - illegalRequests.put(illegalRequest, ++prevCounter); - } + sharedSpace.reportIllegalRequest(illegalRequest); } @@ -162,22 +147,26 @@ public class Connection { /////////////////////////////////////////////////////////////////////////////////////////// @Nullable - public Address getPeerAddress() { + public synchronized Address getPeerAddress() { return peerAddress; } public Date getLastActivityDate() { - return lastActivityDate; + return sharedSpace.getLastActivityDate(); } - public boolean isAuthenticated() { - return isAuthenticated; + public synchronized boolean isAuthenticated() { + return peerAddress != null; } public String getUid() { return uid; } + public boolean isStopped() { + return stopped; + } + /////////////////////////////////////////////////////////////////////////////////////////// // ShutDown @@ -191,7 +180,7 @@ public class Connection { shutDown(true, null); } - private void shutDown(boolean sendCloseConnectionMessage) { + void shutDown(boolean sendCloseConnectionMessage) { shutDown(sendCloseConnectionMessage, null); } @@ -201,17 +190,18 @@ public class Connection { + "\npeerAddress=" + peerAddress + "\nobjectId=" + getObjectId() + "\nuid=" + getUid() - + "\nisAuthenticated=" + isAuthenticated - + "\nsocket.getPort()=" + socket.getPort() + + "\nisAuthenticated=" + isAuthenticated() + + "\nsocket.getPort()=" + sharedSpace.getSocket().getPort() + "\n\n"); log.debug("ShutDown " + this.getObjectId()); log.debug("ShutDown connection requested. Connection=" + this.toString()); if (!stopped) { stopped = true; + inputHandler.stop(); + shutDownInProgress = true; - inputHandlerStopped = true; - connectionListener.onDisconnect(ConnectionListener.Reason.SHUT_DOWN, Connection.this); + UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(ConnectionListener.Reason.SHUT_DOWN, this)); if (sendCloseConnectionMessage) { sendMessage(new CloseConnectionMessage()); @@ -220,7 +210,7 @@ public class Connection { } try { - socket.close(); + sharedSpace.getSocket().close(); } catch (SocketException e) { log.trace("SocketException at shutdown might be expected " + e.getMessage()); } catch (IOException e) { @@ -238,24 +228,6 @@ public class Connection { } } - private void handleConnectionException(Exception e) { - if (e instanceof SocketException) { - if (socket.isClosed()) - connectionListener.onDisconnect(ConnectionListener.Reason.SOCKET_CLOSED, Connection.this); - else - connectionListener.onDisconnect(ConnectionListener.Reason.RESET, Connection.this); - } else if (e instanceof SocketTimeoutException) { - connectionListener.onDisconnect(ConnectionListener.Reason.TIMEOUT, Connection.this); - } else if (e instanceof EOFException) { - connectionListener.onDisconnect(ConnectionListener.Reason.PEER_DISCONNECTED, Connection.this); - } else { - log.info("Exception at connection with port " + socket.getLocalPort()); - e.printStackTrace(); - connectionListener.onDisconnect(ConnectionListener.Reason.UNKNOWN, Connection.this); - } - - shutDown(false); - } @Override public boolean equals(Object o) { @@ -265,8 +237,7 @@ public class Connection { Connection that = (Connection) o; if (port != that.port) return false; - if (uid != null ? !uid.equals(that.uid) : that.uid != null) return false; - return !(peerAddress != null ? !peerAddress.equals(that.peerAddress) : that.peerAddress != null); + return !(uid != null ? !uid.equals(that.uid) : that.uid != null); } @@ -274,21 +245,20 @@ public class Connection { public int hashCode() { int result = port; result = 31 * result + (uid != null ? uid.hashCode() : 0); - result = 31 * result + (peerAddress != null ? peerAddress.hashCode() : 0); return result; } @Override public String toString() { return "Connection{" + - "objectId=" + getObjectId() + - ", uid=" + uid + - ", port=" + port + - ", isAuthenticated=" + isAuthenticated + + "port=" + port + + ", uid='" + uid + '\'' + + ", objectId='" + getObjectId() + '\'' + + ", sharedSpace=" + sharedSpace.toString() + ", peerAddress=" + peerAddress + - ", lastActivityDate=" + lastActivityDate + ", stopped=" + stopped + - ", inputHandlerStopped=" + inputHandlerStopped + + ", shutDownInProgress=" + shutDownInProgress + + ", useCompression=" + useCompression + '}'; } @@ -297,51 +267,179 @@ public class Connection { } + /////////////////////////////////////////////////////////////////////////////////////////// + // SharedSpace + /////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Holds all shared data between Connection and InputHandler + */ + private static class SharedSpace { + private static final Logger log = LoggerFactory.getLogger(SharedSpace.class); + + private final Connection connection; + private final Socket socket; + private final MessageListener messageListener; + private final ConnectionListener connectionListener; + private final boolean useCompression; + private final Map illegalRequests = new ConcurrentHashMap<>(); + + // mutable + private Date lastActivityDate; + + public SharedSpace(Connection connection, Socket socket, MessageListener messageListener, + ConnectionListener connectionListener, boolean useCompression) { + this.connection = connection; + this.socket = socket; + this.messageListener = messageListener; + this.connectionListener = connectionListener; + this.useCompression = useCompression; + } + + public synchronized void updateLastActivityDate() { + lastActivityDate = new Date(); + } + + public synchronized Date getLastActivityDate() { + return lastActivityDate; + } + + public void reportIllegalRequest(IllegalRequest illegalRequest) { + log.warn("We got reported an illegal request " + illegalRequest); + int prevCounter = illegalRequests.get(illegalRequest); + if (prevCounter > illegalRequest.maxTolerance) { + log.warn("We close connection as we received too many illegal requests.\n" + illegalRequests.toString()); + connection.shutDown(false); + } else { + illegalRequests.put(illegalRequest, ++prevCounter); + } + } + + public void handleConnectionException(Exception e) { + if (e instanceof SocketException) { + if (socket.isClosed()) + UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.SOCKET_CLOSED, connection)); + else + UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.RESET, connection)); + } else if (e instanceof SocketTimeoutException) { + UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.TIMEOUT, connection)); + } else if (e instanceof EOFException) { + UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.PEER_DISCONNECTED, connection)); + } else { + log.info("Exception at connection with port " + socket.getLocalPort()); + e.printStackTrace(); + UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.UNKNOWN, connection)); + } + + connection.shutDown(false); + } + + public void onMessage(Message message) { + UserThread.execute(() -> messageListener.onMessage(message, connection)); + } + + public boolean useCompression() { + return useCompression; + } + + public void shutDown(boolean sendCloseConnectionMessage) { + connection.shutDown(sendCloseConnectionMessage); + } + + public ConnectionListener getConnectionListener() { + return connectionListener; + } + + public Socket getSocket() { + return socket; + } + + public String getConnectionId() { + return connection.getObjectId(); + } + + @Override + public String toString() { + return "SharedSpace{" + + ", socket=" + socket + + ", useCompression=" + useCompression + + ", illegalRequests=" + illegalRequests + + ", lastActivityDate=" + lastActivityDate + + '}'; + } + } + + /////////////////////////////////////////////////////////////////////////////////////////// // InputHandler /////////////////////////////////////////////////////////////////////////////////////////// - private class InputHandler implements Runnable { + private static class InputHandler implements Runnable { + private static final Logger log = LoggerFactory.getLogger(InputHandler.class); + + private final SharedSpace sharedSpace; + private final ObjectInputStream objectInputStream; + private final int port; + private final ExecutorService executorService; + private volatile boolean stopped; + + public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, int port) { + this.sharedSpace = sharedSpace; + this.objectInputStream = objectInputStream; + this.port = port; + + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("InputHandler-onMessage-" + port) + .setDaemon(true) + .build(); + executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory); + } + + public void stop() { + stopped = true; + Utils.shutDownExecutorService(executorService); + } + @Override public void run() { try { - Thread.currentThread().setName("InputHandler-" + socket.getLocalPort()); - while (!inputHandlerStopped) { + Thread.currentThread().setName("InputHandler-" + port); + while (!stopped && !Thread.currentThread().isInterrupted()) { try { - log.trace("InputHandler waiting for incoming messages connection=" + Connection.this.getObjectId()); - Object rawInputObject = in.readObject(); - log.trace("New data arrived at inputHandler of connection=" + Connection.this.toString() + log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionId()); + Object rawInputObject = objectInputStream.readObject(); + log.trace("New data arrived at inputHandler of connection=" + sharedSpace.getConnectionId() + " rawInputObject " + rawInputObject); int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; - if (size <= MAX_MSG_SIZE) { + if (size <= getMaxMsgSize()) { Serializable serializable = null; - if (useCompression) { + if (sharedSpace.useCompression()) { if (rawInputObject instanceof byte[]) { byte[] compressedObjectAsBytes = (byte[]) rawInputObject; size = compressedObjectAsBytes.length; //log.trace("Read object compressed data size: " + size); serializable = Utils.decompress(compressedObjectAsBytes); } else { - reportIllegalRequest(IllegalRequest.InvalidDataType); + sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); } } else { if (rawInputObject instanceof Serializable) { serializable = (Serializable) rawInputObject; } else { - reportIllegalRequest(IllegalRequest.InvalidDataType); + sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); } } //log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length); // compressed size might be bigger theoretically so we check again after decompression - if (size <= MAX_MSG_SIZE) { + if (size <= getMaxMsgSize()) { if (serializable instanceof Message) { - lastActivityDate = new Date(); + sharedSpace.updateLastActivityDate(); Message message = (Message) serializable; if (message instanceof CloseConnectionMessage) { - inputHandlerStopped = true; - shutDown(false); + stopped = true; + sharedSpace.shutDown(false); } else { Task task = new Task() { @Override @@ -351,7 +449,7 @@ public class Connection { }; executorService.submit(() -> { try { - messageListener.onMessage(message, Connection.this); + sharedSpace.onMessage(message); } catch (Throwable t) { t.printStackTrace(); log.error("Executing task failed. " + t.getMessage()); @@ -359,19 +457,19 @@ public class Connection { }); } } else { - reportIllegalRequest(IllegalRequest.InvalidDataType); + sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); } } else { log.error("Received decompressed data exceeds max. msg size."); - reportIllegalRequest(IllegalRequest.MaxSizeExceeded); + sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded); } } else { log.error("Received compressed data exceeds max. msg size."); - reportIllegalRequest(IllegalRequest.MaxSizeExceeded); + sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded); } } catch (IOException | ClassNotFoundException e) { - inputHandlerStopped = true; - handleConnectionException(e); + stopped = true; + sharedSpace.handleConnectionException(e); } } } catch (Throwable t) { @@ -379,5 +477,14 @@ public class Connection { log.error("Executing task failed. " + t.getMessage()); } } + + @Override + public String toString() { + return "InputHandler{" + + "sharedSpace=" + sharedSpace + + ", port=" + port + + ", stopped=" + stopped + + '}'; + } } } \ No newline at end of file diff --git a/network/src/main/java/io/bitsquare/p2p/network/IllegalRequest.java b/network/src/main/java/io/bitsquare/p2p/network/IllegalRequest.java index 4fa9ef3b7b..2cbd77c54d 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/IllegalRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/network/IllegalRequest.java @@ -5,9 +5,9 @@ public enum IllegalRequest { NotAuthenticated(2), InvalidDataType(2); - public final int limit; + public final int maxTolerance; - IllegalRequest(int limit) { - this.limit = limit; + IllegalRequest(int maxTolerance) { + this.maxTolerance = maxTolerance; } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java index e45a89a36d..63fef5362e 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java @@ -3,6 +3,7 @@ package io.bitsquare.p2p.network; import com.google.common.util.concurrent.*; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; +import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Address; import io.nucleo.net.HiddenServiceDescriptor; import io.nucleo.net.TorNode; @@ -14,9 +15,8 @@ import java.io.IOException; import java.net.BindException; import java.net.ServerSocket; import java.net.Socket; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.Random; +import java.util.concurrent.*; import java.util.function.Consumer; public class LocalhostNetworkNode extends NetworkNode { @@ -47,7 +47,11 @@ public class LocalhostNetworkNode extends NetworkNode { public void start(@Nullable SetupListener setupListener) { if (setupListener != null) addSetupListener(setupListener); - executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("NetworkNode-" + port) + .setDaemon(true) + .build(); + executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory)); //Tor delay simulation createTorNode(torNode -> { @@ -89,6 +93,7 @@ public class LocalhostNetworkNode extends NetworkNode { private void createTorNode(final Consumer resultHandler) { Callable> task = () -> { + Thread.currentThread().setName("CreateTorNode-" + new Random().nextInt(1000)); long ts = System.currentTimeMillis(); if (simulateTorDelayTorNode > 0) Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS); @@ -102,7 +107,7 @@ public class LocalhostNetworkNode extends NetworkNode { ListenableFuture> future = executorService.submit(task); Futures.addCallback(future, new FutureCallback>() { public void onSuccess(TorNode torNode) { - resultHandler.accept(torNode); + UserThread.execute(() -> resultHandler.accept(torNode)); } public void onFailure(Throwable throwable) { @@ -113,6 +118,7 @@ public class LocalhostNetworkNode extends NetworkNode { private void createHiddenService(final Consumer resultHandler) { Callable task = () -> { + Thread.currentThread().setName("CreateHiddenService-" + new Random().nextInt(1000)); long ts = System.currentTimeMillis(); if (simulateTorDelayHiddenService > 0) Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS); @@ -126,7 +132,7 @@ public class LocalhostNetworkNode extends NetworkNode { ListenableFuture future = executorService.submit(task); Futures.addCallback(future, new FutureCallback() { public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) { - resultHandler.accept(hiddenServiceDescriptor); + UserThread.execute(() -> resultHandler.accept(hiddenServiceDescriptor)); } public void onFailure(Throwable throwable) { diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 97515fcc10..50948f83b1 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -1,6 +1,7 @@ package io.bitsquare.p2p.network; import com.google.common.util.concurrent.*; +import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Message; import org.jetbrains.annotations.NotNull; @@ -11,10 +12,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; @@ -24,8 +22,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener private static final Logger log = LoggerFactory.getLogger(NetworkNode.class); protected final int port; - private final List outBoundConnections = new CopyOnWriteArrayList<>(); - private final List inBoundConnections = new CopyOnWriteArrayList<>(); + private final Set outBoundConnections = Collections.synchronizedSet(new HashSet<>()); + private final Set inBoundConnections = Collections.synchronizedSet(new HashSet<>()); private final List messageListeners = new CopyOnWriteArrayList<>(); private final List connectionListeners = new CopyOnWriteArrayList<>(); protected final List setupListeners = new CopyOnWriteArrayList<>(); @@ -55,119 +53,88 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener public SettableFuture sendMessage(@NotNull Address peerAddress, Message message) { log.trace("sendMessage message=" + message); checkNotNull(peerAddress, "peerAddress must not be null"); - final SettableFuture resultFuture = SettableFuture.create(); - Callable task = () -> { - try { - Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress); + Optional outboundConnectionOptional = findOutboundConnection(peerAddress); + Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null; + if (connection != null) + log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid()); - Optional outboundConnectionOptional = getOutboundConnection(peerAddress); - Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null; + if (connection != null && connection.isStopped()) { + log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid()); + outBoundConnections.remove(connection); + connection = null; + } - if (connection != null && connection.isStopped()) { - log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid()); - outBoundConnections.remove(connection); - connection = null; - } + if (connection == null) { + Optional inboundConnectionOptional = findInboundConnection(peerAddress); + if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get(); + if (connection != null) + log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid()); + } - if (connection == null) { - Optional inboundConnectionOptional = getInboundConnection(peerAddress); - if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get(); - if (connection != null) - log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid()); - } - - if (connection == null) { + if (connection != null) { + return sendMessage(connection, message); + } else { + final SettableFuture resultFuture = SettableFuture.create(); + Callable task = () -> { + Connection newConnection; + try { + Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress); + log.trace("We have not found any connection for that peerAddress. " + + "We will create a new outbound connection."); try { Socket socket = getSocket(peerAddress); // can take a while when using tor - connection = new Connection(socket, - (message1, connection1) -> NetworkNode.this.onMessage(message1, connection1), - new ConnectionListener() { - @Override - public void onConnection(Connection connection) { - NetworkNode.this.onConnection(connection); - } - - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); - } - - @Override - public void onDisconnect(Reason reason, Connection connection) { - log.trace("onDisconnect at outgoing connection to peerAddress " + peerAddress); - NetworkNode.this.onDisconnect(reason, connection); - } - - @Override - public void onError(Throwable throwable) { - NetworkNode.this.onError(throwable); - } - }); - if (!outBoundConnections.contains(connection)) - outBoundConnections.add(connection); - else - log.error("We have already that connection in our list. That must not happen. " - + outBoundConnections + " / connection=" + connection); + newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this); + outBoundConnections.add(newConnection); log.info("\n\nNetworkNode created new outbound connection:" + "\npeerAddress=" + peerAddress.port - + "\nconnection.uid=" + connection.getUid() + + "\nconnection.uid=" + newConnection.getUid() + "\nmessage=" + message + "\n\n"); } catch (Throwable t) { - resultFuture.setException(t); + UserThread.execute(() -> resultFuture.setException(t)); return null; } + + newConnection.sendMessage(message); + + return newConnection; + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + UserThread.execute(() -> resultFuture.setException(t)); + throw t; + } + }; + + ListenableFuture future = executorService.submit(task); + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Connection connection) { + UserThread.execute(() -> resultFuture.set(connection)); } - connection.sendMessage(message); - - return connection; - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - throw t; - } - }; - - ListenableFuture future = executorService.submit(task); - Futures.addCallback(future, new FutureCallback() { - public void onSuccess(Connection connection) { - resultFuture.set(connection); - } - - public void onFailure(@NotNull Throwable throwable) { - resultFuture.setException(throwable); - } - }); - return resultFuture; - } - - private Optional getOutboundConnection(Address peerAddress) { - return outBoundConnections.stream() - .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); - } - - private Optional getInboundConnection(Address peerAddress) { - return inBoundConnections.stream() - .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); + public void onFailure(@NotNull Throwable throwable) { + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }); + return resultFuture; + } } public SettableFuture sendMessage(Connection connection, Message message) { - final SettableFuture resultFuture = SettableFuture.create(); - ListenableFuture future = executorService.submit(() -> { connection.sendMessage(message); return connection; }); + final SettableFuture resultFuture = SettableFuture.create(); Futures.addCallback(future, new FutureCallback() { public void onSuccess(Connection connection) { - resultFuture.set(connection); + UserThread.execute(() -> resultFuture.set(connection)); } public void onFailure(@NotNull Throwable throwable) { - resultFuture.setException(throwable); + UserThread.execute(() -> resultFuture.setException(throwable)); } }); return resultFuture; @@ -191,8 +158,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener getAllConnections().stream().forEach(e -> e.shutDown()); log.info("NetworkNode shutdown complete"); - if (shutDownCompleteHandler != null) new Thread(shutDownCompleteHandler).start(); - ; + if (shutDownCompleteHandler != null) UserThread.execute(() -> shutDownCompleteHandler.run()); } } @@ -220,7 +186,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener @Override public void onConnection(Connection connection) { - connectionListeners.stream().forEach(e -> e.onConnection(connection)); + connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onConnection(connection))); } @Override @@ -228,7 +194,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener log.trace("onAuthenticationComplete peerAddress=" + peerAddress); log.trace("onAuthenticationComplete connection=" + connection); - connectionListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection)); + connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onPeerAddressAuthenticated(peerAddress, connection))); } @Override @@ -237,12 +203,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener log.trace("onDisconnect connection " + connection + ", peerAddress= " + peerAddress); outBoundConnections.remove(connection); inBoundConnections.remove(connection); - connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection)); + connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onDisconnect(reason, connection))); } @Override public void onError(Throwable throwable) { - connectionListeners.stream().forEach(e -> e.onError(throwable)); + connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onError(throwable))); } @@ -260,7 +226,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener @Override public void onMessage(Message message, Connection connection) { - messageListeners.stream().forEach(e -> e.onMessage(message, connection)); + messageListeners.stream().forEach(e -> UserThread.execute(() -> e.onMessage(message, connection))); } @@ -269,38 +235,47 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// protected void startServer(ServerSocket serverSocket) { - server = new Server(serverSocket, (message, connection) -> { - NetworkNode.this.onMessage(message, connection); - }, new ConnectionListener() { - @Override - public void onConnection(Connection connection) { - // we still have not authenticated so put it to the temp list - if (!inBoundConnections.contains(connection)) - inBoundConnections.add(connection); - NetworkNode.this.onConnection(connection); - } + server = new Server(serverSocket, + (message, connection) -> NetworkNode.this.onMessage(message, connection), + new ConnectionListener() { + @Override + public void onConnection(Connection connection) { + // we still have not authenticated so put it to the temp list + inBoundConnections.add(connection); + NetworkNode.this.onConnection(connection); + } - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); - } + @Override + public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); + } - @Override - public void onDisconnect(Reason reason, Connection connection) { - Address peerAddress = connection.getPeerAddress(); - log.trace("onDisconnect at incoming connection to peerAddress " + peerAddress); - inBoundConnections.remove(connection); - NetworkNode.this.onDisconnect(reason, connection); - } + @Override + public void onDisconnect(Reason reason, Connection connection) { + Address peerAddress = connection.getPeerAddress(); + log.trace("onDisconnect at incoming connection to peerAddress " + peerAddress); + inBoundConnections.remove(connection); + NetworkNode.this.onDisconnect(reason, connection); + } - @Override - public void onError(Throwable throwable) { - NetworkNode.this.onError(throwable); - } - }); + @Override + public void onError(Throwable throwable) { + NetworkNode.this.onError(throwable); + } + }); executorService.submit(server); } + private Optional findOutboundConnection(Address peerAddress) { + return outBoundConnections.stream() + .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); + } + + private Optional findInboundConnection(Address peerAddress) { + return inBoundConnections.stream() + .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); + } + abstract protected Socket getSocket(Address peerAddress) throws IOException; @Nullable diff --git a/network/src/main/java/io/bitsquare/p2p/network/Server.java b/network/src/main/java/io/bitsquare/p2p/network/Server.java index ed6e171cdb..98faf23ed3 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Server.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Server.java @@ -7,8 +7,8 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.HashSet; +import java.util.Set; public class Server implements Runnable { private static final Logger log = LoggerFactory.getLogger(Server.class); @@ -16,7 +16,7 @@ public class Server implements Runnable { private final ServerSocket serverSocket; private final MessageListener messageListener; private final ConnectionListener connectionListener; - private final List connections = new CopyOnWriteArrayList<>(); + private final Set connections = new HashSet<>(); private volatile boolean stopped; @@ -30,24 +30,27 @@ public class Server implements Runnable { public void run() { try { Thread.currentThread().setName("Server-" + serverSocket.getLocalPort()); - while (!stopped) { - try { + try { + while (!stopped && !Thread.currentThread().isInterrupted()) { log.info("Ready to accept new clients on port " + serverSocket.getLocalPort()); final Socket socket = serverSocket.accept(); - log.info("Accepted new client on port " + socket.getLocalPort()); - Connection connection = new Connection(socket, messageListener, connectionListener); - log.info("\n\nServer created new inbound connection:" - + "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort() - + "\nsocket.getPort()=" + socket.getPort() - + "\nconnection.uid=" + connection.getUid() - + "\n\n"); + if (!stopped) { + log.info("Accepted new client on port " + socket.getLocalPort()); + Connection connection = new Connection(socket, messageListener, connectionListener); + log.info("\n\nServer created new inbound connection:" + + "\nserverSocket.getLocalPort()=" + serverSocket.getLocalPort() + + "\nsocket.getPort()=" + socket.getPort() + + "\nconnection.uid=" + connection.getUid() + + "\n\n"); - log.info("Server created new socket with port " + socket.getPort()); - connections.add(connection); - } catch (IOException e) { - if (!stopped) - e.printStackTrace(); + log.info("Server created new socket with port " + socket.getPort()); + if (!stopped) + connections.add(connection); + } } + } catch (IOException e) { + if (!stopped) + e.printStackTrace(); } } catch (Throwable t) { t.printStackTrace(); @@ -67,8 +70,9 @@ public class Server implements Runnable { log.warn("SocketException at shutdown might be expected " + e.getMessage()); } catch (IOException e) { e.printStackTrace(); + log.error("Exception at shutdown. " + e.getMessage()); } finally { - log.debug("Server shutdown complete"); + log.info("Server shutdown complete"); } } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java index 2a85e30ef8..a506d35bc2 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/TorNetworkNode.java @@ -3,10 +3,9 @@ package io.bitsquare.p2p.network; import com.google.common.util.concurrent.*; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; -import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; +import io.bitsquare.common.UserThread; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Utils; -import io.bitsquare.p2p.network.messages.SelfTestMessage; import io.nucleo.net.HiddenServiceDescriptor; import io.nucleo.net.TorNode; import org.jetbrains.annotations.Nullable; @@ -16,14 +15,10 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.net.Socket; -import java.net.UnknownHostException; import java.util.Random; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.*; import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkArgument; @@ -34,7 +29,6 @@ public class TorNetworkNode extends NetworkNode { private static final Random random = new Random(); private static final long TIMEOUT = 5000; - private static final long SELF_TEST_INTERVAL = 10 * 60 * 1000; private static final int MAX_ERRORS_BEFORE_RESTART = 3; private static final int MAX_RESTART_ATTEMPTS = 3; private static final int WAIT_BEFORE_RESTART = 2000; @@ -43,21 +37,13 @@ public class TorNetworkNode extends NetworkNode { private final File torDir; private TorNode torNode; private HiddenServiceDescriptor hiddenServiceDescriptor; - private Timer shutDownTimeoutTimer, selfTestTimer, selfTestTimeoutTimer; - private TimerTask selfTestTimeoutTask, selfTestTask; - private AtomicBoolean selfTestRunning = new AtomicBoolean(false); + private Timer shutDownTimeoutTimer; private long nonce; private int errorCounter; private int restartCounter; private Runnable shutDownCompleteHandler; private boolean torShutDownComplete, networkNodeShutDownDoneComplete; - static { - try { - new Socks5Proxy("", 0); - } catch (UnknownHostException e) { - } - } // ///////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -67,77 +53,6 @@ public class TorNetworkNode extends NetworkNode { super(port); this.torDir = torDir; - - init(); - } - - private void init() { - selfTestTimeoutTask = new TimerTask() { - @Override - public void run() { - try { - log.error("A timeout occurred at self test"); - stopSelfTestTimer(); - selfTestFailed(); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } - } - }; - - selfTestTask = new TimerTask() { - @Override - public void run() { - try { - stopTimeoutTimer(); - if (selfTestRunning.get()) { - log.debug("running self test"); - selfTestTimeoutTimer = new Timer(); - selfTestTimeoutTimer.schedule(selfTestTimeoutTask, TIMEOUT); - // might be interrupted by timeout task - if (selfTestRunning.get()) { - nonce = random.nextLong(); - log.trace("send msg with nonce " + nonce); - - try { - SettableFuture future = sendMessage(new Address(hiddenServiceDescriptor.getFullAddress()), new SelfTestMessage(nonce)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Sending self test message succeeded"); - } - - @Override - public void onFailure(Throwable throwable) { - log.error("Error at sending self test message. Exception = " + throwable); - stopTimeoutTimer(); - throwable.printStackTrace(); - selfTestFailed(); - } - }); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } - } - }; - - addMessageListener((message, connection) -> { - if (message instanceof SelfTestMessage) { - if (((SelfTestMessage) message).nonce == nonce) { - runSelfTest(); - } else { - log.error("Nonce not matching our challenge. That should never happen."); - selfTestFailed(); - } - } - }); } @@ -151,13 +66,17 @@ public class TorNetworkNode extends NetworkNode { addSetupListener(setupListener); // executorService might have been shutdown before a restart, so we create a new one - executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("NetworkNode-" + port) + .setDaemon(true) + .build(); + executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory)); // Create the tor node (takes about 6 sec.) createTorNode(torDir, torNode -> { TorNetworkNode.this.torNode = torNode; - setupListeners.stream().forEach(e -> e.onTorNodeReady()); + setupListeners.stream().forEach(e -> UserThread.execute(() -> e.onTorNodeReady())); // Create Hidden Service (takes about 40 sec.) createHiddenService(torNode, port, hiddenServiceDescriptor -> { @@ -166,10 +85,7 @@ public class TorNetworkNode extends NetworkNode { startServer(hiddenServiceDescriptor.getServerSocket()); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - setupListeners.stream().forEach(e -> e.onHiddenServiceReady()); - - // we are ready. so we start our periodic self test if our HS is available - // startSelfTest(); + setupListeners.stream().forEach(e -> UserThread.execute(() -> e.onHiddenServiceReady())); }); }); } @@ -188,13 +104,11 @@ public class TorNetworkNode extends NetworkNode { this.shutDownCompleteHandler = shutDownCompleteHandler; checkNotNull(executorService, "executorService must not be null"); - selfTestRunning.set(false); - stopSelfTestTimer(); - shutDownTimeoutTimer = new Timer(); shutDownTimeoutTimer.schedule(new TimerTask() { @Override public void run() { + Thread.currentThread().setName("ShutDownTimeoutTimer-" + new Random().nextInt(1000)); log.error("A timeout occurred at shutDown"); shutDownExecutorService(); } @@ -236,9 +150,9 @@ public class TorNetworkNode extends NetworkNode { }); } - // ///////////////////////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////////////////////////// // shutdown, restart - // ///////////////////////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////////////////////////// private void shutDownExecutorService() { shutDownTimeoutTimer.cancel(); @@ -252,14 +166,14 @@ public class TorNetworkNode extends NetworkNode { @Override public void onSuccess(Object o) { log.info("Shutdown completed"); - new Thread(shutDownCompleteHandler).start(); + UserThread.execute(() -> shutDownCompleteHandler.run()); } @Override public void onFailure(Throwable throwable) { throwable.printStackTrace(); log.error("Shutdown executorService failed with exception: " + throwable.getMessage()); - new Thread(shutDownCompleteHandler).start(); + UserThread.execute(() -> shutDownCompleteHandler.run()); } }); } @@ -284,6 +198,7 @@ public class TorNetworkNode extends NetworkNode { private void createTorNode(final File torDir, final Consumer resultHandler) { Callable> task = () -> { + Thread.currentThread().setName("CreateTorNode-" + new Random().nextInt(1000)); long ts = System.currentTimeMillis(); if (torDir.mkdirs()) log.trace("Created directory for tor"); @@ -314,6 +229,7 @@ public class TorNetworkNode extends NetworkNode { private void createHiddenService(final TorNode torNode, final int port, final Consumer resultHandler) { Callable task = () -> { + Thread.currentThread().setName("CreateHiddenService-" + new Random().nextInt(1000)); long ts = System.currentTimeMillis(); log.debug("Create hidden service"); HiddenServiceDescriptor hiddenServiceDescriptor = torNode.createHiddenService(port); @@ -339,43 +255,6 @@ public class TorNetworkNode extends NetworkNode { } - // ///////////////////////////////////////////////////////////////////////////////////////// - // Self test - // ///////////////////////////////////////////////////////////////////////////////////////// - - private void startSelfTest() { - selfTestRunning.set(true); - //addListener(messageListener); - runSelfTest(); - } - - private void runSelfTest() { - stopSelfTestTimer(); - selfTestTimer = new Timer(); - selfTestTimer.schedule(selfTestTask, SELF_TEST_INTERVAL); - } - - private void stopSelfTestTimer() { - stopTimeoutTimer(); - if (selfTestTimer != null) - selfTestTimer.cancel(); - } - - private void stopTimeoutTimer() { - if (selfTestTimeoutTimer != null) - selfTestTimeoutTimer.cancel(); - } - - private void selfTestFailed() { - errorCounter++; - log.warn("Self test failed. Already " + errorCounter + " failure(s). Max. errors before restart: " - + MAX_ERRORS_BEFORE_RESTART); - if (errorCounter >= MAX_ERRORS_BEFORE_RESTART) - restartTor(); - else - runSelfTest(); - } - @Override protected Socket getSocket(Address peerAddress) throws IOException { checkArgument(peerAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address"); diff --git a/network/src/main/java/io/bitsquare/p2p/routing/AuthenticationListener.java b/network/src/main/java/io/bitsquare/p2p/routing/AuthenticationListener.java index 989fbb8ea8..08e4faa0d0 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/AuthenticationListener.java +++ b/network/src/main/java/io/bitsquare/p2p/routing/AuthenticationListener.java @@ -4,13 +4,13 @@ import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; public abstract class AuthenticationListener implements RoutingListener { - public void onFirstNeighborAdded(Neighbor neighbor) { + public void onFirstPeerAdded(Peer peer) { } - public void onNeighborAdded(Neighbor neighbor) { + public void onPeerAdded(Peer peer) { } - public void onNeighborRemoved(Address address) { + public void onPeerRemoved(Address address) { } abstract public void onConnectionAuthenticated(Connection connection); diff --git a/network/src/main/java/io/bitsquare/p2p/routing/Neighbor.java b/network/src/main/java/io/bitsquare/p2p/routing/Peer.java similarity index 75% rename from network/src/main/java/io/bitsquare/p2p/routing/Neighbor.java rename to network/src/main/java/io/bitsquare/p2p/routing/Peer.java index f1b07eccdf..cb7b7a3141 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/Neighbor.java +++ b/network/src/main/java/io/bitsquare/p2p/routing/Peer.java @@ -9,14 +9,14 @@ import java.io.IOException; import java.io.Serializable; import java.util.Random; -public class Neighbor implements Serializable { - private static final Logger log = LoggerFactory.getLogger(Neighbor.class); +public class Peer implements Serializable { + private static final Logger log = LoggerFactory.getLogger(Peer.class); public final Connection connection; public final Address address; private long pingNonce; - public Neighbor(Connection connection) { + public Peer(Connection connection) { this.connection = connection; this.address = connection.getPeerAddress(); pingNonce = new Random().nextLong(); @@ -43,16 +43,16 @@ public class Neighbor implements Serializable { @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof Neighbor)) return false; + if (!(o instanceof Peer)) return false; - Neighbor neighbor = (Neighbor) o; + Peer peer = (Peer) o; - return !(address != null ? !address.equals(neighbor.address) : neighbor.address != null); + return !(address != null ? !address.equals(peer.address) : peer.address != null); } @Override public String toString() { - return "Neighbor{" + + return "Peer{" + "address=" + address + ", pingNonce=" + pingNonce + '}'; diff --git a/network/src/main/java/io/bitsquare/p2p/routing/Routing.java b/network/src/main/java/io/bitsquare/p2p/routing/Routing.java index 0629b76e72..b711c8bc3f 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/Routing.java +++ b/network/src/main/java/io/bitsquare/p2p/routing/Routing.java @@ -38,8 +38,8 @@ public class Routing { private final List
seedNodes; private final Map nonceMap = new ConcurrentHashMap<>(); private final List routingListeners = new CopyOnWriteArrayList<>(); - private final Map connectedNeighbors = new ConcurrentHashMap<>(); - private final Set
reportedNeighborAddresses = Collections.synchronizedSet(new HashSet<>()); + private final Map authenticatedPeers = new ConcurrentHashMap<>(); + private final Set
reportedPeerAddresses = Collections.synchronizedSet(new HashSet<>()); private final Map authenticationCompleteHandlers = new ConcurrentHashMap<>(); private final Timer maintenanceTimer = new Timer(); private final ExecutorService executorService; @@ -87,7 +87,7 @@ public class Routing { public void onDisconnect(Reason reason, Connection connection) { // only removes authenticated nodes if (connection.isAuthenticated()) - removeNeighbor(connection.getPeerAddress()); + removePeer(connection.getPeerAddress()); } @Override @@ -116,9 +116,12 @@ public class Routing { maintenanceTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { + Thread.currentThread().setName("RoutingMaintenanceTimer-" + new Random().nextInt(1000)); try { - disconnectOldConnections(); - pingNeighbors(); + UserThread.execute(() -> { + disconnectOldConnections(); + pingPeers(); + }); } catch (Throwable t) { t.printStackTrace(); log.error("Executing task failed. " + t.getMessage()); @@ -141,10 +144,10 @@ public class Routing { } } - private void pingNeighbors() { - log.trace("pingNeighbors"); - List connectedNeighborsList = new ArrayList<>(connectedNeighbors.values()); - connectedNeighborsList.stream() + private void pingPeers() { + log.trace("pingPeers"); + List connectedPeersList = new ArrayList<>(authenticatedPeers.values()); + connectedPeersList.stream() .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) .forEach(e -> { SettableFuture future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce())); @@ -157,7 +160,7 @@ public class Routing { @Override public void onFailure(@NotNull Throwable throwable) { log.info("PingMessage sending failed " + throwable.getMessage()); - removeNeighbor(e.address); + removePeer(e.address); } }); Uninterruptibles.sleepUninterruptibly(new Random().nextInt(5000) + 5000, TimeUnit.MILLISECONDS); @@ -180,25 +183,25 @@ public class Routing { } public void broadcast(BroadcastMessage message, @Nullable Address sender) { - log.trace("Broadcast message to " + connectedNeighbors.values().size() + " neighbors."); + log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers."); log.trace("message = " + message); - printConnectedNeighborsMap(); + printConnectedPeersMap(); - connectedNeighbors.values().stream() + authenticatedPeers.values().stream() .filter(e -> !e.address.equals(sender)) - .forEach(neighbor -> { - log.trace("Broadcast message from " + getAddress() + " to " + neighbor.address + "."); - SettableFuture future = networkNode.sendMessage(neighbor.address, message); + .forEach(peer -> { + log.trace("Broadcast message from " + getAddress() + " to " + peer.address + "."); + SettableFuture future = networkNode.sendMessage(peer.address, message); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("Broadcast from " + getAddress() + " to " + neighbor.address + " succeeded."); + log.trace("Broadcast from " + getAddress() + " to " + peer.address + " succeeded."); } @Override public void onFailure(@NotNull Throwable throwable) { log.info("Broadcast failed. " + throwable.getMessage()); - removeNeighbor(neighbor.address); + removePeer(peer.address); } }); }); @@ -220,18 +223,18 @@ public class Routing { routingListeners.remove(routingListener); } - public Map getConnectedNeighbors() { - return connectedNeighbors; + public Map getAuthenticatedPeers() { + return authenticatedPeers; } // Use ArrayList not List as we need it serializable - public ArrayList
getAllNeighborAddresses() { - ArrayList
allNeighborAddresses = new ArrayList<>(reportedNeighborAddresses); - allNeighborAddresses.addAll(connectedNeighbors.values().stream() + public ArrayList
getAllPeerAddresses() { + ArrayList
allPeerAddresses = new ArrayList<>(reportedPeerAddresses); + allPeerAddresses.addAll(authenticatedPeers.values().stream() .map(e -> e.address).collect(Collectors.toList())); // remove own address and seed nodes - allNeighborAddresses.remove(getAddress()); - return allNeighborAddresses; + allPeerAddresses.remove(getAddress()); + return allPeerAddresses; } @@ -244,11 +247,11 @@ public class Routing { // node1: close connection // node1 -> node2 ChallengeMessage on new connection // node2: authentication to node1 done if nonce ok - // node2 -> node1 GetNeighborsMessage + // node2 -> node1 GetPeersMessage // node1: authentication to node2 done if nonce ok - // node1 -> node2 NeighborsMessage + // node1 -> node2 PeersMessage - public void startAuthentication(List
connectedSeedNodes) { + public void startAuthentication(Set
connectedSeedNodes) { connectedSeedNodes.forEach(connectedSeedNode -> { executorService.submit(() -> { try { @@ -267,7 +270,7 @@ public class Routing { log.info("We try to authenticate to a random seed node. " + address); startAuthTs = System.currentTimeMillis(); final boolean[] alreadyConnected = {false}; - connectedNeighbors.values().stream().forEach(e -> { + authenticatedPeers.values().stream().forEach(e -> { remainingSeedNodes.remove(e.address); if (address.equals(e.address)) alreadyConnected[0] = true; @@ -362,11 +365,11 @@ public class Routing { boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress); if (verified) { SettableFuture future = networkNode.sendMessage(peerAddress, - new GetNeighborsMessage(getAddress(), challengeMessage.challengerNonce, getAllNeighborAddresses())); + new GetPeersMessage(getAddress(), challengeMessage.challengerNonce, getAllPeerAddresses())); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("GetNeighborsMessage sent successfully from " + getAddress() + " to " + peerAddress); + log.trace("GetPeersMessage sent successfully from " + getAddress() + " to " + peerAddress); /* // we wait to get the success to reduce the time span of the moment of // authentication at both sides of the connection @@ -375,52 +378,52 @@ public class Routing { @Override public void onFailure(@NotNull Throwable throwable) { - log.info("GetNeighborsMessage sending failed " + throwable.getMessage()); - removeNeighbor(peerAddress); + log.info("GetPeersMessage sending failed " + throwable.getMessage()); + removePeer(peerAddress); } }); } else { log.warn("verifyNonceAndAuthenticatePeerAddress failed. challengeMessage=" + challengeMessage + " / nonceMap=" + tempNonceMap); } - } else if (message instanceof GetNeighborsMessage) { - GetNeighborsMessage getNeighborsMessage = (GetNeighborsMessage) message; - Address peerAddress = getNeighborsMessage.address; - log.trace("GetNeighborsMessage from " + peerAddress + " at " + getAddress()); - boolean verified = verifyNonceAndAuthenticatePeerAddress(getNeighborsMessage.challengerNonce, peerAddress); + } else if (message instanceof GetPeersMessage) { + GetPeersMessage getPeersMessage = (GetPeersMessage) message; + Address peerAddress = getPeersMessage.address; + log.trace("GetPeersMessage from " + peerAddress + " at " + getAddress()); + boolean verified = verifyNonceAndAuthenticatePeerAddress(getPeersMessage.challengerNonce, peerAddress); if (verified) { setAuthenticated(connection, peerAddress); - purgeReportedNeighbors(); + purgeReportedPeers(); SettableFuture future = networkNode.sendMessage(peerAddress, - new NeighborsMessage(getAddress(), getAllNeighborAddresses())); - log.trace("sent NeighborsMessage to " + peerAddress + " from " + getAddress() - + " with allNeighbors=" + getAllNeighborAddresses()); + new PeersMessage(getAddress(), getAllPeerAddresses())); + log.trace("sent PeersMessage to " + peerAddress + " from " + getAddress() + + " with allPeers=" + getAllPeerAddresses()); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("NeighborsMessage sent successfully from " + getAddress() + " to " + peerAddress); + log.trace("PeersMessage sent successfully from " + getAddress() + " to " + peerAddress); } @Override public void onFailure(@NotNull Throwable throwable) { - log.info("NeighborsMessage sending failed " + throwable.getMessage()); - removeNeighbor(peerAddress); + log.info("PeersMessage sending failed " + throwable.getMessage()); + removePeer(peerAddress); } }); - // now we add the reported neighbors to our own set - ArrayList
neighborAddresses = ((GetNeighborsMessage) message).neighborAddresses; - log.trace("Received neighbors: " + neighborAddresses); + // now we add the reported peers to our own set + ArrayList
peerAddresses = ((GetPeersMessage) message).peerAddresses; + log.trace("Received peers: " + peerAddresses); // remove ourselves - addToReportedNeighbors(neighborAddresses, connection); + addToReportedPeers(peerAddresses, connection); } - } else if (message instanceof NeighborsMessage) { - NeighborsMessage neighborsMessage = (NeighborsMessage) message; - Address peerAddress = neighborsMessage.address; - log.trace("NeighborsMessage from " + peerAddress + " at " + getAddress()); - ArrayList
neighborAddresses = neighborsMessage.neighborAddresses; - log.trace("Received neighbors: " + neighborAddresses); + } else if (message instanceof PeersMessage) { + PeersMessage peersMessage = (PeersMessage) message; + Address peerAddress = peersMessage.address; + log.trace("PeersMessage from " + peerAddress + " at " + getAddress()); + ArrayList
peerAddresses = peersMessage.peerAddresses; + log.trace("Received peers: " + peerAddresses); // remove ourselves - addToReportedNeighbors(neighborAddresses, connection); + addToReportedPeers(peerAddresses, connection); // we wait until the handshake is completed before setting the authenticate flag // authentication at both sides of the connection @@ -435,58 +438,58 @@ public class Routing { if (authenticationCompleteHandler != null) authenticationCompleteHandler.run(); - authenticateToNextRandomNeighbor(); + authenticateToNextRandomPeer(); } } - private void addToReportedNeighbors(ArrayList
neighborAddresses, Connection connection) { - log.trace("addToReportedNeighbors"); - // we disconnect misbehaving nodes trying to send too many neighbors - // reported neighbors include the peers connected neighbors which is normally max. 8 but we give some headroom + private void addToReportedPeers(ArrayList
peerAddresses, Connection connection) { + log.trace("addToReportedPeers"); + // we disconnect misbehaving nodes trying to send too many peers + // reported peers include the peers connected peers which is normally max. 8 but we give some headroom // for safety - if (neighborAddresses.size() > 1100) { + if (peerAddresses.size() > 1100) { connection.shutDown(); } else { - neighborAddresses.remove(getAddress()); - reportedNeighborAddresses.addAll(neighborAddresses); - purgeReportedNeighbors(); + peerAddresses.remove(getAddress()); + reportedPeerAddresses.addAll(peerAddresses); + purgeReportedPeers(); } } - private void purgeReportedNeighbors() { - log.trace("purgeReportedNeighbors"); - int all = getAllNeighborAddresses().size(); + private void purgeReportedPeers() { + log.trace("purgeReportedPeers"); + int all = getAllPeerAddresses().size(); if (all > 1000) { int diff = all - 100; - List
list = getNotConnectedNeighborAddresses(); + List
list = getNotConnectedPeerAddresses(); for (int i = 0; i < diff; i++) { Address toRemove = list.remove(new Random().nextInt(list.size())); - reportedNeighborAddresses.remove(toRemove); + reportedPeerAddresses.remove(toRemove); } } } - private List
getNotConnectedNeighborAddresses() { - ArrayList
list = new ArrayList<>(getAllNeighborAddresses()); - log.debug("## getNotConnectedNeighborAddresses "); - log.debug("## reportedNeighborsList=" + list); - connectedNeighbors.values().stream().forEach(e -> list.remove(e.address)); - log.debug("## connectedNeighbors=" + connectedNeighbors); - log.debug("## reportedNeighborsList=" + list); + private List
getNotConnectedPeerAddresses() { + ArrayList
list = new ArrayList<>(getAllPeerAddresses()); + log.debug("## getNotConnectedPeerAddresses "); + log.debug("## reportedPeersList=" + list); + authenticatedPeers.values().stream().forEach(e -> list.remove(e.address)); + log.debug("## connectedPeers=" + authenticatedPeers); + log.debug("## reportedPeersList=" + list); return list; } - private void authenticateToNextRandomNeighbor() { + private void authenticateToNextRandomPeer() { executorService.submit(() -> { try { Uninterruptibles.sleepUninterruptibly(new Random().nextInt(200) + 200, TimeUnit.MILLISECONDS); - if (getConnectedNeighbors().size() <= MAX_CONNECTIONS) { - Address randomNotConnectedNeighborAddress = getRandomNotConnectedNeighborAddress(); - if (randomNotConnectedNeighborAddress != null) { - log.info("We try to build an authenticated connection to a random neighbor. " + randomNotConnectedNeighborAddress); - authenticateToPeer(randomNotConnectedNeighborAddress, null, () -> authenticateToNextRandomNeighbor()); + if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) { + Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress(); + if (randomNotConnectedPeerAddress != null) { + log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress); + authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer()); } else { - log.info("No more neighbors available for connecting."); + log.info("No more peers available for connecting."); } } else { log.info("We have already enough connections."); @@ -515,7 +518,7 @@ public class Routing { @Override public void onFailure(@NotNull Throwable throwable) { log.info("send IdMessage failed. " + throwable.getMessage()); - removeNeighbor(address); + removePeer(address); if (faultHandler != null) faultHandler.run(); } }); @@ -545,18 +548,18 @@ public class Routing { + "\npeerAddress= " + peerAddress + "\n############################################################\n"); - connection.onAuthenticationComplete(peerAddress, connection); + connection.setAuthenticated(peerAddress, connection); - Neighbor neighbor = new Neighbor(connection); - addConnectedNeighbor(peerAddress, neighbor); + Peer peer = new Peer(connection); + addAuthenticatedPeer(peerAddress, peer); routingListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection)); log.debug("\n### setAuthenticated post connection " + connection); } - private Address getRandomNotConnectedNeighborAddress() { - List
list = getNotConnectedNeighborAddresses(); + private Address getRandomNotConnectedPeerAddress() { + List
list = getNotConnectedPeerAddresses(); if (list.size() > 0) { Collections.shuffle(list); return list.get(0); @@ -583,14 +586,14 @@ public class Routing { @Override public void onFailure(@NotNull Throwable throwable) { log.info("PongMessage sending failed " + throwable.getMessage()); - removeNeighbor(connection.getPeerAddress()); + removePeer(connection.getPeerAddress()); } }); } else if (message instanceof PongMessage) { - Neighbor neighbor = connectedNeighbors.get(connection.getPeerAddress()); - if (neighbor != null) { - if (((PongMessage) message).nonce != neighbor.getPingNonce()) { - removeNeighbor(neighbor.address); + Peer peer = authenticatedPeers.get(connection.getPeerAddress()); + if (peer != null) { + if (((PongMessage) message).nonce != peer.getPingNonce()) { + removePeer(peer.address); log.warn("PongMessage invalid: self/peer " + getAddress() + "/" + connection.getPeerAddress()); } } @@ -599,41 +602,41 @@ public class Routing { /////////////////////////////////////////////////////////////////////////////////////////// - // Neighbors + // Peers /////////////////////////////////////////////////////////////////////////////////////////// - private void removeNeighbor(@Nullable Address peerAddress) { - reportedNeighborAddresses.remove(peerAddress); + private void removePeer(@Nullable Address peerAddress) { + reportedPeerAddresses.remove(peerAddress); - Neighbor disconnectedNeighbor; - disconnectedNeighbor = connectedNeighbors.remove(peerAddress); + Peer disconnectedPeer; + disconnectedPeer = authenticatedPeers.remove(peerAddress); - if (disconnectedNeighbor != null) - UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onNeighborRemoved(peerAddress))); + if (disconnectedPeer != null) + UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress))); - log.trace("removeNeighbor [post]"); - printConnectedNeighborsMap(); - printReportedNeighborsMap(); - - log.trace("removeNeighbor nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); + log.trace("removePeer [post]"); + printConnectedPeersMap(); + printReportedPeersMap(); + + log.trace("removePeer nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); nonceMap.remove(peerAddress); } - private void addConnectedNeighbor(Address address, Neighbor neighbor) { - boolean firstNeighborAdded; - connectedNeighbors.put(address, neighbor); - firstNeighborAdded = connectedNeighbors.size() == 1; + private void addAuthenticatedPeer(Address address, Peer peer) { + boolean firstPeerAdded; + authenticatedPeers.put(address, peer); + firstPeerAdded = authenticatedPeers.size() == 1; - UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onNeighborAdded(neighbor))); + UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onPeerAdded(peer))); - if (firstNeighborAdded) - UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onFirstNeighborAdded(neighbor))); + if (firstPeerAdded) + UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onFirstPeerAdded(peer))); - if (connectedNeighbors.size() > MAX_CONNECTIONS) + if (authenticatedPeers.size() > MAX_CONNECTIONS) disconnectOldConnections(); - log.trace("addConnectedNeighbor [post]"); - printConnectedNeighborsMap(); + log.trace("addConnectedPeer [post]"); + printConnectedPeersMap(); } private Address getAddress() { @@ -645,18 +648,18 @@ public class Routing { // Utils /////////////////////////////////////////////////////////////////////////////////////////// - public void printConnectedNeighborsMap() { - StringBuilder result = new StringBuilder("\nConnected neighbors for node " + getAddress() + ":"); - connectedNeighbors.values().stream().forEach(e -> { + public void printConnectedPeersMap() { + StringBuilder result = new StringBuilder("\nConnected peers for node " + getAddress() + ":"); + authenticatedPeers.values().stream().forEach(e -> { result.append("\n\t" + e.address); }); result.append("\n"); log.info(result.toString()); } - public void printReportedNeighborsMap() { - StringBuilder result = new StringBuilder("\nReported neighborAddresses for node " + getAddress() + ":"); - reportedNeighborAddresses.stream().forEach(e -> { + public void printReportedPeersMap() { + StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getAddress() + ":"); + reportedPeerAddresses.stream().forEach(e -> { result.append("\n\t" + e); }); result.append("\n"); diff --git a/network/src/main/java/io/bitsquare/p2p/routing/RoutingListener.java b/network/src/main/java/io/bitsquare/p2p/routing/RoutingListener.java index 1cc9258dae..d3d8840729 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/RoutingListener.java +++ b/network/src/main/java/io/bitsquare/p2p/routing/RoutingListener.java @@ -4,11 +4,11 @@ import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; public interface RoutingListener { - void onFirstNeighborAdded(Neighbor neighbor); + void onFirstPeerAdded(Peer peer); - void onNeighborAdded(Neighbor neighbor); + void onPeerAdded(Peer peer); - void onNeighborRemoved(Address address); + void onPeerRemoved(Address address); // TODO remove void onConnectionAuthenticated(Connection connection); diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/GetNeighborsMessage.java b/network/src/main/java/io/bitsquare/p2p/routing/messages/GetPeersMessage.java similarity index 62% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/GetNeighborsMessage.java rename to network/src/main/java/io/bitsquare/p2p/routing/messages/GetPeersMessage.java index 672f30dda6..9412c8b97b 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/GetNeighborsMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/routing/messages/GetPeersMessage.java @@ -5,26 +5,26 @@ import io.bitsquare.p2p.Address; import java.util.ArrayList; -public final class GetNeighborsMessage implements AuthenticationMessage { +public final class GetPeersMessage implements AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final Address address; public final long challengerNonce; - public final ArrayList
neighborAddresses; + public final ArrayList
peerAddresses; - public GetNeighborsMessage(Address address, long challengerNonce, ArrayList
neighborAddresses) { + public GetPeersMessage(Address address, long challengerNonce, ArrayList
peerAddresses) { this.address = address; this.challengerNonce = challengerNonce; - this.neighborAddresses = neighborAddresses; + this.peerAddresses = peerAddresses; } @Override public String toString() { - return "GetNeighborsMessage{" + + return "GetPeersMessage{" + "address=" + address + ", challengerNonce=" + challengerNonce + - ", neighborAddresses=" + neighborAddresses + + ", peerAddresses=" + peerAddresses + '}'; } } diff --git a/network/src/main/java/io/bitsquare/p2p/routing/messages/NeighborsMessage.java b/network/src/main/java/io/bitsquare/p2p/routing/messages/PeersMessage.java similarity index 56% rename from network/src/main/java/io/bitsquare/p2p/routing/messages/NeighborsMessage.java rename to network/src/main/java/io/bitsquare/p2p/routing/messages/PeersMessage.java index b82565794a..8d05a33044 100644 --- a/network/src/main/java/io/bitsquare/p2p/routing/messages/NeighborsMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/routing/messages/PeersMessage.java @@ -5,21 +5,21 @@ import io.bitsquare.p2p.Address; import java.util.ArrayList; -public final class NeighborsMessage implements AuthenticationMessage { +public final class PeersMessage implements AuthenticationMessage { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final Address address; - public final ArrayList
neighborAddresses; + public final ArrayList
peerAddresses; - public NeighborsMessage(Address address, ArrayList
neighborAddresses) { + public PeersMessage(Address address, ArrayList
peerAddresses) { this.address = address; - this.neighborAddresses = neighborAddresses; + this.peerAddresses = peerAddresses; } @Override public String toString() { - return "NeighborsMessage{" + "neighborAddresses=" + neighborAddresses + '}'; + return "PeersMessage{" + "peerAddresses=" + peerAddresses + '}'; } } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java index 4eefb95df1..458862a673 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java @@ -20,10 +20,7 @@ import java.io.File; import java.math.BigInteger; import java.security.KeyPair; import java.security.PublicKey; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -80,21 +77,21 @@ public class ProtectedExpirableDataStorage { } }); - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try { - log.info("removeExpiredEntries called "); - map.entrySet().stream().filter(entry -> entry.getValue().isExpired()) - .forEach(entry -> map.remove(entry.getKey())); - } catch (Throwable t) { - t.printStackTrace(); - log.error("Executing task failed. " + t.getMessage()); - } - } - }, - CHECK_TTL_INTERVAL, - CHECK_TTL_INTERVAL); + TimerTask task = new TimerTask() { + @Override + public void run() { + Thread.currentThread().setName("RemoveExpiredEntriesTimer-" + new Random().nextInt(1000)); + try { + log.info("removeExpiredEntries called "); + map.entrySet().stream().filter(entry -> entry.getValue().isExpired()) + .forEach(entry -> map.remove(entry.getKey())); + } catch (Throwable t) { + t.printStackTrace(); + log.error("Executing task failed. " + t.getMessage()); + } + } + }; + timer.scheduleAtFixedRate(task, CHECK_TTL_INTERVAL, CHECK_TTL_INTERVAL); } @@ -127,7 +124,7 @@ public class ProtectedExpirableDataStorage { if (result) { map.put(hashOfPayload, protectedData); - log.trace("Data added to our map and it will be broadcasted to our neighbors."); + log.trace("Data added to our map and it will be broadcasted to our peers."); UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData))); StringBuilder sb = new StringBuilder("\n\n----------------------------------------------------\n" + @@ -244,7 +241,7 @@ public class ProtectedExpirableDataStorage { private void doRemoveProtectedExpirableData(ProtectedData protectedData, BigInteger hashOfPayload) { map.remove(hashOfPayload); - log.trace("Data removed from our map. We broadcast the message to our neighbors."); + log.trace("Data removed from our map. We broadcast the message to our peers."); UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData))); StringBuilder sb = new StringBuilder("\n\nSet after removeProtectedExpirableData:\n"); diff --git a/network/src/test/java/io/bitsquare/p2p/routing/RoutingTest.java b/network/src/test/java/io/bitsquare/p2p/routing/RoutingTest.java index 7f3fadbcb3..aa7f01e6af 100644 --- a/network/src/test/java/io/bitsquare/p2p/routing/RoutingTest.java +++ b/network/src/test/java/io/bitsquare/p2p/routing/RoutingTest.java @@ -107,7 +107,7 @@ public class RoutingTest { P2PService p2PService1 = seedNode1.getP2PService(); latch.await(); Thread.sleep(500); - Assert.assertEquals(0, p2PService1.getRouting().getAllNeighborAddresses().size()); + Assert.assertEquals(0, p2PService1.getRouting().getAllPeerAddresses().size()); } @Test @@ -180,8 +180,8 @@ public class RoutingTest { }); P2PService p2PService2 = seedNode2.getP2PService(); latch.await(); - Assert.assertEquals(1, p2PService1.getRouting().getAllNeighborAddresses().size()); - Assert.assertEquals(1, p2PService2.getRouting().getAllNeighborAddresses().size()); + Assert.assertEquals(1, p2PService1.getRouting().getAllPeerAddresses().size()); + Assert.assertEquals(1, p2PService2.getRouting().getAllPeerAddresses().size()); } // @Test @@ -201,9 +201,9 @@ public class RoutingTest { // node1: close connection // node1 -> node2 ChallengeMessage on new connection // node2: authentication to node1 done if nonce ok - // node2 -> node1 GetNeighborsMessage + // node2 -> node1 GetPeersMessage // node1: authentication to node2 done if nonce ok - // node1 -> node2 NeighborsMessage + // node1 -> node2 PeersMessage // first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2 CountDownLatch latch1 = new CountDownLatch(2); @@ -228,7 +228,7 @@ public class RoutingTest { seedNode1.getP2PService().getRouting().removeRoutingListener(routingListener1); seedNode2.getP2PService().getRouting().removeRoutingListener(routingListener2); - // wait until Neighbors msg finished + // wait until Peers msg finished Thread.sleep(sleepTime); // authentication: @@ -259,7 +259,7 @@ public class RoutingTest { }); latch2.await(); - // wait until Neighbors msg finished + // wait until Peers msg finished Thread.sleep(sleepTime); @@ -282,9 +282,9 @@ public class RoutingTest { // node1: close connection // node1 -> node2 ChallengeMessage on new connection // node2: authentication to node1 done if nonce ok - // node2 -> node1 GetNeighborsMessage + // node2 -> node1 GetPeersMessage // node1: authentication to node2 done if nonce ok - // node1 -> node2 NeighborsMessage + // node1 -> node2 PeersMessage // first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2 CountDownLatch latch1 = new CountDownLatch(2); @@ -364,8 +364,8 @@ public class RoutingTest { // total authentications at com nodes = 90, System load (nr. threads/used memory (MB)): 170/20 // total authentications at 20 nodes = 380, System load (nr. threads/used memory (MB)): 525/46 for (int i = 0; i < length; i++) { - nodes[i].getP2PService().getRouting().printConnectedNeighborsMap(); - nodes[i].getP2PService().getRouting().printReportedNeighborsMap(); + nodes[i].getP2PService().getRouting().printConnectedPeersMap(); + nodes[i].getP2PService().getRouting().printReportedPeersMap(); } CountDownLatch shutDownLatch = new CountDownLatch(length);