Add timeout to socket creation, remove disconnect calls, check again for inbound connections after socket is created

This commit is contained in:
Manfred Karrer 2016-02-23 01:42:08 +01:00
parent 2775c0a55d
commit 5dd89391df
14 changed files with 130 additions and 67 deletions

View File

@ -308,22 +308,22 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onConnection(Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent()) {
connectionNodeAddressListener.changed(connection.getNodeAddressProperty(), null,
connection.getNodeAddressProperty().get());
connectionNodeAddressListener.changed(connection.peersNodeAddressProperty(), null,
connection.peersNodeAddressProperty().get());
} else {
connection.getNodeAddressProperty().addListener(connectionNodeAddressListener);
connection.peersNodeAddressProperty().addListener(connectionNodeAddressListener);
}
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
connection.peersNodeAddressProperty().removeListener(connectionNodeAddressListener);
// We removed the listener after a delay to be sure the connection has been removed
// from the networkNode already.
UserThread.runAfter(() ->
connectionNodeAddressListener.changed(connection.getNodeAddressProperty(), null,
connection.getNodeAddressProperty().get())
connectionNodeAddressListener.changed(connection.peersNodeAddressProperty(), null,
connection.peersNodeAddressProperty().get())
, 1);
}

View File

@ -8,6 +8,8 @@ import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
import java.util.Arrays;
import java.util.UUID;
import static com.google.common.base.Preconditions.checkNotNull;
public final class PrefixedSealedAndSignedMessage implements MailboxMessage, SendersNodeAddressMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
@ -19,6 +21,7 @@ public final class PrefixedSealedAndSignedMessage implements MailboxMessage, Sen
private final String uid = UUID.randomUUID().toString();
public PrefixedSealedAndSignedMessage(NodeAddress senderNodeAddress, SealedAndSigned sealedAndSigned, byte[] addressPrefixHash) {
checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at PrefixedSealedAndSignedMessage");
this.senderNodeAddress = senderNodeAddress;
this.sealedAndSigned = sealedAndSigned;
this.addressPrefixHash = addressPrefixHash;

View File

@ -94,7 +94,7 @@ public class Connection implements MessageListener {
// use GZIPInputStream but problems with blocking
private final boolean useCompression = false;
private PeerType peerType;
private final ObjectProperty<NodeAddress> nodeAddressProperty = new SimpleObjectProperty<>();
private final ObjectProperty<NodeAddress> peersNodeAddressProperty = new SimpleObjectProperty<>();
private final List<Tuple2<Long, Serializable>> messageTimeStamps = new ArrayList<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
@ -301,7 +301,7 @@ public class Connection implements MessageListener {
"\n############################################################\n");
}
nodeAddressProperty.set(peerNodeAddress);
peersNodeAddressProperty.set(peerNodeAddress);
}
@ -329,8 +329,8 @@ public class Connection implements MessageListener {
return peerType;
}
public ReadOnlyObjectProperty<NodeAddress> getNodeAddressProperty() {
return nodeAddressProperty;
public ReadOnlyObjectProperty<NodeAddress> peersNodeAddressProperty() {
return peersNodeAddressProperty;
}
public RuleViolation getRuleViolation() {
@ -700,12 +700,15 @@ public class Connection implements MessageListener {
if (message instanceof SendersNodeAddressMessage) {
NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) message).getSenderNodeAddress();
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent())
if (peersNodeAddressOptional.isPresent()) {
checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress),
"senderNodeAddress not matching connections peer address.\nmessage=" + message);
else
"senderNodeAddress not matching connections peer address.\n\t" +
"message=" + message);
} else {
connection.setPeersNodeAddress(senderNodeAddress);
}
}
if (message instanceof PrefixedSealedAndSignedMessage)
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);

View File

@ -20,6 +20,7 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@ -27,10 +28,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
// Run in UserThread
public abstract class NetworkNode implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(NetworkNode.class);
private static final int CREATE_SOCKET_TIMEOUT_MILLIS = 5000;
final int servicePort;
private final CopyOnWriteArraySet<Connection> inBoundConnections = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<InboundConnection> inBoundConnections = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>();
final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>();
@ -39,7 +41,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private volatile boolean shutDownInProgress;
// accessed from different threads
private final CopyOnWriteArraySet<Connection> outBoundConnections = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<OutboundConnection> outBoundConnections = new CopyOnWriteArraySet<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -60,28 +62,14 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
Log.traceCall("peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + StringUtils.abbreviate(message.toString(), 100));
checkNotNull(peersNodeAddress, "peerAddress must not be null");
Optional<Connection> outboundConnectionOptional = lookupOutboundConnection(peersNodeAddress);
Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null;
if (connection != null)
log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid());
if (connection != null && connection.isStopped()) {
log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid());
outBoundConnections.remove(connection);
connection = null;
}
if (connection == null) {
Optional<Connection> inboundConnectionOptional = lookupInboundConnection(peersNodeAddress);
if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get();
if (connection != null)
log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid());
}
Connection connection = getOutboundConnection(peersNodeAddress);
if (connection == null)
connection = getInboundConnection(peersNodeAddress);
if (connection != null) {
return sendMessage(connection, message);
} else {
log.trace("We have not found any connection for peerAddress {}.\n\t" +
log.info("We have not found any connection for peerAddress {}.\n\t" +
"We will create a new outbound connection.", peersNodeAddress);
final SettableFuture<Connection> resultFuture = SettableFuture.create();
@ -90,21 +78,47 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
OutboundConnection outboundConnection = null;
try {
// can take a while when using tor
long startTs = System.currentTimeMillis();
log.info("Start create socket to peersNodeAddress {}", peersNodeAddress.getFullAddress());
Socket socket = createSocket(peersNodeAddress);
outboundConnection = new OutboundConnection(socket, NetworkNode.this, NetworkNode.this, peersNodeAddress);
outBoundConnections.add(outboundConnection);
long duration = System.currentTimeMillis() - startTs;
log.info("Socket creation to peersNodeAddress {} took {} ms", peersNodeAddress.getFullAddress(),
duration);
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:"
+ "\nmyNodeAddress=" + getNodeAddress()
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\nuid=" + outboundConnection.getUid()
+ "\nmessage=" + message
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
if (duration > CREATE_SOCKET_TIMEOUT_MILLIS)
throw new TimeoutException("A timeout occurred when creating a socket.");
// can take a while when using tor
outboundConnection.sendMessage(message);
return outboundConnection;
// Tor needs sometimes quite long to create a connection. To avoid that we get too many double
// sided connections we check again if we still don't have an incoming connection.
Connection inboundConnection = getInboundConnection(peersNodeAddress);
if (inboundConnection != null) {
log.info("We found in the meantime an inbound connection for peersNodeAddress {}, " +
"so we use that for sending the message.\n" +
"That happens when Tor needs long for creating a new outbound connection.",
peersNodeAddress.getFullAddress());
try {
socket.close();
} catch (Throwable throwable) {
log.error("Error at closing socket " + throwable);
}
inboundConnection.sendMessage(message);
return inboundConnection;
} else {
outboundConnection = new OutboundConnection(socket, NetworkNode.this, NetworkNode.this, peersNodeAddress);
outBoundConnections.add(outboundConnection);
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"NetworkNode created new outbound connection:"
+ "\nmyNodeAddress=" + getNodeAddress()
+ "\npeersNodeAddress=" + peersNodeAddress
+ "\nuid=" + outboundConnection.getUid()
+ "\nmessage=" + message
+ "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n");
// can take a while when using tor
outboundConnection.sendMessage(message);
return outboundConnection;
}
} catch (Throwable throwable) {
if (!(throwable instanceof ConnectException || throwable instanceof IOException)) {
log.error("Executing task failed. " + throwable.getMessage());
@ -127,6 +141,42 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
}
@Nullable
private InboundConnection getInboundConnection(@NotNull NodeAddress peersNodeAddress) {
Optional<InboundConnection> inboundConnectionOptional = lookupInboundConnection(peersNodeAddress);
if (inboundConnectionOptional.isPresent()) {
InboundConnection connection = inboundConnectionOptional.get();
log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid());
if (connection.isStopped()) {
log.warn("We have a connection which is already stopped in inBoundConnections. Connection.uid=" + connection.getUid());
inBoundConnections.remove(connection);
return null;
} else {
return connection;
}
} else {
return null;
}
}
@Nullable
private OutboundConnection getOutboundConnection(@NotNull NodeAddress peersNodeAddress) {
Optional<OutboundConnection> outboundConnectionOptional = lookupOutboundConnection(peersNodeAddress);
if (outboundConnectionOptional.isPresent()) {
OutboundConnection connection = outboundConnectionOptional.get();
log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid());
if (connection.isStopped()) {
log.warn("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid());
outBoundConnections.remove(connection);
return null;
} else {
return connection;
}
} else {
return null;
}
}
public SettableFuture<Connection> sendMessage(Connection connection, Message message) {
Log.traceCall("\n\tmessage=" + StringUtils.abbreviate(message.toString(), 100) + "\n\tconnection=" + connection);
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
@ -211,7 +261,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
outBoundConnections.remove(connection);
inBoundConnections.remove(connection);
// inbound connections are removed in the listener of the server
connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection));
}
@ -272,10 +322,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
void startServer(ServerSocket serverSocket) {
ConnectionListener startServerConnectionListener = new ConnectionListener() {
ConnectionListener connectionListener = new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
inBoundConnections.add(connection);
inBoundConnections.add((InboundConnection) connection);
NetworkNode.this.onConnection(connection);
}
@ -292,28 +342,30 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
};
server = new Server(serverSocket,
NetworkNode.this,
startServerConnectionListener);
connectionListener);
executorService.submit(server);
}
private Optional<Connection> lookupOutboundConnection(NodeAddress peersNodeAddress) {
private Optional<OutboundConnection> lookupOutboundConnection(NodeAddress peersNodeAddress) {
StringBuilder sb = new StringBuilder("Lookup for peersNodeAddress=");
sb.append(peersNodeAddress.toString()).append("/ outBoundConnections.size()=")
.append(outBoundConnections.size()).append("/\n\toutBoundConnections=");
outBoundConnections.stream().forEach(e -> sb.append(e).append("\n\t"));
log.debug(sb.toString());
return outBoundConnections.stream()
.filter(e -> e.getPeersNodeAddressOptional().isPresent() && peersNodeAddress.equals(e.getPeersNodeAddressOptional().get())).findAny();
.filter(connection -> connection.hasPeersNodeAddress() &&
peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny();
}
private Optional<Connection> lookupInboundConnection(NodeAddress peersNodeAddress) {
private Optional<InboundConnection> lookupInboundConnection(NodeAddress peersNodeAddress) {
StringBuilder sb = new StringBuilder("Lookup for peersNodeAddress=");
sb.append(peersNodeAddress.toString()).append("/ inBoundConnections.size()=")
.append(inBoundConnections.size()).append("/\n\tinBoundConnections=");
inBoundConnections.stream().forEach(e -> sb.append(e).append("\n\t"));
log.debug(sb.toString());
return inBoundConnections.stream()
.filter(e -> e.getPeersNodeAddressOptional().isPresent() && peersNodeAddress.equals(e.getPeersNodeAddressOptional().get())).findAny();
.filter(connection -> connection.hasPeersNodeAddress() &&
peersNodeAddress.equals(connection.getPeersNodeAddressOptional().get())).findAny();
}
abstract protected Socket createSocket(NodeAddress peersNodeAddress) throws IOException;

View File

@ -171,7 +171,7 @@ public class PeerManager implements ConnectionListener {
@Override
public void onConnection(Connection connection) {
connection.getNodeAddressProperty().addListener(connectionNodeAddressListener);
connection.peersNodeAddressProperty().addListener(connectionNodeAddressListener);
if (isSeedNode(connection))
connection.setPeerType(Connection.PeerType.SEED_NODE);
@ -185,7 +185,7 @@ public class PeerManager implements ConnectionListener {
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
connection.peersNodeAddressProperty().removeListener(connectionNodeAddressListener);
handleConnectionFault(connection);
lostAllConnections = networkNode.getAllConnections().isEmpty();

View File

@ -106,7 +106,7 @@ public class GetDataRequestHandler {
private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
log.info(errorMessage);
peerManager.shutDownConnection(connection, closeConnectionReason);
//peerManager.shutDownConnection(connection, closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
}

View File

@ -177,7 +177,7 @@ public class RequestDataHandler implements MessageListener {
private void handleFault(String errorMessage, NodeAddress nodeAddress, CloseConnectionReason closeConnectionReason) {
cleanup();
peerManager.shutDownConnection(nodeAddress, closeConnectionReason);
//peerManager.shutDownConnection(nodeAddress, closeConnectionReason);
peerManager.handleConnectionFault(nodeAddress);
listener.onFault(errorMessage, null);
}

View File

@ -4,6 +4,8 @@ import io.bitsquare.app.Version;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
import static com.google.common.base.Preconditions.checkNotNull;
public final class GetUpdatedDataRequest implements SendersNodeAddressMessage, GetDataRequest {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
@ -13,6 +15,7 @@ public final class GetUpdatedDataRequest implements SendersNodeAddressMessage, G
private final int nonce;
public GetUpdatedDataRequest(NodeAddress senderNodeAddress, int nonce) {
checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at GetUpdatedDataRequest");
this.senderNodeAddress = senderNodeAddress;
this.nonce = nonce;
}

View File

@ -5,7 +5,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
@ -91,7 +90,7 @@ class KeepAliveHandler implements MessageListener {
".\n\tException=" + throwable.getMessage();
log.info(errorMessage);
cleanup();
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
//peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
peerManager.handleConnectionFault(connection);
listener.onFault(errorMessage);
} else {

View File

@ -96,7 +96,7 @@ public class KeepAliveManager implements MessageListener, ConnectionListener, Pe
"Exception: " + throwable.getMessage();
log.info(errorMessage);
peerManager.handleConnectionFault(connection);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
// peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.warn("We have stopped already. We ignore that networkNode.sendMessage.onFailure call.");
}

View File

@ -122,7 +122,7 @@ class GetPeersRequestHandler {
private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
cleanup();
peerManager.shutDownConnection(connection, closeConnectionReason);
//peerManager.shutDownConnection(connection, closeConnectionReason);
listener.onFault(errorMessage, connection);
}

View File

@ -170,10 +170,10 @@ class PeerExchangeHandler implements MessageListener {
private void handleFault(String errorMessage, CloseConnectionReason sendMsgFailure, NodeAddress nodeAddress) {
Log.traceCall();
cleanup();
if (connection == null)
/* if (connection == null)
peerManager.shutDownConnection(nodeAddress, sendMsgFailure);
else
peerManager.shutDownConnection(connection, sendMsgFailure);
peerManager.shutDownConnection(connection, sendMsgFailure);*/
peerManager.handleConnectionFault(nodeAddress, connection);
listener.onFault(errorMessage, connection);

View File

@ -229,11 +229,11 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
handlerMap.put(nodeAddress, peerExchangeHandler);
peerExchangeHandler.sendGetPeersRequest(nodeAddress);
} else {
log.warn("We have started already a peerExchangeHandler. " +
log.trace("We have started already a peerExchangeHandler. " +
"We ignore that call. nodeAddress=" + nodeAddress);
}
} else {
log.warn("We have stopped already. We ignore that requestReportedPeers call.");
log.trace("We have stopped already. We ignore that requestReportedPeers call.");
}
}
@ -281,7 +281,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
log.info("We have already sufficient connections.");
}
} else {
log.warn("We have stopped already. We ignore that requestWithAvailablePeers call.");
log.trace("We have stopped already. We ignore that requestWithAvailablePeers call.");
}
}
@ -359,7 +359,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener,
handlerMap.remove(nodeAddress);
}
} else {
log.warn("closeHandler: nodeAddress not set in connection " + connection);
log.trace("closeHandler: nodeAddress not set in connection " + connection);
}
}

View File

@ -7,6 +7,8 @@ import io.bitsquare.p2p.peers.peerexchange.ReportedPeer;
import java.util.HashSet;
import static com.google.common.base.Preconditions.checkNotNull;
public final class GetPeersRequest extends PeerExchangeMessage implements SendersNodeAddressMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
@ -16,6 +18,7 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender
public final HashSet<ReportedPeer> reportedPeers;
public GetPeersRequest(NodeAddress senderNodeAddress, int nonce, HashSet<ReportedPeer> reportedPeers) {
checkNotNull(senderNodeAddress, "senderNodeAddress must not be null at GetPeersRequest");
this.senderNodeAddress = senderNodeAddress;
this.nonce = nonce;
this.reportedPeers = reportedPeers;