synchronize sets for connections

This commit is contained in:
Manfred Karrer 2016-02-23 02:42:54 +01:00
parent 82c96d4be7
commit 088f04b478

View file

@ -32,7 +32,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
final int servicePort; final int servicePort;
private final CopyOnWriteArraySet<InboundConnection> inBoundConnections = new CopyOnWriteArraySet<>(); private final Set<InboundConnection> inBoundConnections = new HashSet<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>();
final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>(); final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>();
@ -41,7 +41,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private volatile boolean shutDownInProgress; private volatile boolean shutDownInProgress;
// accessed from different threads // accessed from different threads
private final CopyOnWriteArraySet<OutboundConnection> outBoundConnections = new CopyOnWriteArraySet<>(); private final Set<OutboundConnection> outBoundConnections = new HashSet<>();
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -109,7 +109,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
return existingConnection; return existingConnection;
} else { } else {
outboundConnection = new OutboundConnection(socket, NetworkNode.this, NetworkNode.this, peersNodeAddress); outboundConnection = new OutboundConnection(socket, NetworkNode.this, NetworkNode.this, peersNodeAddress);
synchronized (outBoundConnections) {
outBoundConnections.add(outboundConnection); outBoundConnections.add(outboundConnection);
}
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:" "NetworkNode created new outbound connection:"
@ -124,8 +126,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
return outboundConnection; return outboundConnection;
} }
} catch (Throwable throwable) { } catch (Throwable throwable) {
if (!(throwable instanceof ConnectException || throwable instanceof IOException)) { if (!(throwable instanceof ConnectException ||
log.error("Executing task failed. " + throwable.getMessage()); throwable instanceof IOException ||
throwable instanceof TimeoutException)) {
log.warn("Executing task failed. " + throwable.getMessage());
} }
throw throwable; throw throwable;
} }
@ -153,7 +157,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid()); log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid());
if (connection.isStopped()) { if (connection.isStopped()) {
log.warn("We have a connection which is already stopped in inBoundConnections. Connection.uid=" + connection.getUid()); log.warn("We have a connection which is already stopped in inBoundConnections. Connection.uid=" + connection.getUid());
synchronized (inBoundConnections) {
inBoundConnections.remove(connection); inBoundConnections.remove(connection);
}
return null; return null;
} else { } else {
return connection; return connection;
@ -171,7 +177,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid()); log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid());
if (connection.isStopped()) { if (connection.isStopped()) {
log.warn("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid()); log.warn("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid());
synchronized (outBoundConnections) {
outBoundConnections.remove(connection); outBoundConnections.remove(connection);
}
return null; return null;
} else { } else {
return connection; return connection;
@ -205,8 +213,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
public Set<Connection> getAllConnections() { public Set<Connection> getAllConnections() {
// Can contain inbound and outbound connections with the same peer node address, // Can contain inbound and outbound connections with the same peer node address,
// as connection hashcode is using uid and port info // as connection hashcode is using uid and port info
Set<Connection> set = new HashSet<>(inBoundConnections); Set<Connection> set;
synchronized (inBoundConnections) {
set = new HashSet<>(inBoundConnections);
}
synchronized (outBoundConnections) {
set.addAll(outBoundConnections); set.addAll(outBoundConnections);
}
return set; return set;
} }
@ -269,7 +282,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
log.warn("We have the connection in our inBoundConnections. That must not happen as it should be called " + log.warn("We have the connection in our inBoundConnections. That must not happen as it should be called " +
"from the server listener and get removed from there."); "from the server listener and get removed from there.");
printOutBoundConnections(); printOutBoundConnections();
synchronized (outBoundConnections) {
outBoundConnections.remove(connection); outBoundConnections.remove(connection);
}
// inbound connections are removed in the listener of the server // inbound connections are removed in the listener of the server
connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection)); connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection));
} }
@ -334,14 +349,18 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
ConnectionListener connectionListener = new ConnectionListener() { ConnectionListener connectionListener = new ConnectionListener() {
@Override @Override
public void onConnection(Connection connection) { public void onConnection(Connection connection) {
synchronized (inBoundConnections) {
inBoundConnections.add((InboundConnection) connection); inBoundConnections.add((InboundConnection) connection);
}
NetworkNode.this.onConnection(connection); NetworkNode.this.onConnection(connection);
} }
@Override @Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.trace("onDisconnect at server socket connectionListener\n\tconnection={}" + connection); log.trace("onDisconnect at server socket connectionListener\n\tconnection={}" + connection);
synchronized (inBoundConnections) {
inBoundConnections.remove(connection); inBoundConnections.remove(connection);
}
printInboundConnections(); printInboundConnections();
NetworkNode.this.onDisconnect(closeConnectionReason, connection); NetworkNode.this.onDisconnect(closeConnectionReason, connection);
} }
@ -360,10 +379,14 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private Optional<OutboundConnection> lookupOutBoundConnection(NodeAddress peersNodeAddress) { private Optional<OutboundConnection> lookupOutBoundConnection(NodeAddress peersNodeAddress) {
log.trace("lookupOutboundConnection for peersNodeAddress={}", peersNodeAddress.getFullAddress()); log.trace("lookupOutboundConnection for peersNodeAddress={}", peersNodeAddress.getFullAddress());
printOutBoundConnections(); printOutBoundConnections();
return outBoundConnections.stream() Optional<OutboundConnection> outboundConnectionOptional;
synchronized (outBoundConnections) {
outboundConnectionOptional = outBoundConnections.stream()
.filter(connection -> connection.hasPeersNodeAddress() && .filter(connection -> connection.hasPeersNodeAddress() &&
peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny(); peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny();
} }
return outboundConnectionOptional;
}
private void printOutBoundConnections() { private void printOutBoundConnections() {
StringBuilder sb = new StringBuilder("outBoundConnections size()=") StringBuilder sb = new StringBuilder("outBoundConnections size()=")
@ -375,10 +398,14 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private Optional<InboundConnection> lookupInBoundConnection(NodeAddress peersNodeAddress) { private Optional<InboundConnection> lookupInBoundConnection(NodeAddress peersNodeAddress) {
log.trace("lookupInboundConnection for peersNodeAddress={}", peersNodeAddress.getFullAddress()); log.trace("lookupInboundConnection for peersNodeAddress={}", peersNodeAddress.getFullAddress());
printInboundConnections(); printInboundConnections();
return inBoundConnections.stream() Optional<InboundConnection> inboundConnectionOptional;
synchronized (inBoundConnections) {
inboundConnectionOptional = inBoundConnections.stream()
.filter(connection -> connection.hasPeersNodeAddress() && .filter(connection -> connection.hasPeersNodeAddress() &&
peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny(); peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny();
} }
return inboundConnectionOptional;
}
private void printInboundConnections() { private void printInboundConnections() {
StringBuilder sb = new StringBuilder("inBoundConnections size()=") StringBuilder sb = new StringBuilder("inBoundConnections size()=")