diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 97f5d1a2cd..c4e5430d37 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -32,7 +32,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener final int servicePort; - private final CopyOnWriteArraySet inBoundConnections = new CopyOnWriteArraySet<>(); + private final Set inBoundConnections = new HashSet<>(); private final CopyOnWriteArraySet messageListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet connectionListeners = new CopyOnWriteArraySet<>(); final CopyOnWriteArraySet setupListeners = new CopyOnWriteArraySet<>(); @@ -41,7 +41,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener private volatile boolean shutDownInProgress; // accessed from different threads - private final CopyOnWriteArraySet outBoundConnections = new CopyOnWriteArraySet<>(); + private final Set outBoundConnections = new HashSet<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -109,7 +109,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener return existingConnection; } else { outboundConnection = new OutboundConnection(socket, NetworkNode.this, NetworkNode.this, peersNodeAddress); - outBoundConnections.add(outboundConnection); + synchronized (outBoundConnections) { + outBoundConnections.add(outboundConnection); + } log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "NetworkNode created new outbound connection:" @@ -124,8 +126,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener return outboundConnection; } } catch (Throwable throwable) { - if (!(throwable instanceof ConnectException || throwable instanceof IOException)) { - log.error("Executing task failed. " + throwable.getMessage()); + if (!(throwable instanceof ConnectException || + throwable instanceof IOException || + throwable instanceof TimeoutException)) { + log.warn("Executing task failed. " + throwable.getMessage()); } throw throwable; } @@ -153,7 +157,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid()); if (connection.isStopped()) { log.warn("We have a connection which is already stopped in inBoundConnections. Connection.uid=" + connection.getUid()); - inBoundConnections.remove(connection); + synchronized (inBoundConnections) { + inBoundConnections.remove(connection); + } return null; } else { return connection; @@ -171,7 +177,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid()); if (connection.isStopped()) { log.warn("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid()); - outBoundConnections.remove(connection); + synchronized (outBoundConnections) { + outBoundConnections.remove(connection); + } return null; } else { return connection; @@ -205,8 +213,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener public Set getAllConnections() { // Can contain inbound and outbound connections with the same peer node address, // as connection hashcode is using uid and port info - Set set = new HashSet<>(inBoundConnections); - set.addAll(outBoundConnections); + Set set; + synchronized (inBoundConnections) { + set = new HashSet<>(inBoundConnections); + } + synchronized (outBoundConnections) { + set.addAll(outBoundConnections); + } return set; } @@ -269,7 +282,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener log.warn("We have the connection in our inBoundConnections. That must not happen as it should be called " + "from the server listener and get removed from there."); printOutBoundConnections(); - outBoundConnections.remove(connection); + synchronized (outBoundConnections) { + outBoundConnections.remove(connection); + } // inbound connections are removed in the listener of the server connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection)); } @@ -334,14 +349,18 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener ConnectionListener connectionListener = new ConnectionListener() { @Override public void onConnection(Connection connection) { - inBoundConnections.add((InboundConnection) connection); + synchronized (inBoundConnections) { + inBoundConnections.add((InboundConnection) connection); + } NetworkNode.this.onConnection(connection); } @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { log.trace("onDisconnect at server socket connectionListener\n\tconnection={}" + connection); - inBoundConnections.remove(connection); + synchronized (inBoundConnections) { + inBoundConnections.remove(connection); + } printInboundConnections(); NetworkNode.this.onDisconnect(closeConnectionReason, connection); } @@ -360,9 +379,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener private Optional lookupOutBoundConnection(NodeAddress peersNodeAddress) { log.trace("lookupOutboundConnection for peersNodeAddress={}", peersNodeAddress.getFullAddress()); printOutBoundConnections(); - return outBoundConnections.stream() - .filter(connection -> connection.hasPeersNodeAddress() && - peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny(); + Optional outboundConnectionOptional; + synchronized (outBoundConnections) { + outboundConnectionOptional = outBoundConnections.stream() + .filter(connection -> connection.hasPeersNodeAddress() && + peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny(); + } + return outboundConnectionOptional; } private void printOutBoundConnections() { @@ -375,9 +398,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener private Optional lookupInBoundConnection(NodeAddress peersNodeAddress) { log.trace("lookupInboundConnection for peersNodeAddress={}", peersNodeAddress.getFullAddress()); printInboundConnections(); - return inBoundConnections.stream() - .filter(connection -> connection.hasPeersNodeAddress() && - peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny(); + Optional inboundConnectionOptional; + synchronized (inBoundConnections) { + inboundConnectionOptional = inBoundConnections.stream() + .filter(connection -> connection.hasPeersNodeAddress() && + peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny(); + } + return inboundConnectionOptional; } private void printInboundConnections() {