Refactor authentication

This commit is contained in:
Manfred Karrer 2015-12-24 02:29:46 +01:00
parent e3cbaeef7e
commit cd631eb53b
23 changed files with 379 additions and 379 deletions

View file

@ -134,14 +134,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
peerGroup = new PeerGroup(networkNode); peerGroup = new PeerGroup(networkNode);
peerGroup.addAuthenticationListener(this); peerGroup.addAuthenticationListener(this);
if (useLocalhost) if (useLocalhost)
PeerGroup.setSimulateAuthTorNode(200); PeerGroup.setSimulateAuthTorNode(100);
// P2P network data storage // P2P network data storage
dataStorage = new P2PDataStorage(peerGroup, networkNode, storageDir); dataStorage = new P2PDataStorage(peerGroup, networkNode, storageDir);
dataStorage.addHashMapChangedListener(this); dataStorage.addHashMapChangedListener(this);
// Request initial data manager // Request initial data manager
requestDataManager = new RequestDataManager(networkNode, dataStorage, new RequestDataManager.Listener() { requestDataManager = new RequestDataManager(networkNode, dataStorage, peerGroup, new RequestDataManager.Listener() {
@Override @Override
public void onNoSeedNodeAvailable() { public void onNoSeedNodeAvailable() {
p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable()); p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable());
@ -286,9 +286,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall(); Log.traceCall();
if (connection.isAuthenticated()) connection.getPeerAddress().ifPresent(peerAddresses -> authenticatedPeerAddresses.remove(peerAddresses));
authenticatedPeerAddresses.remove(connection.getPeerAddress());
numAuthenticatedPeers.set(authenticatedPeerAddresses.size()); numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
} }
@ -302,10 +300,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { public void onPeerAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall(); Log.traceCall();
checkArgument(peerAddress.equals(connection.getPeerAddress()),
"peerAddress must match connection.getPeerAddress()");
authenticatedPeerAddresses.add(peerAddress); authenticatedPeerAddresses.add(peerAddress);
if (!firstPeerAuthenticated.get()) { if (!firstPeerAuthenticated.get()) {
@ -315,8 +311,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
numAuthenticatedPeers.set(authenticatedPeerAddresses.size()); numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation // MessageListener implementation
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -339,8 +335,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
connection.setConnectionPriority(ConnectionPriority.DIRECT_MSG); connection.setConnectionPriority(ConnectionPriority.DIRECT_MSG);
log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey); log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey);
decryptedMailListeners.stream().forEach( connection.getPeerAddress().ifPresent(peerAddresses ->
e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress())); decryptedMailListeners.stream().forEach(
e -> e.onMailMessage(decryptedMsgWithPubKey, peerAddresses)));
} else { } else {
log.info("Wrong receiverAddressMaskHash. The message is not intended for us."); log.info("Wrong receiverAddressMaskHash. The message is not intended for us.");
} }

View file

@ -19,9 +19,12 @@ import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Date; import java.util.Date;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.*; import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkNotNull;
/** /**
* Connection is created by the server thread or by sendMessage from NetworkNode. * Connection is created by the server thread or by sendMessage from NetworkNode.
* All handlers are called on User thread. * All handlers are called on User thread.
@ -31,7 +34,7 @@ public class Connection implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(Connection.class); private static final Logger log = LoggerFactory.getLogger(Connection.class);
private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data
//timeout on blocking Socket operations like ServerSocket.accept() or SocketInputStream.read() //timeout on blocking Socket operations like ServerSocket.accept() or SocketInputStream.read()
private static final int SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 min. private static final int SOCKET_TIMEOUT = 10 * 60 * 1000; // 10 min.
private ConnectionPriority connectionPriority; private ConnectionPriority connectionPriority;
public static int getMaxMsgSize() { public static int getMaxMsgSize() {
@ -41,6 +44,7 @@ public class Connection implements MessageListener {
private final Socket socket; private final Socket socket;
private final MessageListener messageListener; private final MessageListener messageListener;
private final ConnectionListener connectionListener; private final ConnectionListener connectionListener;
private final String portInfo; private final String portInfo;
private final String uid = UUID.randomUUID().toString(); private final String uid = UUID.randomUUID().toString();
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
@ -52,8 +56,7 @@ public class Connection implements MessageListener {
private ObjectOutputStream objectOutputStream; private ObjectOutputStream objectOutputStream;
// mutable data, set from other threads but not changed internally. // mutable data, set from other threads but not changed internally.
@Nullable private Optional<Address> peerAddressOptional = Optional.empty();
private Address peerAddress;
private volatile boolean isAuthenticated; private volatile boolean isAuthenticated;
private volatile boolean stopped; private volatile boolean stopped;
@ -116,9 +119,8 @@ public class Connection implements MessageListener {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Called form UserThread // Called form UserThread
public void setAuthenticated(Address peerAddress) { public void setAuthenticated() {
Log.traceCall(); Log.traceCall();
this.peerAddress = peerAddress;
isAuthenticated = true; isAuthenticated = true;
} }
@ -131,9 +133,10 @@ public class Connection implements MessageListener {
Log.traceCall(); Log.traceCall();
if (!stopped) { if (!stopped) {
try { try {
String peerAddress = peerAddressOptional.isPresent() ? peerAddressOptional.get().toString() : "null";
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" + log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} (uid={})\nmessage={}" "Write object to outputStream to peer: {} (uid={})\nmessage={}"
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", getPeerAddress(), uid, message); + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", peerAddress, uid, message);
Object objectToWrite; Object objectToWrite;
if (useCompression) { if (useCompression) {
@ -167,9 +170,10 @@ public class Connection implements MessageListener {
sharedSpace.reportIllegalRequest(illegalRequest); sharedSpace.reportIllegalRequest(illegalRequest);
} }
public synchronized void setPeerAddress(@Nullable Address peerAddress) { public synchronized void setPeerAddress(Address peerAddress) {
Log.traceCall(); Log.traceCall();
this.peerAddress = peerAddress; checkNotNull(peerAddress, "peerAddress must not be null");
peerAddressOptional = Optional.of(peerAddress);
} }
@ -189,8 +193,12 @@ public class Connection implements MessageListener {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Nullable @Nullable
public synchronized Address getPeerAddress() { public synchronized Address getPeerAddress1() {
return peerAddress; return peerAddressOptional.isPresent() ? peerAddressOptional.get() : null;
}
public synchronized Optional<Address> getPeerAddress() {
return peerAddressOptional;
} }
public Date getLastActivityDate() { public Date getLastActivityDate() {
@ -213,6 +221,7 @@ public class Connection implements MessageListener {
return connectionPriority; return connectionPriority;
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// ShutDown // ShutDown
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -232,6 +241,7 @@ public class Connection implements MessageListener {
private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) { private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) {
Log.traceCall(this.toString()); Log.traceCall(this.toString());
if (!stopped) { if (!stopped) {
String peerAddress = peerAddressOptional.isPresent() ? peerAddressOptional.get().toString() : "null";
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"ShutDown connection:" "ShutDown connection:"
+ "\npeerAddress=" + peerAddress + "\npeerAddress=" + peerAddress
@ -249,31 +259,32 @@ public class Connection implements MessageListener {
Log.traceCall("sendCloseConnectionMessage"); Log.traceCall("sendCloseConnectionMessage");
try { try {
sendMessage(new CloseConnectionMessage()); sendMessage(new CloseConnectionMessage());
stopped = true; setStopFlags();
sharedSpace.stop();
if (inputHandler != null)
inputHandler.stop();
// TODO increase delay Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) { } catch (Throwable t) {
log.error(t.getMessage()); log.error(t.getMessage());
t.printStackTrace(); t.printStackTrace();
} finally { } finally {
UserThread.execute(() -> continueShutDown(shutDownCompleteHandler)); UserThread.execute(() -> doShutDown(shutDownCompleteHandler));
} }
}).start(); }).start();
} else { } else {
stopped = true; setStopFlags();
sharedSpace.stop(); doShutDown(shutDownCompleteHandler);
if (inputHandler != null)
inputHandler.stop();
continueShutDown(shutDownCompleteHandler);
} }
} }
} }
private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) { private void setStopFlags() {
stopped = true;
sharedSpace.stop();
if (inputHandler != null)
inputHandler.stop();
isAuthenticated = false;
}
private void doShutDown(@Nullable Runnable shutDownCompleteHandler) {
Log.traceCall(); Log.traceCall();
ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason(); ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason();
if (shutDownReason == null) if (shutDownReason == null)
@ -309,7 +320,7 @@ public class Connection implements MessageListener {
if (portInfo != null ? !portInfo.equals(that.portInfo) : that.portInfo != null) return false; if (portInfo != null ? !portInfo.equals(that.portInfo) : that.portInfo != null) return false;
if (uid != null ? !uid.equals(that.uid) : that.uid != null) return false; if (uid != null ? !uid.equals(that.uid) : that.uid != null) return false;
return !(peerAddress != null ? !peerAddress.equals(that.peerAddress) : that.peerAddress != null); return peerAddressOptional != null ? peerAddressOptional.equals(that.peerAddressOptional) : that.peerAddressOptional == null;
} }
@ -317,7 +328,7 @@ public class Connection implements MessageListener {
public int hashCode() { public int hashCode() {
int result = portInfo != null ? portInfo.hashCode() : 0; int result = portInfo != null ? portInfo.hashCode() : 0;
result = 31 * result + (uid != null ? uid.hashCode() : 0); result = 31 * result + (uid != null ? uid.hashCode() : 0);
result = 31 * result + (peerAddress != null ? peerAddress.hashCode() : 0); result = 31 * result + (peerAddressOptional != null ? peerAddressOptional.hashCode() : 0);
return result; return result;
} }
@ -327,7 +338,7 @@ public class Connection implements MessageListener {
"portInfo=" + portInfo + "portInfo=" + portInfo +
", uid='" + uid + '\'' + ", uid='" + uid + '\'' +
", sharedSpace=" + sharedSpace.toString() + ", sharedSpace=" + sharedSpace.toString() +
", peerAddress=" + peerAddress + ", peerAddress=" + peerAddressOptional +
", isAuthenticated=" + isAuthenticated + ", isAuthenticated=" + isAuthenticated +
", stopped=" + stopped + ", stopped=" + stopped +
", stopped=" + stopped + ", stopped=" + stopped +
@ -398,6 +409,7 @@ public class Connection implements MessageListener {
public void handleConnectionException(Exception e) { public void handleConnectionException(Exception e) {
Log.traceCall(e.toString()); Log.traceCall(e.toString());
log.debug("connection=" + this);
if (e instanceof SocketException) { if (e instanceof SocketException) {
if (socket.isClosed()) if (socket.isClosed())
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED; shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED;
@ -405,11 +417,12 @@ public class Connection implements MessageListener {
shutDownReason = ConnectionListener.Reason.RESET; shutDownReason = ConnectionListener.Reason.RESET;
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) { } else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
shutDownReason = ConnectionListener.Reason.TIMEOUT; shutDownReason = ConnectionListener.Reason.TIMEOUT;
log.warn("TimeoutException at connection with port " + socket.getLocalPort());
} else if (e instanceof EOFException) { } else if (e instanceof EOFException) {
shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED; shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED;
} else { } else {
shutDownReason = ConnectionListener.Reason.UNKNOWN; shutDownReason = ConnectionListener.Reason.UNKNOWN;
log.info("Exception at connection with port " + socket.getLocalPort()); log.warn("Exception at connection with port " + socket.getLocalPort());
e.printStackTrace(); e.printStackTrace();
} }

View file

@ -27,8 +27,8 @@ import java.util.function.Consumer;
public class LocalhostNetworkNode extends NetworkNode { public class LocalhostNetworkNode extends NetworkNode {
private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class); private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class);
private static volatile int simulateTorDelayTorNode = 600; private static volatile int simulateTorDelayTorNode = 100;
private static volatile int simulateTorDelayHiddenService = 3000; private static volatile int simulateTorDelayHiddenService = 500;
private Address address; private Address address;
public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) { public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) {

View file

@ -15,7 +15,9 @@ import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.*; import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -62,7 +64,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
abstract public void start(@Nullable SetupListener setupListener); abstract public void start(@Nullable SetupListener setupListener);
public SettableFuture<Connection> sendMessage(@NotNull Address peerAddress, Message message) { public SettableFuture<Connection> sendMessage(@NotNull Address peerAddress, Message message) {
Log.traceCall("message: " + message + " to peerAddress: " + peerAddress); Log.traceCall("peerAddress: " + peerAddress + " / message: " + message);
checkNotNull(peerAddress, "peerAddress must not be null"); checkNotNull(peerAddress, "peerAddress must not be null");
Optional<Connection> outboundConnectionOptional = lookupOutboundConnection(peerAddress); Optional<Connection> outboundConnectionOptional = lookupOutboundConnection(peerAddress);
@ -86,8 +88,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
if (connection != null) { if (connection != null) {
return sendMessage(connection, message); return sendMessage(connection, message);
} else { } else {
log.trace("We have not found any connection for that peerAddress. " + log.trace("We have not found any connection for peerAddress {}. " +
"We will create a new outbound connection."); "We will create a new outbound connection.", peerAddress);
final SettableFuture<Connection> resultFuture = SettableFuture.create(); final SettableFuture<Connection> resultFuture = SettableFuture.create();
final boolean[] timeoutOccurred = new boolean[1]; final boolean[] timeoutOccurred = new boolean[1];
@ -96,7 +98,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress); Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress);
try { try {
// can take a while when using tor // can take a while when using tor
Socket socket = createSocket(peerAddress); Socket socket = createSocket(peerAddress);
if (timeoutOccurred[0]) if (timeoutOccurred[0])
throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peerAddress); throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peerAddress);
@ -114,7 +116,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
// can take a while when using tor // can take a while when using tor
newConnection.sendMessage(message); newConnection.sendMessage(message);
return newConnection; return newConnection;
} catch (Throwable throwable) { } catch (Throwable throwable) {
if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) { if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) {
throwable.printStackTrace(); throwable.printStackTrace();
@ -124,7 +126,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
} }
}); });
Timer timer = new Timer(); //TODO does not close the connection yet. not clear if socket timeout is enough.
/*Timer timer = new Timer();
timer.schedule(new TimerTask() { timer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
@ -135,19 +138,19 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
log.info(message); log.info(message);
UserThread.execute(() -> resultFuture.setException(new TimeoutException(message))); UserThread.execute(() -> resultFuture.setException(new TimeoutException(message)));
} }
}, CREATE_SOCKET_TIMEOUT); }, CREATE_SOCKET_TIMEOUT);*/
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
UserThread.execute(() -> { UserThread.execute(() -> {
timer.cancel(); //timer.cancel();
resultFuture.set(connection); resultFuture.set(connection);
}); });
} }
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> { UserThread.execute(() -> {
timer.cancel(); //timer.cancel();
resultFuture.setException(throwable); resultFuture.setException(throwable);
}); });
} }
@ -158,7 +161,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
} }
public SettableFuture<Connection> sendMessage(Connection connection, Message message) { public SettableFuture<Connection> sendMessage(Connection connection, Message message) {
Log.traceCall(); Log.traceCall("message: " + message + " to connection: " + connection);
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block // connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = executorService.submit(() -> { ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid()); Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid());
@ -223,15 +226,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override @Override
public void onConnection(Connection connection) { public void onConnection(Connection connection) {
Log.traceCall("NetworkNode connection=" + connection); Log.traceCall("connection=" + connection);
connectionListeners.stream().forEach(e -> e.onConnection(connection)); connectionListeners.stream().forEach(e -> e.onConnection(connection));
} }
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall(); Log.traceCall("connection = " + connection);
Address peerAddress = connection.getPeerAddress();
log.trace("onDisconnect connection " + connection + ", peerAddress= " + peerAddress);
outBoundConnections.remove(connection); outBoundConnections.remove(connection);
inBoundConnections.remove(connection); inBoundConnections.remove(connection);
connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection)); connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection));
@ -311,10 +312,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall(); Log.traceCall("onDisconnect at incoming connection = " + connection);
Address peerAddress = connection.getPeerAddress();
log.trace("onDisconnect at incoming connection to peerAddress (or connection) "
+ ((peerAddress == null) ? connection : peerAddress));
inBoundConnections.remove(connection); inBoundConnections.remove(connection);
NetworkNode.this.onDisconnect(reason, connection); NetworkNode.this.onDisconnect(reason, connection);
} }
@ -334,13 +332,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private Optional<Connection> lookupOutboundConnection(Address peerAddress) { private Optional<Connection> lookupOutboundConnection(Address peerAddress) {
Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections); Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections);
return outBoundConnections.stream() return outBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); .filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
} }
private Optional<Connection> lookupInboundConnection(Address peerAddress) { private Optional<Connection> lookupInboundConnection(Address peerAddress) {
Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections); Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections);
return inBoundConnections.stream() return inBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny(); .filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
} }
abstract protected Socket createSocket(Address peerAddress) throws IOException; abstract protected Socket createSocket(Address peerAddress) throws IOException;

View file

@ -11,7 +11,10 @@ import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionPriority; import io.bitsquare.p2p.network.ConnectionPriority;
import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.auth.*; import io.bitsquare.p2p.peers.messages.auth.AuthenticationChallenge;
import io.bitsquare.p2p.peers.messages.auth.AuthenticationMessage;
import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest;
import io.bitsquare.p2p.peers.messages.auth.AuthenticationResponse;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -21,14 +24,13 @@ import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
// authentication protocol: // Authentication protocol:
// node2 -> node1 AuthenticationRequest // client: send AuthenticationRequest to seedNode
// node1: close connection // seedNode: close connection
// node1 -> node2 AuthenticationResponse on new connection // seedNode: send AuthenticationChallenge to client on a new connection to test if address is correct
// node2: authentication to node1 done if nonce ok // client: authentication to seedNode done if nonce verification is ok
// node2 -> node1 GetPeersAuthRequest // client: AuthenticationResponse to seedNode
// node1: authentication to node2 done if nonce ok // seedNode: authentication to client done if nonce verification is ok
// node1 -> node2 GetPeersAuthResponse
public class AuthenticationHandshake implements MessageListener { public class AuthenticationHandshake implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class); private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class);
@ -76,28 +78,33 @@ public class AuthenticationHandshake implements MessageListener {
if (message instanceof AuthenticationMessage) { if (message instanceof AuthenticationMessage) {
// We are listening on all connections, so we need to filter out only our peer // We are listening on all connections, so we need to filter out only our peer
if (((AuthenticationMessage) message).address.equals(peerAddress)) { if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
if (message instanceof AuthenticationResponse) { if (message instanceof AuthenticationChallenge) {
// Requesting peer // Requesting peer
AuthenticationChallenge authenticationChallenge = (AuthenticationChallenge) message;
// We need to set the address to the connection, otherwise we will not find the connection when sending
// the next message and we would create a new outbound connection instead using the inbound.
connection.setPeerAddress(authenticationChallenge.senderAddress);
// We use the active connectionType if we started the authentication request to another peer // We use the active connectionType if we started the authentication request to another peer
// That is used for protecting eclipse attacks
connection.setConnectionPriority(ConnectionPriority.ACTIVE); connection.setConnectionPriority(ConnectionPriority.ACTIVE);
AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
connection.setPeerAddress(peerAddress);
log.trace("Received authenticationResponse from " + peerAddress); log.trace("Received authenticationResponse from " + peerAddress);
boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce; boolean verified = nonce != 0 && nonce == authenticationChallenge.requesterNonce;
if (verified) { if (verified) {
GetPeersAuthRequest getPeersAuthRequest = new GetPeersAuthRequest(myAddress, AuthenticationResponse authenticationResponse = new AuthenticationResponse(myAddress,
authenticationResponse.responderNonce, authenticationChallenge.responderNonce,
new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())); new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, getPeersAuthRequest); SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationResponse);
log.trace("Sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress); log.trace("Sent GetPeersAuthRequest {} to {}", authenticationResponse, peerAddress);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.trace("Successfully sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress); log.trace("Successfully sent GetPeersAuthRequest to {}", peerAddress);
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
completed(connection);
} }
@Override @Override
@ -107,63 +114,27 @@ public class AuthenticationHandshake implements MessageListener {
} }
}); });
// We could set already the authenticated flag here already, but as we need the reported peers we need // now we add the reported peers to our list
// to wait for the GetPeersAuthResponse before we are completed. peerGroup.addToReportedPeers(authenticationChallenge.reportedPeers, connection);
} else { } else {
log.warn("verify nonce failed. AuthenticationResponse=" + authenticationResponse + " / nonce=" + nonce); log.warn("Verification of nonce failed. AuthenticationResponse=" + authenticationChallenge + " / nonce=" + nonce);
failed(new Exception("Verify nonce failed. AuthenticationResponse=" + authenticationResponse + " / nonceMap=" + nonce)); failed(new Exception("Verification of nonce failed. AuthenticationResponse=" + authenticationChallenge + " / nonceMap=" + nonce));
} }
} else if (message instanceof GetPeersAuthRequest) { } else if (message instanceof AuthenticationResponse) {
// Responding peer // Responding peer
GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message; AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
log.trace("GetPeersAuthRequest from " + peerAddress + " at " + myAddress); log.trace("Received GetPeersAuthRequest from " + peerAddress + " at " + myAddress);
boolean verified = nonce != 0 && nonce == getPeersAuthRequest.responderNonce; boolean verified = nonce != 0 && nonce == authenticationResponse.responderNonce;
if (verified) { if (verified) {
// we create the msg with our already collected peer addresses (before adding the new ones) peerGroup.addToReportedPeers(authenticationResponse.reportedPeers, connection);
GetPeersAuthResponse getPeersAuthResponse = new GetPeersAuthResponse(myAddress, log.info("AuthenticationComplete: Peer with address " + peerAddress
new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers())); + " authenticated (" + connection.getUid() + "). Took "
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, getPeersAuthResponse); + (System.currentTimeMillis() - startAuthTs) + " ms.");
log.trace("Sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress); completed(connection);
// now we add the reported peers to our own set
HashSet<ReportedPeer> reportedPeers = getPeersAuthRequest.reportedPeers;
log.trace("Received reported peers: " + reportedPeers);
peerGroup.addToReportedPeers(reportedPeers, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Successfully sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress);
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
completed(connection);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersAuthResponse sending failed " + throwable.getMessage());
failed(throwable);
}
});
} else { } else {
log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce); log.warn("Verification of nonce failed. getPeersMessage=" + authenticationResponse + " / nonce=" + nonce);
failed(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce)); failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationResponse + " / nonce=" + nonce));
} }
} else if (message instanceof GetPeersAuthResponse) {
// Requesting peer
GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message;
log.trace("GetPeersAuthResponse from " + peerAddress + " at " + myAddress);
HashSet<ReportedPeer> reportedPeers = getPeersAuthResponse.reportedPeers;
log.trace("Received reported peers: " + reportedPeers);
peerGroup.addToReportedPeers(reportedPeers, connection);
log.info("AuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getUid() + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms.");
completed(connection);
} }
} }
} }
@ -190,7 +161,6 @@ public class AuthenticationHandshake implements MessageListener {
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.trace("send AuthenticationRequest to " + peerAddress + " succeeded."); log.trace("send AuthenticationRequest to " + peerAddress + " succeeded.");
connection.setPeerAddress(peerAddress);
// We protect that connection from getting closed by maintenance cleanup... // We protect that connection from getting closed by maintenance cleanup...
connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST); connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST);
} }
@ -222,40 +192,42 @@ public class AuthenticationHandshake implements MessageListener {
resultFutureOptional = Optional.of(SettableFuture.create()); resultFutureOptional = Optional.of(SettableFuture.create());
log.trace("AuthenticationRequest from " + peerAddress + " at " + myAddress);
log.info("We shut down inbound connection from peer {} to establish a new " + log.info("We shut down inbound connection from peer {} to establish a new " +
"connection with his reported address.", peerAddress); "connection with his reported address to verify if his address is correct.", peerAddress);
connection.shutDown(() -> { connection.shutDown(() -> {
UserThread.runAfter(() -> { UserThread.runAfter(() -> {
if (!stopped) { if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections) // inconsistent state
log.trace("processAuthenticationMessage: connection.shutDown complete. AuthenticationRequest from " + peerAddress + " at " + myAddress); log.trace("respondToAuthenticationRequest: connection.shutDown complete. peerAddress=" + peerAddress + " / myAddress=" + myAddress);
AuthenticationResponse authenticationResponse = new AuthenticationResponse(myAddress, // we send additionally the reported and authenticated peers to save one message in the protocol.
AuthenticationChallenge authenticationChallenge = new AuthenticationChallenge(myAddress,
authenticationRequest.requesterNonce, authenticationRequest.requesterNonce,
getAndSetNonce()); getAndSetNonce(),
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationResponse); new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, authenticationChallenge);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.trace("onSuccess sending AuthenticationResponse"); log.trace("AuthenticationResponse successfully sent");
connection.setPeerAddress(peerAddress); // We use passive connectionType for connections created from received authentication
// We use passive connectionType for connections created from received authentication requests from other peers // requests from other peers
// That is used for protecting eclipse attacks
connection.setConnectionPriority(ConnectionPriority.PASSIVE); connection.setConnectionPriority(ConnectionPriority.PASSIVE);
} }
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.warn("onFailure sending AuthenticationResponse."); log.warn("onFailure sending AuthenticationResponse. " + throwable.getMessage());
failed(throwable); failed(throwable);
} }
}); });
} else {
log.warn("AuthenticationHandshake already shut down before we could sent AuthenticationResponse. That might happen in rare cases.");
} }
}, 200, TimeUnit.MILLISECONDS); }, 1000, TimeUnit.MILLISECONDS); // Don't set the delay too short as the CloseConnectionMessage might arrive too late at the peer
}); });
return resultFutureOptional.get(); return resultFutureOptional.get();
} }
@ -284,7 +256,7 @@ public class AuthenticationHandshake implements MessageListener {
} }
private void failed(@NotNull Throwable throwable) { private void failed(@NotNull Throwable throwable) {
Log.traceCall(); Log.traceCall(throwable.toString());
shutDown(); shutDown();
if (resultFutureOptional.isPresent()) if (resultFutureOptional.isPresent())
resultFutureOptional.get().setException(throwable); resultFutureOptional.get().setException(throwable);

View file

@ -4,5 +4,5 @@ import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
public interface AuthenticationListener { public interface AuthenticationListener {
void onPeerAddressAuthenticated(Address peerAddress, Connection connection); void onPeerAuthenticated(Address peerAddress, Connection connection);
} }

View file

@ -56,7 +56,6 @@ public class MaintenanceManager implements MessageListener {
public void onMessage(Message message, Connection connection) { public void onMessage(Message message, Connection connection) {
if (message instanceof MaintenanceMessage) { if (message instanceof MaintenanceMessage) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
log.debug("Received message " + message + " at " + peerGroup.getMyAddress() + " from " + connection.getPeerAddress());
if (message instanceof PingMessage) { if (message instanceof PingMessage) {
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce)); SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@ -68,19 +67,19 @@ public class MaintenanceManager implements MessageListener {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("PongMessage sending failed " + throwable.getMessage()); log.info("PongMessage sending failed " + throwable.getMessage());
peerGroup.removePeer(connection.getPeerAddress()); connection.getPeerAddress().ifPresent(peerAddress -> peerGroup.removePeer(peerAddress));
} }
}); });
} else if (message instanceof PongMessage) { } else if (message instanceof PongMessage) {
if (connection.getPeerAddress() != null) { connection.getPeerAddress().ifPresent(peerAddress -> {
Peer peer = peerGroup.getAuthenticatedPeers().get(connection.getPeerAddress()); Peer peer = peerGroup.getAuthenticatedPeers().get(peerAddress);
if (peer != null) { if (peer != null) {
if (((PongMessage) message).nonce != peer.getPingNonce()) { if (((PongMessage) message).nonce != peer.getPingNonce()) {
log.warn("PongMessage invalid: self/peer " + peerGroup.getMyAddress() + "/" + connection.getPeerAddress()); log.warn("PongMessage invalid: self/peer " + peerGroup.getMyAddress() + "/" + peerAddress);
peerGroup.removePeer(peer.address); peerGroup.removePeer(peer.address);
} }
} }
} });
} }
} }
} }
@ -93,7 +92,7 @@ public class MaintenanceManager implements MessageListener {
sendPingTimer = UserThread.runAfterRandomDelay(() -> { sendPingTimer = UserThread.runAfterRandomDelay(() -> {
pingPeers(); pingPeers();
startMaintenanceTimer(); startMaintenanceTimer();
}, 5, 10, TimeUnit.MINUTES); }, 5, 7, TimeUnit.MINUTES);
} }
@ -117,7 +116,7 @@ public class MaintenanceManager implements MessageListener {
peerGroup.removePeer(e.address); peerGroup.removePeer(e.address);
} }
}); });
}, 1, 10)); }, 2, 4, TimeUnit.SECONDS));
} }
} }
} }

View file

@ -14,9 +14,9 @@ public class Peer {
public final Address address; public final Address address;
private final long pingNonce; private final long pingNonce;
public Peer(Connection connection) { public Peer(Connection connection, Address address) {
this.connection = connection; this.connection = connection;
this.address = connection.getPeerAddress(); this.address = address;
pingNonce = new Random().nextLong(); pingNonce = new Random().nextLong();
} }

View file

@ -9,9 +9,9 @@ import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.peerexchange.GetPeersRequest; import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
import io.bitsquare.p2p.peers.messages.peerexchange.GetPeersResponse; import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse;
import io.bitsquare.p2p.peers.messages.peerexchange.PeerExchangeMessage; import io.bitsquare.p2p.peers.messages.peers.PeerExchangeMessage;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -51,7 +51,6 @@ public class PeerExchangeManager implements MessageListener {
public void onMessage(Message message, Connection connection) { public void onMessage(Message message, Connection connection) {
if (message instanceof PeerExchangeMessage) { if (message instanceof PeerExchangeMessage) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
log.debug("Received message " + message + " at " + peerGroup.getMyAddress() + " from " + connection.getPeerAddress());
if (message instanceof GetPeersRequest) { if (message instanceof GetPeersRequest) {
GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message; GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message;
HashSet<ReportedPeer> reportedPeers = getPeersRequestMessage.reportedPeers; HashSet<ReportedPeer> reportedPeers = getPeersRequestMessage.reportedPeers;
@ -68,7 +67,7 @@ public class PeerExchangeManager implements MessageListener {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersResponse sending failed " + throwable.getMessage()); log.info("GetPeersResponse sending failed " + throwable.getMessage());
peerGroup.removePeer(getPeersRequestMessage.address); peerGroup.removePeer(getPeersRequestMessage.senderAddress);
} }
}); });
@ -90,7 +89,7 @@ public class PeerExchangeManager implements MessageListener {
getPeersTimer = UserThread.runAfterRandomDelay(() -> { getPeersTimer = UserThread.runAfterRandomDelay(() -> {
trySendGetPeersRequest(); trySendGetPeersRequest();
startGetPeersTimer(); startGetPeersTimer();
}, 1, 2, TimeUnit.MINUTES); }, 2, 4, TimeUnit.MINUTES);
} }
private void trySendGetPeersRequest() { private void trySendGetPeersRequest() {
@ -113,7 +112,7 @@ public class PeerExchangeManager implements MessageListener {
peerGroup.removePeer(e.address); peerGroup.removePeer(e.address);
} }
}); });
}, 5, 10)); }, 3, 5, TimeUnit.SECONDS));
} }
} }

View file

@ -47,7 +47,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
setMaxConnectionsLowPriority(8); setMaxConnectionsLowPriority(8);
} }
static final int INACTIVITY_PERIOD_BEFORE_PING = 30 * 1000; static final int INACTIVITY_PERIOD_BEFORE_PING = 5 * 60 * 1000;
private static final int MAX_REPORTED_PEERS = 1000; private static final int MAX_REPORTED_PEERS = 1000;
private final NetworkNode networkNode; private final NetworkNode networkNode;
@ -95,7 +95,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
@Override @Override
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(Reason reason, Connection connection) {
log.debug("onDisconnect connection=" + connection + " / reason=" + reason); log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
removePeer(connection.getPeerAddress()); connection.getPeerAddress().ifPresent(peerAddress -> removePeer(peerAddress));
} }
@Override @Override
@ -127,23 +127,30 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticatedPeers.values().stream() authenticatedPeers.values().stream()
.filter(e -> !e.address.equals(sender)) .filter(e -> !e.address.equals(sender))
.forEach(peer -> UserThread.runAfterRandomDelay(() -> { .forEach(peer -> UserThread.runAfterRandomDelay(() -> {
final Address address = peer.address; // as we use a delay we need to check again if our peer is still in the authenticated list
log.trace("Broadcast message from " + getMyAddress() + " to " + address + "."); if (authenticatedPeers.containsValue(peer)) {
SettableFuture<Connection> future = networkNode.sendMessage(address, message); final Address address = peer.address;
Futures.addCallback(future, new FutureCallback<Connection>() { log.trace("Broadcast message from " + getMyAddress() + " to " + address + ".");
@Override SettableFuture<Connection> future = networkNode.sendMessage(address, message);
public void onSuccess(Connection connection) { Futures.addCallback(future, new FutureCallback<Connection>() {
log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded."); @Override
} public void onSuccess(Connection connection) {
log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded.");
}
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("Broadcast failed. " + throwable.getMessage()); log.info("Broadcast failed. " + throwable.getMessage());
UserThread.execute(() -> removePeer(address)); UserThread.execute(() -> removePeer(address));
} }
}); });
} else {
log.debug("Peer is not in our authenticated list anymore. " +
"That can happen as we use a delay in the loop for the broadcast. " +
"Peer.address={}", peer.address);
}
}, },
10, 200, TimeUnit.MILLISECONDS)); 10, 100, TimeUnit.MILLISECONDS));
} else { } else {
log.info("Message not broadcasted because we have no authenticated peers yet. " + log.info("Message not broadcasted because we have no authenticated peers yet. " +
"message = {}", message); "message = {}", message);
@ -167,7 +174,11 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void processAuthenticationRequest(AuthenticationRequest message, final Connection connection) { private void processAuthenticationRequest(AuthenticationRequest message, final Connection connection) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
Address peerAddress = message.address; Address peerAddress = message.senderAddress;
// We set the address to the connection, otherwise we will not find the connection when sending
// a reject message and we would create a new outbound connection instead using the inbound.
connection.setPeerAddress(message.senderAddress);
if (!authenticatedPeers.containsKey(peerAddress)) { if (!authenticatedPeers.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake; AuthenticationHandshake authenticationHandshake;
@ -175,7 +186,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info("We got an incoming AuthenticationRequest for the peerAddress ({})", peerAddress); log.info("We got an incoming AuthenticationRequest for the peerAddress ({})", peerAddress);
// We protect that connection from getting closed by maintenance cleanup... // We protect that connection from getting closed by maintenance cleanup...
connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST); connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST);
authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), message.address); authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), peerAddress);
authenticationHandshakes.put(peerAddress, authenticationHandshake); authenticationHandshakes.put(peerAddress, authenticationHandshake);
doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake); doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake);
} else { } else {
@ -183,13 +194,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"an authentication handshake for that peerAddress ({})", peerAddress); "an authentication handshake for that peerAddress ({})", peerAddress);
log.debug("We avoid such race conditions by rejecting the request if the hashCode of our address ({}) is " + log.debug("We avoid such race conditions by rejecting the request if the hashCode of our address ({}) is " +
"smaller then the hashCode of the peers address ({}).", getMyAddress().hashCode(), "smaller then the hashCode of the peers address ({}).", getMyAddress().hashCode(),
message.address.hashCode()); message.senderAddress.hashCode());
authenticationHandshake = authenticationHandshakes.get(peerAddress); authenticationHandshake = authenticationHandshakes.get(peerAddress);
if (getMyAddress().hashCode() < message.address.hashCode()) { if (getMyAddress().hashCode() < peerAddress.hashCode()) {
log.info("We reject the authentication request and keep our own request alive."); log.info("We reject the authentication request and keep our own request alive.");
rejectAuthenticationRequest(message, peerAddress); rejectAuthenticationRequest(peerAddress);
} else { } else {
log.info("We accept the authentication request but cancel our own request."); log.info("We accept the authentication request but cancel our own request.");
cancelOwnAuthenticationRequest(peerAddress, authenticationHandshake); cancelOwnAuthenticationRequest(peerAddress, authenticationHandshake);
@ -201,35 +212,34 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.warn("We got an incoming AuthenticationRequest but we are already authenticated to that peer " + log.warn("We got an incoming AuthenticationRequest but we are already authenticated to that peer " +
"with peerAddress {}.\n" + "with peerAddress {}.\n" +
"That might happen in some race conditions. We reject the request.", peerAddress); "That might happen in some race conditions. We reject the request.", peerAddress);
rejectAuthenticationRequest(message, peerAddress); rejectAuthenticationRequest(peerAddress);
} }
} }
private void processAuthenticationRejection(AuthenticationRejection message) { private void processAuthenticationRejection(AuthenticationRejection message) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
Address peerAddress = message.address; Address peerAddress = message.senderAddress;
cancelOwnAuthenticationRequest(peerAddress, authenticationHandshakes.get(peerAddress)); cancelOwnAuthenticationRequest(peerAddress, authenticationHandshakes.get(peerAddress));
} }
private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection, final Address peerAddress, AuthenticationHandshake authenticationHandshake) { private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection,
Address peerAddress, AuthenticationHandshake authenticationHandshake) {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
SettableFuture<Connection> future = authenticationHandshake.respondToAuthenticationRequest(message, connection); SettableFuture<Connection> future = authenticationHandshake.respondToAuthenticationRequest(message, connection);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
checkArgument(peerAddress.equals(connection.getPeerAddress()), "peerAddress does not match connection.getPeerAddress()"); log.info("We got the peer ({}) who requested authentication authenticated.", peerAddress);
log.info("We got the peer who did an authentication request authenticated."); addAuthenticatedPeer(connection, peerAddress);
addAuthenticatedPeer(connection, peerAddress); }
}
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication with peer who requested authentication failed.\n" + log.info("Authentication with peer who requested authentication failed.\n" +
"That can happen if the peer went offline. " + throwable.getMessage()); "That can happen if the peer went offline. " + throwable.getMessage());
removePeer(peerAddress); removePeer(peerAddress);
} }
} }
); );
} }
@ -239,9 +249,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticationHandshakes.remove(peerAddress); authenticationHandshakes.remove(peerAddress);
} }
private void rejectAuthenticationRequest(AuthenticationRequest message, Address peerAddress) { private void rejectAuthenticationRequest(Address peerAddress) {
Log.traceCall(); Log.traceCall();
networkNode.sendMessage(peerAddress, new AuthenticationRejection(getMyAddress(), message.requesterNonce)); networkNode.sendMessage(peerAddress, new AuthenticationRejection(getMyAddress()));
} }
@ -254,42 +264,42 @@ public class PeerGroup implements MessageListener, ConnectionListener {
seedNodeAddressesOptional = Optional.of(seedNodeAddresses); seedNodeAddressesOptional = Optional.of(seedNodeAddresses);
remainingSeedNodes.addAll(seedNodeAddresses); remainingSeedNodes.addAll(seedNodeAddresses);
remainingSeedNodes.remove(peerAddress); remainingSeedNodes.remove(peerAddress);
authenticateToFirstSeedNode(peerAddress); authenticateToFirstSeedNode(peerAddress);
} }
private void authenticateToFirstSeedNode(Address peerAddress) { private void authenticateToFirstSeedNode(Address peerAddress) {
Log.traceCall(); Log.traceCall();
if (!maxConnectionsForAuthReached()) { if (!maxConnectionsForAuthReached()) {
if (remainingSeedNodesAvailable()) {
log.info("We try to authenticate to seed node {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.info("We got our first seed node authenticated. " +
"We try if there are reported peers available to authenticate.");
addAuthenticatedPeer(connection, peerAddress); log.info("We try to authenticate to seed node {}.", peerAddress);
authenticateToRemainingReportedPeer(); authenticate(peerAddress, new FutureCallback<Connection>() {
} @Override
public void onSuccess(Connection connection) {
log.info("We got our first seed node authenticated. " +
"We try if there are reported peers available to authenticate.");
@Override addAuthenticatedPeer(connection, peerAddress);
public void onFailure(@NotNull Throwable throwable) { authenticateToRemainingReportedPeer();
log.info("Authentication to " + peerAddress + " failed." + }
"\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.getMessage());
removePeer(peerAddress); @Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed." +
"\nThat is expected if seed nodes are offline." +
"\nException:" + throwable.getMessage());
removePeer(peerAddress);
if (remainingSeedNodesAvailable()) {
log.info("We try another random seed node for first authentication attempt."); log.info("We try another random seed node for first authentication attempt.");
authenticateToFirstSeedNode(getAndRemoveRandomAddress(remainingSeedNodes)); authenticateToFirstSeedNode(getAndRemoveRandomAddress(remainingSeedNodes));
} else {
log.info("There are no seed nodes available for authentication. " +
"We try if there are reported peers available to authenticate.");
authenticateToRemainingReportedPeer();
} }
}); }
} else { });
log.info("There are no seed nodes available for authentication. " +
"We try if there are reported peers available to authenticate.");
authenticateToRemainingReportedPeer();
}
} else { } else {
log.info("We have already enough connections."); log.info("We have already enough connections.");
} }
@ -330,7 +340,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} else { } else {
log.info("We don't have seed nodes or reported peers available. We will try again after a random pause."); log.info("We don't have seed nodes or reported peers available. We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
1, 2, TimeUnit.MINUTES); 10, 20, TimeUnit.SECONDS);
} }
} else { } else {
log.info("We have already enough connections."); log.info("We have already enough connections.");
@ -346,38 +356,43 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall(); Log.traceCall();
if (!maxConnectionsForAuthReached()) { if (!maxConnectionsForAuthReached()) {
if (reportedPeersAvailable()) { if (reportedPeersAvailable()) {
Address peerAddress = getAndRemoveRandomReportedPeer(new ArrayList<>(reportedPeers)).address; if (getAndRemoveNotAuthenticatingReportedPeer().isPresent()) {
removeFromReportedPeers(peerAddress); Address peerAddress = getAndRemoveNotAuthenticatingReportedPeer().get().address;
removeFromReportedPeers(peerAddress);
log.info("We try to authenticate to peer {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.info("We got a peer authenticated. " +
"We try if there are more reported peers available to authenticate.");
log.info("We try to authenticate to peer {}.", peerAddress); addAuthenticatedPeer(connection, peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticateToRemainingReportedPeer();
@Override }
public void onSuccess(Connection connection) {
log.info("We got a peer authenticated. " +
"We try if there are more reported peers available to authenticate.");
addAuthenticatedPeer(connection, peerAddress); @Override
authenticateToRemainingReportedPeer(); public void onFailure(@NotNull Throwable throwable) {
} log.info("Authentication to " + peerAddress + " failed." +
"\nThat is expected if the peer is offline." +
"\nException:" + throwable.getMessage());
@Override removePeer(peerAddress);
public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed." +
"\nThat is expected if the peer is offline." +
"\nException:" + throwable.getMessage());
removePeer(peerAddress); log.info("We try another random seed node for authentication.");
authenticateToRemainingReportedPeer();
log.info("We try another random seed node for authentication."); }
authenticateToRemainingReportedPeer(); });
} } else {
}); log.info("We don't have a reported peers available (maybe one is authenticating already). We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
}
} else if (remainingSeedNodesAvailable()) { } else if (remainingSeedNodesAvailable()) {
authenticateToRemainingSeedNode(); authenticateToRemainingSeedNode();
} else { } else {
log.info("We don't have seed nodes or reported peers available. We will try again after a random pause."); log.info("We don't have seed nodes or reported peers available. We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
1, 2, TimeUnit.MINUTES); 30, 40, TimeUnit.SECONDS);
} }
} else { } else {
log.info("We have already enough connections."); log.info("We have already enough connections.");
@ -442,6 +457,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void addAuthenticatedPeer(Connection connection, Address peerAddress) { private void addAuthenticatedPeer(Connection connection, Address peerAddress) {
Log.traceCall(peerAddress.getFullAddress()); Log.traceCall(peerAddress.getFullAddress());
connection.setPeerAddress(peerAddress);
connection.setAuthenticated();
if (authenticationHandshakes.containsKey(peerAddress)) if (authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress); authenticationHandshakes.remove(peerAddress);
@ -452,15 +471,14 @@ public class PeerGroup implements MessageListener, ConnectionListener {
+ "\npeerAddress= " + peerAddress + "\npeerAddress= " + peerAddress
+ "\n############################################################\n"); + "\n############################################################\n");
authenticatedPeers.put(peerAddress, new Peer(connection)); authenticatedPeers.put(peerAddress, new Peer(connection, peerAddress));
removeFromReportedPeers(peerAddress); removeFromReportedPeers(peerAddress);
if (!checkIfConnectedPeersExceeds()) if (!checkIfConnectedPeersExceeds())
printAuthenticatedPeers(); printAuthenticatedPeers();
connection.setAuthenticated(peerAddress); //TODO check if address is set already authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection));
authenticationListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection));
} }
void removePeer(@Nullable Address peerAddress) { void removePeer(@Nullable Address peerAddress) {
@ -524,6 +542,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (size > PeerGroup.MAX_CONNECTIONS_HIGH_PRIORITY) { if (size > PeerGroup.MAX_CONNECTIONS_HIGH_PRIORITY) {
authenticatedConnections = allConnections.stream() authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated()) .filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST)
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
} }
@ -536,10 +555,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Connection connection = authenticatedConnections.remove(0); Connection connection = authenticatedConnections.remove(0);
log.info("We are going to shut down the oldest connection with last activity date=" log.info("We are going to shut down the oldest connection with last activity date="
+ connection.getLastActivityDate() + " / connection=" + connection); + connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 100, 500, TimeUnit.MILLISECONDS)); connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 10, 50, TimeUnit.MILLISECONDS));
return true; return true;
} else { } else {
log.warn("authenticatedConnections.size() == 0. That must never happen here. (checkIfConnectedPeersExceeds)"); log.debug("authenticatedConnections.size() == 0. That might happen in rare cases. (checkIfConnectedPeersExceeds)");
return false; return false;
} }
} else { } else {
@ -564,13 +583,17 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return all; return all;
} }
public boolean isInAuthenticationProcess(Address address) {
return authenticationHandshakes.containsKey(address);
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Reported peers // Reported peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
void addToReportedPeers(HashSet<ReportedPeer> reportedPeersToAdd, Connection connection) { void addToReportedPeers(HashSet<ReportedPeer> reportedPeersToAdd, Connection connection) {
Log.traceCall(); Log.traceCall("reportedPeersToAdd = " + reportedPeersToAdd);
// we disconnect misbehaving nodes trying to send too many peers // we disconnect misbehaving nodes trying to send too many peers
// reported peers include the authenticated peers which is normally max. 8 but we give some headroom // reported peers include the authenticated peers which is normally max. 8 but we give some headroom
// for safety // for safety
@ -640,6 +663,18 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return list.remove(new Random().nextInt(list.size())); return list.remove(new Random().nextInt(list.size()));
} }
private Optional<ReportedPeer> getAndRemoveNotAuthenticatingReportedPeer() {
Optional<ReportedPeer> reportedPeer = Optional.empty();
List<ReportedPeer> list = new ArrayList<>(reportedPeers);
if (!list.isEmpty()) {
do {
reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list));
}
while (!list.isEmpty() && authenticationHandshakes.containsKey(reportedPeer.get().address));
}
return reportedPeer;
}
private Address getAndRemoveRandomAddress(List<Address> list) { private Address getAndRemoveRandomAddress(List<Address> list) {
checkArgument(!list.isEmpty(), "List must not be empty"); checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size())); return list.remove(new Random().nextInt(list.size()));
@ -666,5 +701,4 @@ public class PeerGroup implements MessageListener, ConnectionListener {
result.append("\n------------------------------------------------------------\n"); result.append("\n------------------------------------------------------------\n");
log.info(result.toString()); log.info(result.toString());
} }
} }

View file

@ -8,12 +8,13 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionPriority;
import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.data.DataRequest;
import io.bitsquare.p2p.peers.messages.data.DataResponse;
import io.bitsquare.p2p.storage.P2PDataStorage; import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.data.ProtectedData; import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.messages.GetDataRequest;
import io.bitsquare.p2p.storage.messages.GetDataResponse;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -40,6 +41,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
private final NetworkNode networkNode; private final NetworkNode networkNode;
private final P2PDataStorage dataStorage; private final P2PDataStorage dataStorage;
private final PeerGroup peerGroup;
private final Listener listener; private final Listener listener;
private Optional<Address> optionalConnectedSeedNodeAddress = Optional.empty(); private Optional<Address> optionalConnectedSeedNodeAddress = Optional.empty();
@ -50,9 +52,10 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, Listener listener) { public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerGroup peerGroup, Listener listener) {
this.networkNode = networkNode; this.networkNode = networkNode;
this.dataStorage = dataStorage; this.dataStorage = dataStorage;
this.peerGroup = peerGroup;
this.listener = listener; this.listener = listener;
networkNode.addMessageListener(this); networkNode.addMessageListener(this);
@ -70,39 +73,46 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
if (!seedNodeAddresses.isEmpty()) { if (!seedNodeAddresses.isEmpty()) {
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses); List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
Collections.shuffle(remainingSeedNodeAddresses); Collections.shuffle(remainingSeedNodeAddresses);
Address candidate = remainingSeedNodeAddresses.remove(0); Address candidate = remainingSeedNodeAddresses.get(0);
if (!peerGroup.isInAuthenticationProcess(candidate)) {
// We only remove it if it is not in the process of authentication
remainingSeedNodeAddresses.remove(0);
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate); SettableFuture<Connection> future = networkNode.sendMessage(candidate, new DataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen.");
optionalConnectedSeedNodeAddress = Optional.of(candidate);
}
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new GetDataRequest()); @Override
Futures.addCallback(future, new FutureCallback<Connection>() { public void onFailure(@NotNull Throwable throwable) {
@Override log.info("Send GetAllDataMessage to " + candidate + " failed. " +
public void onSuccess(@Nullable Connection connection) { "That is expected if the seed node is offline. " +
log.info("Send GetAllDataMessage to " + candidate + " succeeded."); "Exception:" + throwable.getMessage());
checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen."); if (!remainingSeedNodeAddresses.isEmpty())
optionalConnectedSeedNodeAddress = Optional.of(candidate); log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses);
}
@Override requestData(remainingSeedNodeAddresses);
public void onFailure(@NotNull Throwable throwable) { }
log.info("Send GetAllDataMessage to " + candidate + " failed. " + });
"That is expected if the seed node is offline. " + } else {
"Exception:" + throwable.getMessage()); log.info("The seed node ({}) is in the process of authentication.\n" +
if (!remainingSeedNodeAddresses.isEmpty()) "We will try again after a pause of 3-5 sec.", candidate);
log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses); listener.onNoSeedNodeAvailable();
UserThread.runAfterRandomDelay(() -> requestData(remainingSeedNodeAddresses),
requestData(remainingSeedNodeAddresses); 3, 5, TimeUnit.SECONDS);
} }
});
} else { } else {
log.info("There is no seed node available for requesting data. " + log.info("There is no seed node available for requesting data. " +
"That is expected if no seed node is online.\n" + "That is expected if no seed node is online.\n" +
"We will try again after a pause of 20-30 sec."); "We will try again after a pause of 10-20 sec.");
listener.onNoSeedNodeAvailable(); listener.onNoSeedNodeAvailable();
// We re try after 20-30 sec.
UserThread.runAfterRandomDelay(() -> requestData(optionalSeedNodeAddresses.get()), UserThread.runAfterRandomDelay(() -> requestData(optionalSeedNodeAddresses.get()),
20, 30, TimeUnit.SECONDS); 10, 20, TimeUnit.SECONDS);
} }
} }
@ -113,18 +123,18 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
@Override @Override
public void onMessage(Message message, Connection connection) { public void onMessage(Message message, Connection connection) {
if (message instanceof GetDataRequest) { if (message instanceof DataRequest) {
// We are a seed node and receive that msg from a new node // We are a seed node and receive that msg from a new node
Log.traceCall(message.toString()); Log.traceCall(message.toString());
networkNode.sendMessage(connection, new GetDataResponse(new HashSet<>(dataStorage.getMap().values()))); networkNode.sendMessage(connection, new DataResponse(new HashSet<>(dataStorage.getMap().values())));
} else if (message instanceof GetDataResponse) { } else if (message instanceof DataResponse) {
// We are the new node which has requested the data // We are the new node which has requested the data
Log.traceCall(message.toString()); Log.traceCall(message.toString());
GetDataResponse getDataResponse = (GetDataResponse) message; DataResponse dataResponse = (DataResponse) message;
HashSet<ProtectedData> set = getDataResponse.set; HashSet<ProtectedData> set = dataResponse.set;
// we keep that connection open as the bootstrapping peer will use that for the authentication // we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted // as we are not authenticated yet the data adding will not be broadcasted
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); connection.getPeerAddress().ifPresent(peerAddress -> set.stream().forEach(e -> dataStorage.add(e, peerAddress)));
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> listener.onDataReceived(connectedSeedNodeAddress)); optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> listener.onDataReceived(connectedSeedNodeAddress));
} }
} }
@ -135,10 +145,13 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { public void onPeerAuthenticated(Address peerAddress, Connection connection) {
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> { optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> {
if (connectedSeedNodeAddress.equals(peerAddress)) // We only request the data again if we have initiated the authentication (ConnectionPriority.ACTIVE)
requestDataFromAuthenticatedSeedNode(peerAddress, connection); // We delay a bit to be sure that the authentication state is applied to all threads
if (connection.getConnectionPriority() == ConnectionPriority.ACTIVE && connectedSeedNodeAddress.equals(peerAddress))
UserThread.runAfter(()
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS);
}); });
} }
@ -147,7 +160,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) { private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {
Log.traceCall(peerAddress.toString()); Log.traceCall(peerAddress.toString());
// We have to request the data again as we might have missed pushed data in the meantime // We have to request the data again as we might have missed pushed data in the meantime
SettableFuture<Connection> future = networkNode.sendMessage(connection, new GetDataRequest()); SettableFuture<Connection> future = networkNode.sendMessage(connection, new DataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(@Nullable Connection connection) { public void onSuccess(@Nullable Connection connection) {

View file

@ -6,24 +6,26 @@ import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet; import java.util.HashSet;
public final class GetPeersAuthRequest extends AuthenticationMessage { public final class AuthenticationChallenge extends AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final long requesterNonce;
public final long responderNonce; public final long responderNonce;
public final HashSet<ReportedPeer> reportedPeers; public final HashSet<ReportedPeer> reportedPeers;
public GetPeersAuthRequest(Address address, long responderNonce, HashSet<ReportedPeer> reportedPeers) { public AuthenticationChallenge(Address senderAddress, long requesterNonce, long responderNonce, HashSet<ReportedPeer> reportedPeers) {
super(address); super(senderAddress);
this.requesterNonce = requesterNonce;
this.responderNonce = responderNonce; this.responderNonce = responderNonce;
this.reportedPeers = reportedPeers; this.reportedPeers = reportedPeers;
} }
@Override @Override
public String toString() { public String toString() {
return "GetPeersAuthRequest{" + return "AuthenticationChallenge{" +
"address=" + address + ", requesterNonce=" + requesterNonce +
", challengerNonce=" + responderNonce + ", responderNonce=" + responderNonce +
", reportedPeers=" + reportedPeers + ", reportedPeers=" + reportedPeers +
super.toString() + "} "; super.toString() + "} ";
} }

View file

@ -7,10 +7,10 @@ import io.bitsquare.p2p.Message;
public abstract class AuthenticationMessage implements Message { public abstract class AuthenticationMessage implements Message {
private final int networkId = Version.NETWORK_ID; private final int networkId = Version.NETWORK_ID;
public final Address address; public final Address senderAddress;
public AuthenticationMessage(Address address) { public AuthenticationMessage(Address senderAddress) {
this.address = address; this.senderAddress = senderAddress;
} }
@Override @Override
@ -20,7 +20,7 @@ public abstract class AuthenticationMessage implements Message {
@Override @Override
public String toString() { public String toString() {
return ", address=" + address.toString() + return ", address=" + senderAddress.toString() +
", networkId=" + networkId + ", networkId=" + networkId +
'}'; '}';
} }

View file

@ -7,18 +7,14 @@ public final class AuthenticationRejection extends AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final long requesterNonce; public AuthenticationRejection(Address senderAddress) {
super(senderAddress);
public AuthenticationRejection(Address address, long requesterNonce) {
super(address);
this.requesterNonce = requesterNonce;
} }
@Override @Override
public String toString() { public String toString() {
return "AuthenticationReject{" + return "AuthenticationReject{" +
"address=" + address + "address=" + senderAddress +
", requesterNonce=" + requesterNonce +
super.toString() + "} "; super.toString() + "} ";
} }
} }

View file

@ -9,16 +9,16 @@ public final class AuthenticationRequest extends AuthenticationMessage {
public final long requesterNonce; public final long requesterNonce;
public AuthenticationRequest(Address address, long requesterNonce) { public AuthenticationRequest(Address senderAddress, long requesterNonce) {
super(address); super(senderAddress);
this.requesterNonce = requesterNonce; this.requesterNonce = requesterNonce;
} }
@Override @Override
public String toString() { public String toString() {
return "AuthenticationRequest{" + return "AuthenticationRequest{" +
"address=" + address + "senderAddress=" + senderAddress +
", nonce=" + requesterNonce + ", requesterNonce=" + requesterNonce +
super.toString() + "} "; super.toString() + "} ";
} }
} }

View file

@ -2,26 +2,29 @@ package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet;
public final class AuthenticationResponse extends AuthenticationMessage { public final class AuthenticationResponse extends AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final long requesterNonce;
public final long responderNonce; public final long responderNonce;
public final HashSet<ReportedPeer> reportedPeers;
public AuthenticationResponse(Address address, long requesterNonce, long responderNonce) { public AuthenticationResponse(Address senderAddress, long responderNonce, HashSet<ReportedPeer> reportedPeers) {
super(address); super(senderAddress);
this.requesterNonce = requesterNonce;
this.responderNonce = responderNonce; this.responderNonce = responderNonce;
this.reportedPeers = reportedPeers;
} }
@Override @Override
public String toString() { public String toString() {
return "AuthenticationResponse{" + return "AuthenticationResponse{" +
"address=" + address + "address=" + senderAddress +
", requesterNonce=" + requesterNonce + ", responderNonce=" + responderNonce +
", challengerNonce=" + responderNonce + ", reportedPeers=" + reportedPeers +
super.toString() + "} "; super.toString() + "} ";
} }
} }

View file

@ -1,27 +0,0 @@
package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet;
public final class GetPeersAuthResponse extends AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final HashSet<ReportedPeer> reportedPeers;
public GetPeersAuthResponse(Address address, HashSet<ReportedPeer> reportedPeers) {
super(address);
this.reportedPeers = reportedPeers;
}
@Override
public String toString() {
return "GetPeersAuthResponse{" +
"address=" + address +
", reportedPeers=" + reportedPeers +
super.toString() + "} ";
}
}

View file

@ -1,15 +1,15 @@
package io.bitsquare.p2p.storage.messages; package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
public final class GetDataRequest implements Message { public final class DataRequest implements Message {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.NETWORK_ID; private final int networkId = Version.NETWORK_ID;
public GetDataRequest() { public DataRequest() {
} }
@Override @Override

View file

@ -1,4 +1,4 @@
package io.bitsquare.p2p.storage.messages; package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
@ -6,14 +6,14 @@ import io.bitsquare.p2p.storage.data.ProtectedData;
import java.util.HashSet; import java.util.HashSet;
public final class GetDataResponse implements Message { public final class DataResponse implements Message {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.NETWORK_ID; private final int networkId = Version.NETWORK_ID;
public final HashSet<ProtectedData> set; public final HashSet<ProtectedData> set;
public GetDataResponse(HashSet<ProtectedData> set) { public DataResponse(HashSet<ProtectedData> set) {
this.set = set; this.set = set;
} }
@ -25,9 +25,9 @@ public final class GetDataResponse implements Message {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (!(o instanceof GetDataResponse)) return false; if (!(o instanceof DataResponse)) return false;
GetDataResponse that = (GetDataResponse) o; DataResponse that = (DataResponse) o;
return !(set != null ? !set.equals(that.set) : that.set != null); return !(set != null ? !set.equals(that.set) : that.set != null);

View file

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peers.messages.peerexchange; package io.bitsquare.p2p.peers.messages.peers;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
@ -10,18 +10,18 @@ public final class GetPeersRequest extends PeerExchangeMessage {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address; public final Address senderAddress;
public final HashSet<ReportedPeer> reportedPeers; public final HashSet<ReportedPeer> reportedPeers;
public GetPeersRequest(Address address, HashSet<ReportedPeer> reportedPeers) { public GetPeersRequest(Address senderAddress, HashSet<ReportedPeer> reportedPeers) {
this.address = address; this.senderAddress = senderAddress;
this.reportedPeers = reportedPeers; this.reportedPeers = reportedPeers;
} }
@Override @Override
public String toString() { public String toString() {
return "GetPeersRequest{" + return "GetPeersRequest{" +
"address=" + address + "senderAddress=" + senderAddress +
", reportedPeers=" + reportedPeers + ", reportedPeers=" + reportedPeers +
super.toString() + "} "; super.toString() + "} ";
} }

View file

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peers.messages.peerexchange; package io.bitsquare.p2p.peers.messages.peers;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.peers.ReportedPeer; import io.bitsquare.p2p.peers.ReportedPeer;

View file

@ -1,4 +1,4 @@
package io.bitsquare.p2p.peers.messages.peerexchange; package io.bitsquare.p2p.peers.messages.peers;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;

View file

@ -116,17 +116,18 @@ public class P2PDataStorage implements MessageListener {
Log.traceCall(message.toString()); Log.traceCall(message.toString());
if (connection.isAuthenticated()) { if (connection.isAuthenticated()) {
log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection); log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection);
if (message instanceof AddDataMessage) { connection.getPeerAddress().ifPresent(peerAddress -> {
add(((AddDataMessage) message).data, connection.getPeerAddress()); if (message instanceof AddDataMessage) {
} else if (message instanceof RemoveDataMessage) { add(((AddDataMessage) message).data, peerAddress);
remove(((RemoveDataMessage) message).data, connection.getPeerAddress()); } else if (message instanceof RemoveDataMessage) {
} else if (message instanceof RemoveMailboxDataMessage) { remove(((RemoveDataMessage) message).data, peerAddress);
removeMailboxData(((RemoveMailboxDataMessage) message).data, connection.getPeerAddress()); } else if (message instanceof RemoveMailboxDataMessage) {
} removeMailboxData(((RemoveMailboxDataMessage) message).data, peerAddress);
}
});
} else { } else {
log.warn("Connection is not authenticated yet. " + log.warn("Connection is not authenticated yet. " +
"We don't accept storage operations from non-authenticated nodes."); "We don't accept storage operations from non-authenticated nodes. connection=", connection);
log.trace("Connection = " + connection);
connection.reportIllegalRequest(IllegalRequest.NotAuthenticated); connection.reportIllegalRequest(IllegalRequest.NotAuthenticated);
} }
} }