diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java
index 9313039ba1..2d697d5b52 100644
--- a/network/src/main/java/io/bitsquare/p2p/P2PService.java
+++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java
@@ -134,14 +134,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
peerGroup = new PeerGroup(networkNode);
peerGroup.addAuthenticationListener(this);
if (useLocalhost)
- PeerGroup.setSimulateAuthTorNode(200);
+ PeerGroup.setSimulateAuthTorNode(100);
// P2P network data storage
dataStorage = new P2PDataStorage(peerGroup, networkNode, storageDir);
dataStorage.addHashMapChangedListener(this);
// Request initial data manager
- requestDataManager = new RequestDataManager(networkNode, dataStorage, new RequestDataManager.Listener() {
+ requestDataManager = new RequestDataManager(networkNode, dataStorage, peerGroup, new RequestDataManager.Listener() {
@Override
public void onNoSeedNodeAvailable() {
p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable());
@@ -286,9 +286,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
- if (connection.isAuthenticated())
- authenticatedPeerAddresses.remove(connection.getPeerAddress());
-
+ connection.getPeerAddress().ifPresent(peerAddresses -> authenticatedPeerAddresses.remove(peerAddresses));
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
@@ -302,10 +300,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
@Override
- public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
+ public void onPeerAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
- checkArgument(peerAddress.equals(connection.getPeerAddress()),
- "peerAddress must match connection.getPeerAddress()");
authenticatedPeerAddresses.add(peerAddress);
if (!firstPeerAuthenticated.get()) {
@@ -315,8 +311,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
numAuthenticatedPeers.set(authenticatedPeerAddresses.size());
}
-
-
+
+
///////////////////////////////////////////////////////////////////////////////////////////
// MessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@@ -339,8 +335,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
connection.setConnectionPriority(ConnectionPriority.DIRECT_MSG);
log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey);
- decryptedMailListeners.stream().forEach(
- e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress()));
+ connection.getPeerAddress().ifPresent(peerAddresses ->
+ decryptedMailListeners.stream().forEach(
+ e -> e.onMailMessage(decryptedMsgWithPubKey, peerAddresses)));
} else {
log.info("Wrong receiverAddressMaskHash. The message is not intended for us.");
}
diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java
index 152d3c37eb..3499eb2584 100644
--- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java
+++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java
@@ -19,9 +19,12 @@ import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Date;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* Connection is created by the server thread or by sendMessage from NetworkNode.
* All handlers are called on User thread.
@@ -31,7 +34,7 @@ public class Connection implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(Connection.class);
private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data
//timeout on blocking Socket operations like ServerSocket.accept() or SocketInputStream.read()
- private static final int SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 min.
+ private static final int SOCKET_TIMEOUT = 10 * 60 * 1000; // 10 min.
private ConnectionPriority connectionPriority;
public static int getMaxMsgSize() {
@@ -41,6 +44,7 @@ public class Connection implements MessageListener {
private final Socket socket;
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
+
private final String portInfo;
private final String uid = UUID.randomUUID().toString();
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
@@ -52,8 +56,7 @@ public class Connection implements MessageListener {
private ObjectOutputStream objectOutputStream;
// mutable data, set from other threads but not changed internally.
- @Nullable
- private Address peerAddress;
+ private Optional
peerAddressOptional = Optional.empty();
private volatile boolean isAuthenticated;
private volatile boolean stopped;
@@ -116,9 +119,8 @@ public class Connection implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
// Called form UserThread
- public void setAuthenticated(Address peerAddress) {
+ public void setAuthenticated() {
Log.traceCall();
- this.peerAddress = peerAddress;
isAuthenticated = true;
}
@@ -131,9 +133,10 @@ public class Connection implements MessageListener {
Log.traceCall();
if (!stopped) {
try {
+ String peerAddress = peerAddressOptional.isPresent() ? peerAddressOptional.get().toString() : "null";
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} (uid={})\nmessage={}"
- + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", getPeerAddress(), uid, message);
+ + "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", peerAddress, uid, message);
Object objectToWrite;
if (useCompression) {
@@ -167,9 +170,10 @@ public class Connection implements MessageListener {
sharedSpace.reportIllegalRequest(illegalRequest);
}
- public synchronized void setPeerAddress(@Nullable Address peerAddress) {
+ public synchronized void setPeerAddress(Address peerAddress) {
Log.traceCall();
- this.peerAddress = peerAddress;
+ checkNotNull(peerAddress, "peerAddress must not be null");
+ peerAddressOptional = Optional.of(peerAddress);
}
@@ -189,8 +193,12 @@ public class Connection implements MessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
@Nullable
- public synchronized Address getPeerAddress() {
- return peerAddress;
+ public synchronized Address getPeerAddress1() {
+ return peerAddressOptional.isPresent() ? peerAddressOptional.get() : null;
+ }
+
+ public synchronized Optional getPeerAddress() {
+ return peerAddressOptional;
}
public Date getLastActivityDate() {
@@ -213,6 +221,7 @@ public class Connection implements MessageListener {
return connectionPriority;
}
+
///////////////////////////////////////////////////////////////////////////////////////////
// ShutDown
///////////////////////////////////////////////////////////////////////////////////////////
@@ -232,6 +241,7 @@ public class Connection implements MessageListener {
private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) {
Log.traceCall(this.toString());
if (!stopped) {
+ String peerAddress = peerAddressOptional.isPresent() ? peerAddressOptional.get().toString() : "null";
log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"ShutDown connection:"
+ "\npeerAddress=" + peerAddress
@@ -249,31 +259,32 @@ public class Connection implements MessageListener {
Log.traceCall("sendCloseConnectionMessage");
try {
sendMessage(new CloseConnectionMessage());
- stopped = true;
- sharedSpace.stop();
- if (inputHandler != null)
- inputHandler.stop();
+ setStopFlags();
- // TODO increase delay
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
t.printStackTrace();
} finally {
- UserThread.execute(() -> continueShutDown(shutDownCompleteHandler));
+ UserThread.execute(() -> doShutDown(shutDownCompleteHandler));
}
}).start();
} else {
- stopped = true;
- sharedSpace.stop();
- if (inputHandler != null)
- inputHandler.stop();
- continueShutDown(shutDownCompleteHandler);
+ setStopFlags();
+ doShutDown(shutDownCompleteHandler);
}
}
}
- private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) {
+ private void setStopFlags() {
+ stopped = true;
+ sharedSpace.stop();
+ if (inputHandler != null)
+ inputHandler.stop();
+ isAuthenticated = false;
+ }
+
+ private void doShutDown(@Nullable Runnable shutDownCompleteHandler) {
Log.traceCall();
ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason();
if (shutDownReason == null)
@@ -309,7 +320,7 @@ public class Connection implements MessageListener {
if (portInfo != null ? !portInfo.equals(that.portInfo) : that.portInfo != null) return false;
if (uid != null ? !uid.equals(that.uid) : that.uid != null) return false;
- return !(peerAddress != null ? !peerAddress.equals(that.peerAddress) : that.peerAddress != null);
+ return peerAddressOptional != null ? peerAddressOptional.equals(that.peerAddressOptional) : that.peerAddressOptional == null;
}
@@ -317,7 +328,7 @@ public class Connection implements MessageListener {
public int hashCode() {
int result = portInfo != null ? portInfo.hashCode() : 0;
result = 31 * result + (uid != null ? uid.hashCode() : 0);
- result = 31 * result + (peerAddress != null ? peerAddress.hashCode() : 0);
+ result = 31 * result + (peerAddressOptional != null ? peerAddressOptional.hashCode() : 0);
return result;
}
@@ -327,7 +338,7 @@ public class Connection implements MessageListener {
"portInfo=" + portInfo +
", uid='" + uid + '\'' +
", sharedSpace=" + sharedSpace.toString() +
- ", peerAddress=" + peerAddress +
+ ", peerAddress=" + peerAddressOptional +
", isAuthenticated=" + isAuthenticated +
", stopped=" + stopped +
", stopped=" + stopped +
@@ -398,6 +409,7 @@ public class Connection implements MessageListener {
public void handleConnectionException(Exception e) {
Log.traceCall(e.toString());
+ log.debug("connection=" + this);
if (e instanceof SocketException) {
if (socket.isClosed())
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED;
@@ -405,11 +417,12 @@ public class Connection implements MessageListener {
shutDownReason = ConnectionListener.Reason.RESET;
} else if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
shutDownReason = ConnectionListener.Reason.TIMEOUT;
+ log.warn("TimeoutException at connection with port " + socket.getLocalPort());
} else if (e instanceof EOFException) {
shutDownReason = ConnectionListener.Reason.PEER_DISCONNECTED;
} else {
shutDownReason = ConnectionListener.Reason.UNKNOWN;
- log.info("Exception at connection with port " + socket.getLocalPort());
+ log.warn("Exception at connection with port " + socket.getLocalPort());
e.printStackTrace();
}
diff --git a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java
index 46576bda4c..d80d45b557 100644
--- a/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java
+++ b/network/src/main/java/io/bitsquare/p2p/network/LocalhostNetworkNode.java
@@ -27,8 +27,8 @@ import java.util.function.Consumer;
public class LocalhostNetworkNode extends NetworkNode {
private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class);
- private static volatile int simulateTorDelayTorNode = 600;
- private static volatile int simulateTorDelayHiddenService = 3000;
+ private static volatile int simulateTorDelayTorNode = 100;
+ private static volatile int simulateTorDelayHiddenService = 500;
private Address address;
public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) {
diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java
index f7daef37a5..d6b65f717d 100644
--- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java
+++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java
@@ -15,7 +15,9 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeoutException;
@@ -62,7 +64,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
abstract public void start(@Nullable SetupListener setupListener);
public SettableFuture sendMessage(@NotNull Address peerAddress, Message message) {
- Log.traceCall("message: " + message + " to peerAddress: " + peerAddress);
+ Log.traceCall("peerAddress: " + peerAddress + " / message: " + message);
checkNotNull(peerAddress, "peerAddress must not be null");
Optional outboundConnectionOptional = lookupOutboundConnection(peerAddress);
@@ -86,8 +88,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
if (connection != null) {
return sendMessage(connection, message);
} else {
- log.trace("We have not found any connection for that peerAddress. " +
- "We will create a new outbound connection.");
+ log.trace("We have not found any connection for peerAddress {}. " +
+ "We will create a new outbound connection.", peerAddress);
final SettableFuture resultFuture = SettableFuture.create();
final boolean[] timeoutOccurred = new boolean[1];
@@ -96,7 +98,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress);
try {
// can take a while when using tor
- Socket socket = createSocket(peerAddress);
+ Socket socket = createSocket(peerAddress);
if (timeoutOccurred[0])
throw new TimeoutException("Timeout occurred when tried to create Socket to peer: " + peerAddress);
@@ -114,7 +116,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
// can take a while when using tor
newConnection.sendMessage(message);
- return newConnection;
+ return newConnection;
} catch (Throwable throwable) {
if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) {
throwable.printStackTrace();
@@ -124,7 +126,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
});
- Timer timer = new Timer();
+ //TODO does not close the connection yet. not clear if socket timeout is enough.
+ /*Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
@@ -135,19 +138,19 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
log.info(message);
UserThread.execute(() -> resultFuture.setException(new TimeoutException(message)));
}
- }, CREATE_SOCKET_TIMEOUT);
+ }, CREATE_SOCKET_TIMEOUT);*/
Futures.addCallback(future, new FutureCallback() {
public void onSuccess(Connection connection) {
UserThread.execute(() -> {
- timer.cancel();
+ //timer.cancel();
resultFuture.set(connection);
});
}
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> {
- timer.cancel();
+ //timer.cancel();
resultFuture.setException(throwable);
});
}
@@ -158,7 +161,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
public SettableFuture sendMessage(Connection connection, Message message) {
- Log.traceCall();
+ Log.traceCall("message: " + message + " to connection: " + connection);
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid());
@@ -223,15 +226,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override
public void onConnection(Connection connection) {
- Log.traceCall("NetworkNode connection=" + connection);
+ Log.traceCall("connection=" + connection);
connectionListeners.stream().forEach(e -> e.onConnection(connection));
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
- Log.traceCall();
- Address peerAddress = connection.getPeerAddress();
- log.trace("onDisconnect connection " + connection + ", peerAddress= " + peerAddress);
+ Log.traceCall("connection = " + connection);
outBoundConnections.remove(connection);
inBoundConnections.remove(connection);
connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection));
@@ -311,10 +312,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override
public void onDisconnect(Reason reason, Connection connection) {
- Log.traceCall();
- Address peerAddress = connection.getPeerAddress();
- log.trace("onDisconnect at incoming connection to peerAddress (or connection) "
- + ((peerAddress == null) ? connection : peerAddress));
+ Log.traceCall("onDisconnect at incoming connection = " + connection);
inBoundConnections.remove(connection);
NetworkNode.this.onDisconnect(reason, connection);
}
@@ -334,13 +332,13 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private Optional lookupOutboundConnection(Address peerAddress) {
Log.traceCall("search for " + peerAddress.toString() + " / outBoundConnections " + outBoundConnections);
return outBoundConnections.stream()
- .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
+ .filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
}
private Optional lookupInboundConnection(Address peerAddress) {
Log.traceCall("search for " + peerAddress.toString() + " / inBoundConnections " + inBoundConnections);
return inBoundConnections.stream()
- .filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
+ .filter(e -> e.getPeerAddress().isPresent() && peerAddress.equals(e.getPeerAddress().get())).findAny();
}
abstract protected Socket createSocket(Address peerAddress) throws IOException;
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
index ad5e45788c..616e239bc2 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java
@@ -11,7 +11,10 @@ import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionPriority;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
-import io.bitsquare.p2p.peers.messages.auth.*;
+import io.bitsquare.p2p.peers.messages.auth.AuthenticationChallenge;
+import io.bitsquare.p2p.peers.messages.auth.AuthenticationMessage;
+import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest;
+import io.bitsquare.p2p.peers.messages.auth.AuthenticationResponse;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,14 +24,13 @@ import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-// authentication protocol:
-// node2 -> node1 AuthenticationRequest
-// node1: close connection
-// node1 -> node2 AuthenticationResponse on new connection
-// node2: authentication to node1 done if nonce ok
-// node2 -> node1 GetPeersAuthRequest
-// node1: authentication to node2 done if nonce ok
-// node1 -> node2 GetPeersAuthResponse
+// Authentication protocol:
+// client: send AuthenticationRequest to seedNode
+// seedNode: close connection
+// seedNode: send AuthenticationChallenge to client on a new connection to test if address is correct
+// client: authentication to seedNode done if nonce verification is ok
+// client: AuthenticationResponse to seedNode
+// seedNode: authentication to client done if nonce verification is ok
public class AuthenticationHandshake implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class);
@@ -76,28 +78,33 @@ public class AuthenticationHandshake implements MessageListener {
if (message instanceof AuthenticationMessage) {
// We are listening on all connections, so we need to filter out only our peer
- if (((AuthenticationMessage) message).address.equals(peerAddress)) {
+ if (((AuthenticationMessage) message).senderAddress.equals(peerAddress)) {
Log.traceCall(message.toString());
- if (message instanceof AuthenticationResponse) {
+ if (message instanceof AuthenticationChallenge) {
// Requesting peer
+ AuthenticationChallenge authenticationChallenge = (AuthenticationChallenge) message;
+ // We need to set the address to the connection, otherwise we will not find the connection when sending
+ // the next message and we would create a new outbound connection instead using the inbound.
+ connection.setPeerAddress(authenticationChallenge.senderAddress);
// We use the active connectionType if we started the authentication request to another peer
- // That is used for protecting eclipse attacks
connection.setConnectionPriority(ConnectionPriority.ACTIVE);
-
- AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
- connection.setPeerAddress(peerAddress);
log.trace("Received authenticationResponse from " + peerAddress);
- boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce;
+ boolean verified = nonce != 0 && nonce == authenticationChallenge.requesterNonce;
if (verified) {
- GetPeersAuthRequest getPeersAuthRequest = new GetPeersAuthRequest(myAddress,
- authenticationResponse.responderNonce,
+ AuthenticationResponse authenticationResponse = new AuthenticationResponse(myAddress,
+ authenticationChallenge.responderNonce,
new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
- SettableFuture future = networkNode.sendMessage(peerAddress, getPeersAuthRequest);
- log.trace("Sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress);
+ SettableFuture future = networkNode.sendMessage(peerAddress, authenticationResponse);
+ log.trace("Sent GetPeersAuthRequest {} to {}", authenticationResponse, peerAddress);
Futures.addCallback(future, new FutureCallback() {
@Override
public void onSuccess(Connection connection) {
- log.trace("Successfully sent GetPeersAuthRequest {} to {}", getPeersAuthRequest, peerAddress);
+ log.trace("Successfully sent GetPeersAuthRequest to {}", peerAddress);
+
+ log.info("AuthenticationComplete: Peer with address " + peerAddress
+ + " authenticated (" + connection.getUid() + "). Took "
+ + (System.currentTimeMillis() - startAuthTs) + " ms.");
+ completed(connection);
}
@Override
@@ -107,63 +114,27 @@ public class AuthenticationHandshake implements MessageListener {
}
});
- // We could set already the authenticated flag here already, but as we need the reported peers we need
- // to wait for the GetPeersAuthResponse before we are completed.
+ // now we add the reported peers to our list
+ peerGroup.addToReportedPeers(authenticationChallenge.reportedPeers, connection);
} else {
- log.warn("verify nonce failed. AuthenticationResponse=" + authenticationResponse + " / nonce=" + nonce);
- failed(new Exception("Verify nonce failed. AuthenticationResponse=" + authenticationResponse + " / nonceMap=" + nonce));
+ log.warn("Verification of nonce failed. AuthenticationResponse=" + authenticationChallenge + " / nonce=" + nonce);
+ failed(new Exception("Verification of nonce failed. AuthenticationResponse=" + authenticationChallenge + " / nonceMap=" + nonce));
}
- } else if (message instanceof GetPeersAuthRequest) {
+ } else if (message instanceof AuthenticationResponse) {
// Responding peer
- GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message;
- log.trace("GetPeersAuthRequest from " + peerAddress + " at " + myAddress);
- boolean verified = nonce != 0 && nonce == getPeersAuthRequest.responderNonce;
+ AuthenticationResponse authenticationResponse = (AuthenticationResponse) message;
+ log.trace("Received GetPeersAuthRequest from " + peerAddress + " at " + myAddress);
+ boolean verified = nonce != 0 && nonce == authenticationResponse.responderNonce;
if (verified) {
- // we create the msg with our already collected peer addresses (before adding the new ones)
- GetPeersAuthResponse getPeersAuthResponse = new GetPeersAuthResponse(myAddress,
- new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
- SettableFuture future = networkNode.sendMessage(peerAddress, getPeersAuthResponse);
- log.trace("Sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress);
-
- // now we add the reported peers to our own set
- HashSet reportedPeers = getPeersAuthRequest.reportedPeers;
- log.trace("Received reported peers: " + reportedPeers);
- peerGroup.addToReportedPeers(reportedPeers, connection);
-
- Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.trace("Successfully sent GetPeersAuthResponse {} to {}", getPeersAuthResponse, peerAddress);
- log.info("AuthenticationComplete: Peer with address " + peerAddress
- + " authenticated (" + connection.getUid() + "). Took "
- + (System.currentTimeMillis() - startAuthTs) + " ms.");
-
- completed(connection);
- }
-
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("GetPeersAuthResponse sending failed " + throwable.getMessage());
- failed(throwable);
- }
- });
+ peerGroup.addToReportedPeers(authenticationResponse.reportedPeers, connection);
+ log.info("AuthenticationComplete: Peer with address " + peerAddress
+ + " authenticated (" + connection.getUid() + "). Took "
+ + (System.currentTimeMillis() - startAuthTs) + " ms.");
+ completed(connection);
} else {
- log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce);
- failed(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce));
+ log.warn("Verification of nonce failed. getPeersMessage=" + authenticationResponse + " / nonce=" + nonce);
+ failed(new Exception("Verification of nonce failed. getPeersMessage=" + authenticationResponse + " / nonce=" + nonce));
}
- } else if (message instanceof GetPeersAuthResponse) {
- // Requesting peer
- GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message;
- log.trace("GetPeersAuthResponse from " + peerAddress + " at " + myAddress);
- HashSet reportedPeers = getPeersAuthResponse.reportedPeers;
- log.trace("Received reported peers: " + reportedPeers);
- peerGroup.addToReportedPeers(reportedPeers, connection);
-
- log.info("AuthenticationComplete: Peer with address " + peerAddress
- + " authenticated (" + connection.getUid() + "). Took "
- + (System.currentTimeMillis() - startAuthTs) + " ms.");
-
- completed(connection);
}
}
}
@@ -190,7 +161,6 @@ public class AuthenticationHandshake implements MessageListener {
public void onSuccess(Connection connection) {
log.trace("send AuthenticationRequest to " + peerAddress + " succeeded.");
- connection.setPeerAddress(peerAddress);
// We protect that connection from getting closed by maintenance cleanup...
connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST);
}
@@ -222,40 +192,42 @@ public class AuthenticationHandshake implements MessageListener {
resultFutureOptional = Optional.of(SettableFuture.create());
- log.trace("AuthenticationRequest from " + peerAddress + " at " + myAddress);
log.info("We shut down inbound connection from peer {} to establish a new " +
- "connection with his reported address.", peerAddress);
+ "connection with his reported address to verify if his address is correct.", peerAddress);
connection.shutDown(() -> {
UserThread.runAfter(() -> {
if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
- // inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
- log.trace("processAuthenticationMessage: connection.shutDown complete. AuthenticationRequest from " + peerAddress + " at " + myAddress);
+ // inconsistent state
+ log.trace("respondToAuthenticationRequest: connection.shutDown complete. peerAddress=" + peerAddress + " / myAddress=" + myAddress);
- AuthenticationResponse authenticationResponse = new AuthenticationResponse(myAddress,
+ // we send additionally the reported and authenticated peers to save one message in the protocol.
+ AuthenticationChallenge authenticationChallenge = new AuthenticationChallenge(myAddress,
authenticationRequest.requesterNonce,
- getAndSetNonce());
- SettableFuture future = networkNode.sendMessage(peerAddress, authenticationResponse);
+ getAndSetNonce(),
+ new HashSet<>(peerGroup.getAuthenticatedAndReportedPeers()));
+ SettableFuture future = networkNode.sendMessage(peerAddress, authenticationChallenge);
Futures.addCallback(future, new FutureCallback() {
@Override
public void onSuccess(Connection connection) {
- log.trace("onSuccess sending AuthenticationResponse");
+ log.trace("AuthenticationResponse successfully sent");
- connection.setPeerAddress(peerAddress);
- // We use passive connectionType for connections created from received authentication requests from other peers
- // That is used for protecting eclipse attacks
+ // We use passive connectionType for connections created from received authentication
+ // requests from other peers
connection.setConnectionPriority(ConnectionPriority.PASSIVE);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
- log.warn("onFailure sending AuthenticationResponse.");
+ log.warn("onFailure sending AuthenticationResponse. " + throwable.getMessage());
failed(throwable);
}
});
+ } else {
+ log.warn("AuthenticationHandshake already shut down before we could sent AuthenticationResponse. That might happen in rare cases.");
}
- }, 200, TimeUnit.MILLISECONDS);
+ }, 1000, TimeUnit.MILLISECONDS); // Don't set the delay too short as the CloseConnectionMessage might arrive too late at the peer
});
return resultFutureOptional.get();
}
@@ -284,7 +256,7 @@ public class AuthenticationHandshake implements MessageListener {
}
private void failed(@NotNull Throwable throwable) {
- Log.traceCall();
+ Log.traceCall(throwable.toString());
shutDown();
if (resultFutureOptional.isPresent())
resultFutureOptional.get().setException(throwable);
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java
index 7c65ff3ecb..1357e0761c 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java
@@ -4,5 +4,5 @@ import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
public interface AuthenticationListener {
- void onPeerAddressAuthenticated(Address peerAddress, Connection connection);
+ void onPeerAuthenticated(Address peerAddress, Connection connection);
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java
index f939db61dd..a41b1507e2 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/MaintenanceManager.java
@@ -56,7 +56,6 @@ public class MaintenanceManager implements MessageListener {
public void onMessage(Message message, Connection connection) {
if (message instanceof MaintenanceMessage) {
Log.traceCall(message.toString());
- log.debug("Received message " + message + " at " + peerGroup.getMyAddress() + " from " + connection.getPeerAddress());
if (message instanceof PingMessage) {
SettableFuture future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
Futures.addCallback(future, new FutureCallback() {
@@ -68,19 +67,19 @@ public class MaintenanceManager implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("PongMessage sending failed " + throwable.getMessage());
- peerGroup.removePeer(connection.getPeerAddress());
+ connection.getPeerAddress().ifPresent(peerAddress -> peerGroup.removePeer(peerAddress));
}
});
} else if (message instanceof PongMessage) {
- if (connection.getPeerAddress() != null) {
- Peer peer = peerGroup.getAuthenticatedPeers().get(connection.getPeerAddress());
+ connection.getPeerAddress().ifPresent(peerAddress -> {
+ Peer peer = peerGroup.getAuthenticatedPeers().get(peerAddress);
if (peer != null) {
if (((PongMessage) message).nonce != peer.getPingNonce()) {
- log.warn("PongMessage invalid: self/peer " + peerGroup.getMyAddress() + "/" + connection.getPeerAddress());
+ log.warn("PongMessage invalid: self/peer " + peerGroup.getMyAddress() + "/" + peerAddress);
peerGroup.removePeer(peer.address);
}
}
- }
+ });
}
}
}
@@ -93,7 +92,7 @@ public class MaintenanceManager implements MessageListener {
sendPingTimer = UserThread.runAfterRandomDelay(() -> {
pingPeers();
startMaintenanceTimer();
- }, 5, 10, TimeUnit.MINUTES);
+ }, 5, 7, TimeUnit.MINUTES);
}
@@ -117,7 +116,7 @@ public class MaintenanceManager implements MessageListener {
peerGroup.removePeer(e.address);
}
});
- }, 1, 10));
+ }, 2, 4, TimeUnit.SECONDS));
}
}
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/Peer.java b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java
index 310eb37665..6c33b3bbf3 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/Peer.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java
@@ -14,9 +14,9 @@ public class Peer {
public final Address address;
private final long pingNonce;
- public Peer(Connection connection) {
+ public Peer(Connection connection, Address address) {
this.connection = connection;
- this.address = connection.getPeerAddress();
+ this.address = address;
pingNonce = new Random().nextLong();
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java
index 3c80abc4b9..0163108f97 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java
@@ -9,9 +9,9 @@ import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
-import io.bitsquare.p2p.peers.messages.peerexchange.GetPeersRequest;
-import io.bitsquare.p2p.peers.messages.peerexchange.GetPeersResponse;
-import io.bitsquare.p2p.peers.messages.peerexchange.PeerExchangeMessage;
+import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
+import io.bitsquare.p2p.peers.messages.peers.GetPeersResponse;
+import io.bitsquare.p2p.peers.messages.peers.PeerExchangeMessage;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +51,6 @@ public class PeerExchangeManager implements MessageListener {
public void onMessage(Message message, Connection connection) {
if (message instanceof PeerExchangeMessage) {
Log.traceCall(message.toString());
- log.debug("Received message " + message + " at " + peerGroup.getMyAddress() + " from " + connection.getPeerAddress());
if (message instanceof GetPeersRequest) {
GetPeersRequest getPeersRequestMessage = (GetPeersRequest) message;
HashSet reportedPeers = getPeersRequestMessage.reportedPeers;
@@ -68,7 +67,7 @@ public class PeerExchangeManager implements MessageListener {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("GetPeersResponse sending failed " + throwable.getMessage());
- peerGroup.removePeer(getPeersRequestMessage.address);
+ peerGroup.removePeer(getPeersRequestMessage.senderAddress);
}
});
@@ -90,7 +89,7 @@ public class PeerExchangeManager implements MessageListener {
getPeersTimer = UserThread.runAfterRandomDelay(() -> {
trySendGetPeersRequest();
startGetPeersTimer();
- }, 1, 2, TimeUnit.MINUTES);
+ }, 2, 4, TimeUnit.MINUTES);
}
private void trySendGetPeersRequest() {
@@ -113,7 +112,7 @@ public class PeerExchangeManager implements MessageListener {
peerGroup.removePeer(e.address);
}
});
- }, 5, 10));
+ }, 3, 5, TimeUnit.SECONDS));
}
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
index b3a700544d..cda413b8d2 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java
@@ -47,7 +47,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
setMaxConnectionsLowPriority(8);
}
- static final int INACTIVITY_PERIOD_BEFORE_PING = 30 * 1000;
+ static final int INACTIVITY_PERIOD_BEFORE_PING = 5 * 60 * 1000;
private static final int MAX_REPORTED_PEERS = 1000;
private final NetworkNode networkNode;
@@ -95,7 +95,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
@Override
public void onDisconnect(Reason reason, Connection connection) {
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
- removePeer(connection.getPeerAddress());
+ connection.getPeerAddress().ifPresent(peerAddress -> removePeer(peerAddress));
}
@Override
@@ -127,23 +127,30 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticatedPeers.values().stream()
.filter(e -> !e.address.equals(sender))
.forEach(peer -> UserThread.runAfterRandomDelay(() -> {
- final Address address = peer.address;
- log.trace("Broadcast message from " + getMyAddress() + " to " + address + ".");
- SettableFuture future = networkNode.sendMessage(address, message);
- Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded.");
- }
+ // as we use a delay we need to check again if our peer is still in the authenticated list
+ if (authenticatedPeers.containsValue(peer)) {
+ final Address address = peer.address;
+ log.trace("Broadcast message from " + getMyAddress() + " to " + address + ".");
+ SettableFuture future = networkNode.sendMessage(address, message);
+ Futures.addCallback(future, new FutureCallback() {
+ @Override
+ public void onSuccess(Connection connection) {
+ log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded.");
+ }
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("Broadcast failed. " + throwable.getMessage());
- UserThread.execute(() -> removePeer(address));
- }
- });
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ log.info("Broadcast failed. " + throwable.getMessage());
+ UserThread.execute(() -> removePeer(address));
+ }
+ });
+ } else {
+ log.debug("Peer is not in our authenticated list anymore. " +
+ "That can happen as we use a delay in the loop for the broadcast. " +
+ "Peer.address={}", peer.address);
+ }
},
- 10, 200, TimeUnit.MILLISECONDS));
+ 10, 100, TimeUnit.MILLISECONDS));
} else {
log.info("Message not broadcasted because we have no authenticated peers yet. " +
"message = {}", message);
@@ -167,7 +174,11 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void processAuthenticationRequest(AuthenticationRequest message, final Connection connection) {
Log.traceCall(message.toString());
- Address peerAddress = message.address;
+ Address peerAddress = message.senderAddress;
+
+ // We set the address to the connection, otherwise we will not find the connection when sending
+ // a reject message and we would create a new outbound connection instead using the inbound.
+ connection.setPeerAddress(message.senderAddress);
if (!authenticatedPeers.containsKey(peerAddress)) {
AuthenticationHandshake authenticationHandshake;
@@ -175,7 +186,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.info("We got an incoming AuthenticationRequest for the peerAddress ({})", peerAddress);
// We protect that connection from getting closed by maintenance cleanup...
connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST);
- authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), message.address);
+ authenticationHandshake = new AuthenticationHandshake(networkNode, this, getMyAddress(), peerAddress);
authenticationHandshakes.put(peerAddress, authenticationHandshake);
doRespondToAuthenticationRequest(message, connection, peerAddress, authenticationHandshake);
} else {
@@ -183,13 +194,13 @@ public class PeerGroup implements MessageListener, ConnectionListener {
"an authentication handshake for that peerAddress ({})", peerAddress);
log.debug("We avoid such race conditions by rejecting the request if the hashCode of our address ({}) is " +
"smaller then the hashCode of the peers address ({}).", getMyAddress().hashCode(),
- message.address.hashCode());
+ message.senderAddress.hashCode());
authenticationHandshake = authenticationHandshakes.get(peerAddress);
- if (getMyAddress().hashCode() < message.address.hashCode()) {
+ if (getMyAddress().hashCode() < peerAddress.hashCode()) {
log.info("We reject the authentication request and keep our own request alive.");
- rejectAuthenticationRequest(message, peerAddress);
+ rejectAuthenticationRequest(peerAddress);
} else {
log.info("We accept the authentication request but cancel our own request.");
cancelOwnAuthenticationRequest(peerAddress, authenticationHandshake);
@@ -201,35 +212,34 @@ public class PeerGroup implements MessageListener, ConnectionListener {
log.warn("We got an incoming AuthenticationRequest but we are already authenticated to that peer " +
"with peerAddress {}.\n" +
"That might happen in some race conditions. We reject the request.", peerAddress);
- rejectAuthenticationRequest(message, peerAddress);
+ rejectAuthenticationRequest(peerAddress);
}
}
private void processAuthenticationRejection(AuthenticationRejection message) {
Log.traceCall(message.toString());
- Address peerAddress = message.address;
+ Address peerAddress = message.senderAddress;
cancelOwnAuthenticationRequest(peerAddress, authenticationHandshakes.get(peerAddress));
}
- private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection, final Address peerAddress, AuthenticationHandshake authenticationHandshake) {
+ private void doRespondToAuthenticationRequest(AuthenticationRequest message, Connection connection,
+ Address peerAddress, AuthenticationHandshake authenticationHandshake) {
Log.traceCall(message.toString());
SettableFuture future = authenticationHandshake.respondToAuthenticationRequest(message, connection);
Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- checkArgument(peerAddress.equals(connection.getPeerAddress()), "peerAddress does not match connection.getPeerAddress()");
- log.info("We got the peer who did an authentication request authenticated.");
- addAuthenticatedPeer(connection, peerAddress);
- }
+ @Override
+ public void onSuccess(Connection connection) {
+ log.info("We got the peer ({}) who requested authentication authenticated.", peerAddress);
+ addAuthenticatedPeer(connection, peerAddress);
+ }
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("Authentication with peer who requested authentication failed.\n" +
- "That can happen if the peer went offline. " + throwable.getMessage());
- removePeer(peerAddress);
- }
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ log.info("Authentication with peer who requested authentication failed.\n" +
+ "That can happen if the peer went offline. " + throwable.getMessage());
+ removePeer(peerAddress);
+ }
}
-
);
}
@@ -239,9 +249,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
authenticationHandshakes.remove(peerAddress);
}
- private void rejectAuthenticationRequest(AuthenticationRequest message, Address peerAddress) {
+ private void rejectAuthenticationRequest(Address peerAddress) {
Log.traceCall();
- networkNode.sendMessage(peerAddress, new AuthenticationRejection(getMyAddress(), message.requesterNonce));
+ networkNode.sendMessage(peerAddress, new AuthenticationRejection(getMyAddress()));
}
@@ -254,42 +264,42 @@ public class PeerGroup implements MessageListener, ConnectionListener {
seedNodeAddressesOptional = Optional.of(seedNodeAddresses);
remainingSeedNodes.addAll(seedNodeAddresses);
remainingSeedNodes.remove(peerAddress);
-
authenticateToFirstSeedNode(peerAddress);
}
private void authenticateToFirstSeedNode(Address peerAddress) {
Log.traceCall();
if (!maxConnectionsForAuthReached()) {
- if (remainingSeedNodesAvailable()) {
- log.info("We try to authenticate to seed node {}.", peerAddress);
- authenticate(peerAddress, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.info("We got our first seed node authenticated. " +
- "We try if there are reported peers available to authenticate.");
- addAuthenticatedPeer(connection, peerAddress);
- authenticateToRemainingReportedPeer();
- }
+ log.info("We try to authenticate to seed node {}.", peerAddress);
+ authenticate(peerAddress, new FutureCallback() {
+ @Override
+ public void onSuccess(Connection connection) {
+ log.info("We got our first seed node authenticated. " +
+ "We try if there are reported peers available to authenticate.");
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("Authentication to " + peerAddress + " failed." +
- "\nThat is expected if seed nodes are offline." +
- "\nException:" + throwable.getMessage());
+ addAuthenticatedPeer(connection, peerAddress);
+ authenticateToRemainingReportedPeer();
+ }
- removePeer(peerAddress);
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ log.info("Authentication to " + peerAddress + " failed." +
+ "\nThat is expected if seed nodes are offline." +
+ "\nException:" + throwable.getMessage());
+ removePeer(peerAddress);
+
+ if (remainingSeedNodesAvailable()) {
log.info("We try another random seed node for first authentication attempt.");
authenticateToFirstSeedNode(getAndRemoveRandomAddress(remainingSeedNodes));
+ } else {
+ log.info("There are no seed nodes available for authentication. " +
+ "We try if there are reported peers available to authenticate.");
+ authenticateToRemainingReportedPeer();
}
- });
- } else {
- log.info("There are no seed nodes available for authentication. " +
- "We try if there are reported peers available to authenticate.");
- authenticateToRemainingReportedPeer();
- }
+ }
+ });
} else {
log.info("We have already enough connections.");
}
@@ -330,7 +340,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
} else {
log.info("We don't have seed nodes or reported peers available. We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
- 1, 2, TimeUnit.MINUTES);
+ 10, 20, TimeUnit.SECONDS);
}
} else {
log.info("We have already enough connections.");
@@ -346,38 +356,43 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Log.traceCall();
if (!maxConnectionsForAuthReached()) {
if (reportedPeersAvailable()) {
- Address peerAddress = getAndRemoveRandomReportedPeer(new ArrayList<>(reportedPeers)).address;
- removeFromReportedPeers(peerAddress);
+ if (getAndRemoveNotAuthenticatingReportedPeer().isPresent()) {
+ Address peerAddress = getAndRemoveNotAuthenticatingReportedPeer().get().address;
+ removeFromReportedPeers(peerAddress);
+ log.info("We try to authenticate to peer {}.", peerAddress);
+ authenticate(peerAddress, new FutureCallback() {
+ @Override
+ public void onSuccess(Connection connection) {
+ log.info("We got a peer authenticated. " +
+ "We try if there are more reported peers available to authenticate.");
- log.info("We try to authenticate to peer {}.", peerAddress);
- authenticate(peerAddress, new FutureCallback() {
- @Override
- public void onSuccess(Connection connection) {
- log.info("We got a peer authenticated. " +
- "We try if there are more reported peers available to authenticate.");
+ addAuthenticatedPeer(connection, peerAddress);
+ authenticateToRemainingReportedPeer();
+ }
- addAuthenticatedPeer(connection, peerAddress);
- authenticateToRemainingReportedPeer();
- }
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ log.info("Authentication to " + peerAddress + " failed." +
+ "\nThat is expected if the peer is offline." +
+ "\nException:" + throwable.getMessage());
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("Authentication to " + peerAddress + " failed." +
- "\nThat is expected if the peer is offline." +
- "\nException:" + throwable.getMessage());
+ removePeer(peerAddress);
- removePeer(peerAddress);
-
- log.info("We try another random seed node for authentication.");
- authenticateToRemainingReportedPeer();
- }
- });
+ log.info("We try another random seed node for authentication.");
+ authenticateToRemainingReportedPeer();
+ }
+ });
+ } else {
+ log.info("We don't have a reported peers available (maybe one is authenticating already). We will try again after a random pause.");
+ UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
+ 10, 20, TimeUnit.SECONDS);
+ }
} else if (remainingSeedNodesAvailable()) {
authenticateToRemainingSeedNode();
} else {
log.info("We don't have seed nodes or reported peers available. We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
- 1, 2, TimeUnit.MINUTES);
+ 30, 40, TimeUnit.SECONDS);
}
} else {
log.info("We have already enough connections.");
@@ -442,6 +457,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
private void addAuthenticatedPeer(Connection connection, Address peerAddress) {
Log.traceCall(peerAddress.getFullAddress());
+
+ connection.setPeerAddress(peerAddress);
+ connection.setAuthenticated();
+
if (authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress);
@@ -452,15 +471,14 @@ public class PeerGroup implements MessageListener, ConnectionListener {
+ "\npeerAddress= " + peerAddress
+ "\n############################################################\n");
- authenticatedPeers.put(peerAddress, new Peer(connection));
+ authenticatedPeers.put(peerAddress, new Peer(connection, peerAddress));
removeFromReportedPeers(peerAddress);
if (!checkIfConnectedPeersExceeds())
printAuthenticatedPeers();
- connection.setAuthenticated(peerAddress); //TODO check if address is set already
- authenticationListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection));
+ authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection));
}
void removePeer(@Nullable Address peerAddress) {
@@ -524,6 +542,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
if (size > PeerGroup.MAX_CONNECTIONS_HIGH_PRIORITY) {
authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated())
+ .filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST)
.collect(Collectors.toList());
}
}
@@ -536,10 +555,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
Connection connection = authenticatedConnections.remove(0);
log.info("We are going to shut down the oldest connection with last activity date="
+ connection.getLastActivityDate() + " / connection=" + connection);
- connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 100, 500, TimeUnit.MILLISECONDS));
+ connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 10, 50, TimeUnit.MILLISECONDS));
return true;
} else {
- log.warn("authenticatedConnections.size() == 0. That must never happen here. (checkIfConnectedPeersExceeds)");
+ log.debug("authenticatedConnections.size() == 0. That might happen in rare cases. (checkIfConnectedPeersExceeds)");
return false;
}
} else {
@@ -564,13 +583,17 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return all;
}
+ public boolean isInAuthenticationProcess(Address address) {
+ return authenticationHandshakes.containsKey(address);
+ }
+
///////////////////////////////////////////////////////////////////////////////////////////
// Reported peers
///////////////////////////////////////////////////////////////////////////////////////////
void addToReportedPeers(HashSet reportedPeersToAdd, Connection connection) {
- Log.traceCall();
+ Log.traceCall("reportedPeersToAdd = " + reportedPeersToAdd);
// we disconnect misbehaving nodes trying to send too many peers
// reported peers include the authenticated peers which is normally max. 8 but we give some headroom
// for safety
@@ -640,6 +663,18 @@ public class PeerGroup implements MessageListener, ConnectionListener {
return list.remove(new Random().nextInt(list.size()));
}
+ private Optional getAndRemoveNotAuthenticatingReportedPeer() {
+ Optional reportedPeer = Optional.empty();
+ List list = new ArrayList<>(reportedPeers);
+ if (!list.isEmpty()) {
+ do {
+ reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list));
+ }
+ while (!list.isEmpty() && authenticationHandshakes.containsKey(reportedPeer.get().address));
+ }
+ return reportedPeer;
+ }
+
private Address getAndRemoveRandomAddress(List list) {
checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size()));
@@ -666,5 +701,4 @@ public class PeerGroup implements MessageListener, ConnectionListener {
result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
-
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
index 9f47bfe321..89f140f50d 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java
@@ -8,12 +8,13 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.Connection;
+import io.bitsquare.p2p.network.ConnectionPriority;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
+import io.bitsquare.p2p.peers.messages.data.DataRequest;
+import io.bitsquare.p2p.peers.messages.data.DataResponse;
import io.bitsquare.p2p.storage.P2PDataStorage;
import io.bitsquare.p2p.storage.data.ProtectedData;
-import io.bitsquare.p2p.storage.messages.GetDataRequest;
-import io.bitsquare.p2p.storage.messages.GetDataResponse;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@@ -40,6 +41,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
private final NetworkNode networkNode;
private final P2PDataStorage dataStorage;
+ private final PeerGroup peerGroup;
private final Listener listener;
private Optional optionalConnectedSeedNodeAddress = Optional.empty();
@@ -50,9 +52,10 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
- public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, Listener listener) {
+ public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerGroup peerGroup, Listener listener) {
this.networkNode = networkNode;
this.dataStorage = dataStorage;
+ this.peerGroup = peerGroup;
this.listener = listener;
networkNode.addMessageListener(this);
@@ -70,39 +73,46 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
if (!seedNodeAddresses.isEmpty()) {
List remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
Collections.shuffle(remainingSeedNodeAddresses);
- Address candidate = remainingSeedNodeAddresses.remove(0);
+ Address candidate = remainingSeedNodeAddresses.get(0);
+ if (!peerGroup.isInAuthenticationProcess(candidate)) {
+ // We only remove it if it is not in the process of authentication
+ remainingSeedNodeAddresses.remove(0);
+ log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
- log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
+ SettableFuture future = networkNode.sendMessage(candidate, new DataRequest());
+ Futures.addCallback(future, new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable Connection connection) {
+ log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
+ checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen.");
+ optionalConnectedSeedNodeAddress = Optional.of(candidate);
+ }
- SettableFuture future = networkNode.sendMessage(candidate, new GetDataRequest());
- Futures.addCallback(future, new FutureCallback() {
- @Override
- public void onSuccess(@Nullable Connection connection) {
- log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
- checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen.");
- optionalConnectedSeedNodeAddress = Optional.of(candidate);
- }
+ @Override
+ public void onFailure(@NotNull Throwable throwable) {
+ log.info("Send GetAllDataMessage to " + candidate + " failed. " +
+ "That is expected if the seed node is offline. " +
+ "Exception:" + throwable.getMessage());
+ if (!remainingSeedNodeAddresses.isEmpty())
+ log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses);
- @Override
- public void onFailure(@NotNull Throwable throwable) {
- log.info("Send GetAllDataMessage to " + candidate + " failed. " +
- "That is expected if the seed node is offline. " +
- "Exception:" + throwable.getMessage());
- if (!remainingSeedNodeAddresses.isEmpty())
- log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses);
-
- requestData(remainingSeedNodeAddresses);
- }
- });
+ requestData(remainingSeedNodeAddresses);
+ }
+ });
+ } else {
+ log.info("The seed node ({}) is in the process of authentication.\n" +
+ "We will try again after a pause of 3-5 sec.", candidate);
+ listener.onNoSeedNodeAvailable();
+ UserThread.runAfterRandomDelay(() -> requestData(remainingSeedNodeAddresses),
+ 3, 5, TimeUnit.SECONDS);
+ }
} else {
log.info("There is no seed node available for requesting data. " +
"That is expected if no seed node is online.\n" +
- "We will try again after a pause of 20-30 sec.");
+ "We will try again after a pause of 10-20 sec.");
listener.onNoSeedNodeAvailable();
-
- // We re try after 20-30 sec.
UserThread.runAfterRandomDelay(() -> requestData(optionalSeedNodeAddresses.get()),
- 20, 30, TimeUnit.SECONDS);
+ 10, 20, TimeUnit.SECONDS);
}
}
@@ -113,18 +123,18 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
@Override
public void onMessage(Message message, Connection connection) {
- if (message instanceof GetDataRequest) {
+ if (message instanceof DataRequest) {
// We are a seed node and receive that msg from a new node
Log.traceCall(message.toString());
- networkNode.sendMessage(connection, new GetDataResponse(new HashSet<>(dataStorage.getMap().values())));
- } else if (message instanceof GetDataResponse) {
+ networkNode.sendMessage(connection, new DataResponse(new HashSet<>(dataStorage.getMap().values())));
+ } else if (message instanceof DataResponse) {
// We are the new node which has requested the data
Log.traceCall(message.toString());
- GetDataResponse getDataResponse = (GetDataResponse) message;
- HashSet set = getDataResponse.set;
+ DataResponse dataResponse = (DataResponse) message;
+ HashSet set = dataResponse.set;
// we keep that connection open as the bootstrapping peer will use that for the authentication
// as we are not authenticated yet the data adding will not be broadcasted
- set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
+ connection.getPeerAddress().ifPresent(peerAddress -> set.stream().forEach(e -> dataStorage.add(e, peerAddress)));
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> listener.onDataReceived(connectedSeedNodeAddress));
}
}
@@ -135,10 +145,13 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
///////////////////////////////////////////////////////////////////////////////////////////
@Override
- public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
+ public void onPeerAuthenticated(Address peerAddress, Connection connection) {
optionalConnectedSeedNodeAddress.ifPresent(connectedSeedNodeAddress -> {
- if (connectedSeedNodeAddress.equals(peerAddress))
- requestDataFromAuthenticatedSeedNode(peerAddress, connection);
+ // We only request the data again if we have initiated the authentication (ConnectionPriority.ACTIVE)
+ // We delay a bit to be sure that the authentication state is applied to all threads
+ if (connection.getConnectionPriority() == ConnectionPriority.ACTIVE && connectedSeedNodeAddress.equals(peerAddress))
+ UserThread.runAfter(()
+ -> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS);
});
}
@@ -147,7 +160,7 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {
Log.traceCall(peerAddress.toString());
// We have to request the data again as we might have missed pushed data in the meantime
- SettableFuture future = networkNode.sendMessage(connection, new GetDataRequest());
+ SettableFuture future = networkNode.sendMessage(connection, new DataRequest());
Futures.addCallback(future, new FutureCallback() {
@Override
public void onSuccess(@Nullable Connection connection) {
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationChallenge.java
similarity index 59%
rename from network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java
rename to network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationChallenge.java
index 3d40fc89a2..41693c1af8 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthRequest.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationChallenge.java
@@ -6,24 +6,26 @@ import io.bitsquare.p2p.peers.ReportedPeer;
import java.util.HashSet;
-public final class GetPeersAuthRequest extends AuthenticationMessage {
+public final class AuthenticationChallenge extends AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
+ public final long requesterNonce;
public final long responderNonce;
public final HashSet reportedPeers;
- public GetPeersAuthRequest(Address address, long responderNonce, HashSet reportedPeers) {
- super(address);
+ public AuthenticationChallenge(Address senderAddress, long requesterNonce, long responderNonce, HashSet reportedPeers) {
+ super(senderAddress);
+ this.requesterNonce = requesterNonce;
this.responderNonce = responderNonce;
this.reportedPeers = reportedPeers;
}
@Override
public String toString() {
- return "GetPeersAuthRequest{" +
- "address=" + address +
- ", challengerNonce=" + responderNonce +
+ return "AuthenticationChallenge{" +
+ ", requesterNonce=" + requesterNonce +
+ ", responderNonce=" + responderNonce +
", reportedPeers=" + reportedPeers +
super.toString() + "} ";
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java
index bef49305e5..9a1e9eba9e 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationMessage.java
@@ -7,10 +7,10 @@ import io.bitsquare.p2p.Message;
public abstract class AuthenticationMessage implements Message {
private final int networkId = Version.NETWORK_ID;
- public final Address address;
+ public final Address senderAddress;
- public AuthenticationMessage(Address address) {
- this.address = address;
+ public AuthenticationMessage(Address senderAddress) {
+ this.senderAddress = senderAddress;
}
@Override
@@ -20,7 +20,7 @@ public abstract class AuthenticationMessage implements Message {
@Override
public String toString() {
- return ", address=" + address.toString() +
+ return ", address=" + senderAddress.toString() +
", networkId=" + networkId +
'}';
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java
index c810597164..97f2d53237 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRejection.java
@@ -7,18 +7,14 @@ public final class AuthenticationRejection extends AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
- public final long requesterNonce;
-
- public AuthenticationRejection(Address address, long requesterNonce) {
- super(address);
- this.requesterNonce = requesterNonce;
+ public AuthenticationRejection(Address senderAddress) {
+ super(senderAddress);
}
@Override
public String toString() {
return "AuthenticationReject{" +
- "address=" + address +
- ", requesterNonce=" + requesterNonce +
+ "address=" + senderAddress +
super.toString() + "} ";
}
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java
index e5744cd4dd..0e78d01135 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationRequest.java
@@ -9,16 +9,16 @@ public final class AuthenticationRequest extends AuthenticationMessage {
public final long requesterNonce;
- public AuthenticationRequest(Address address, long requesterNonce) {
- super(address);
+ public AuthenticationRequest(Address senderAddress, long requesterNonce) {
+ super(senderAddress);
this.requesterNonce = requesterNonce;
}
@Override
public String toString() {
return "AuthenticationRequest{" +
- "address=" + address +
- ", nonce=" + requesterNonce +
+ "senderAddress=" + senderAddress +
+ ", requesterNonce=" + requesterNonce +
super.toString() + "} ";
}
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java
index 15bd2218d8..966beb6f49 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/AuthenticationResponse.java
@@ -2,26 +2,29 @@ package io.bitsquare.p2p.peers.messages.auth;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
+import io.bitsquare.p2p.peers.ReportedPeer;
+
+import java.util.HashSet;
public final class AuthenticationResponse extends AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
- public final long requesterNonce;
public final long responderNonce;
+ public final HashSet reportedPeers;
- public AuthenticationResponse(Address address, long requesterNonce, long responderNonce) {
- super(address);
- this.requesterNonce = requesterNonce;
+ public AuthenticationResponse(Address senderAddress, long responderNonce, HashSet reportedPeers) {
+ super(senderAddress);
this.responderNonce = responderNonce;
+ this.reportedPeers = reportedPeers;
}
@Override
public String toString() {
return "AuthenticationResponse{" +
- "address=" + address +
- ", requesterNonce=" + requesterNonce +
- ", challengerNonce=" + responderNonce +
+ "address=" + senderAddress +
+ ", responderNonce=" + responderNonce +
+ ", reportedPeers=" + reportedPeers +
super.toString() + "} ";
}
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java
deleted file mode 100644
index 58a7e4c7a0..0000000000
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/auth/GetPeersAuthResponse.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package io.bitsquare.p2p.peers.messages.auth;
-
-import io.bitsquare.app.Version;
-import io.bitsquare.p2p.Address;
-import io.bitsquare.p2p.peers.ReportedPeer;
-
-import java.util.HashSet;
-
-public final class GetPeersAuthResponse extends AuthenticationMessage {
- // That object is sent over the wire, so we need to take care of version compatibility.
- private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
-
- public final HashSet reportedPeers;
-
- public GetPeersAuthResponse(Address address, HashSet reportedPeers) {
- super(address);
- this.reportedPeers = reportedPeers;
- }
-
- @Override
- public String toString() {
- return "GetPeersAuthResponse{" +
- "address=" + address +
- ", reportedPeers=" + reportedPeers +
- super.toString() + "} ";
- }
-}
diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java
similarity index 80%
rename from network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java
rename to network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java
index f70432c1f9..9dd5ef2386 100644
--- a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataRequest.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataRequest.java
@@ -1,15 +1,15 @@
-package io.bitsquare.p2p.storage.messages;
+package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
-public final class GetDataRequest implements Message {
+public final class DataRequest implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.NETWORK_ID;
-
- public GetDataRequest() {
+
+ public DataRequest() {
}
@Override
diff --git a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java
similarity index 78%
rename from network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java
rename to network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java
index 0380c5c92a..4e6b007d94 100644
--- a/network/src/main/java/io/bitsquare/p2p/storage/messages/GetDataResponse.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/data/DataResponse.java
@@ -1,4 +1,4 @@
-package io.bitsquare.p2p.storage.messages;
+package io.bitsquare.p2p.peers.messages.data;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
@@ -6,14 +6,14 @@ import io.bitsquare.p2p.storage.data.ProtectedData;
import java.util.HashSet;
-public final class GetDataResponse implements Message {
+public final class DataResponse implements Message {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.NETWORK_ID;
public final HashSet set;
- public GetDataResponse(HashSet set) {
+ public DataResponse(HashSet set) {
this.set = set;
}
@@ -25,9 +25,9 @@ public final class GetDataResponse implements Message {
@Override
public boolean equals(Object o) {
if (this == o) return true;
- if (!(o instanceof GetDataResponse)) return false;
+ if (!(o instanceof DataResponse)) return false;
- GetDataResponse that = (GetDataResponse) o;
+ DataResponse that = (DataResponse) o;
return !(set != null ? !set.equals(that.set) : that.set != null);
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java
similarity index 71%
rename from network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersRequest.java
rename to network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java
index 9d9dba3158..08d754da4d 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersRequest.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java
@@ -1,4 +1,4 @@
-package io.bitsquare.p2p.peers.messages.peerexchange;
+package io.bitsquare.p2p.peers.messages.peers;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Address;
@@ -10,18 +10,18 @@ public final class GetPeersRequest extends PeerExchangeMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
- public final Address address;
+ public final Address senderAddress;
public final HashSet reportedPeers;
- public GetPeersRequest(Address address, HashSet reportedPeers) {
- this.address = address;
+ public GetPeersRequest(Address senderAddress, HashSet reportedPeers) {
+ this.senderAddress = senderAddress;
this.reportedPeers = reportedPeers;
}
@Override
public String toString() {
return "GetPeersRequest{" +
- "address=" + address +
+ "senderAddress=" + senderAddress +
", reportedPeers=" + reportedPeers +
super.toString() + "} ";
}
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersResponse.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java
similarity index 92%
rename from network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersResponse.java
rename to network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java
index fda16b5a73..bd5d3e5bca 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/GetPeersResponse.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersResponse.java
@@ -1,4 +1,4 @@
-package io.bitsquare.p2p.peers.messages.peerexchange;
+package io.bitsquare.p2p.peers.messages.peers;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.peers.ReportedPeer;
diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/PeerExchangeMessage.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/PeerExchangeMessage.java
similarity index 87%
rename from network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/PeerExchangeMessage.java
rename to network/src/main/java/io/bitsquare/p2p/peers/messages/peers/PeerExchangeMessage.java
index fba7f2bb35..d0e8de7449 100644
--- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peerexchange/PeerExchangeMessage.java
+++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/PeerExchangeMessage.java
@@ -1,4 +1,4 @@
-package io.bitsquare.p2p.peers.messages.peerexchange;
+package io.bitsquare.p2p.peers.messages.peers;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java
index 65e69dd4b1..099fff53cb 100644
--- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java
+++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java
@@ -116,17 +116,18 @@ public class P2PDataStorage implements MessageListener {
Log.traceCall(message.toString());
if (connection.isAuthenticated()) {
log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection);
- if (message instanceof AddDataMessage) {
- add(((AddDataMessage) message).data, connection.getPeerAddress());
- } else if (message instanceof RemoveDataMessage) {
- remove(((RemoveDataMessage) message).data, connection.getPeerAddress());
- } else if (message instanceof RemoveMailboxDataMessage) {
- removeMailboxData(((RemoveMailboxDataMessage) message).data, connection.getPeerAddress());
- }
+ connection.getPeerAddress().ifPresent(peerAddress -> {
+ if (message instanceof AddDataMessage) {
+ add(((AddDataMessage) message).data, peerAddress);
+ } else if (message instanceof RemoveDataMessage) {
+ remove(((RemoveDataMessage) message).data, peerAddress);
+ } else if (message instanceof RemoveMailboxDataMessage) {
+ removeMailboxData(((RemoveMailboxDataMessage) message).data, peerAddress);
+ }
+ });
} else {
log.warn("Connection is not authenticated yet. " +
- "We don't accept storage operations from non-authenticated nodes.");
- log.trace("Connection = " + connection);
+ "We don't accept storage operations from non-authenticated nodes. connection=", connection);
connection.reportIllegalRequest(IllegalRequest.NotAuthenticated);
}
}