map to user thread

This commit is contained in:
Manfred Karrer 2015-11-04 03:33:16 +01:00
parent 8e74890a13
commit 1a66d3cef5
17 changed files with 574 additions and 591 deletions

View file

@ -1,9 +1,13 @@
package io.bitsquare.p2p.seed; package io.bitsquare.p2p.seed;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.bitsquare.common.UserThread;
import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jce.provider.BouncyCastleProvider;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.Security; import java.security.Security;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class SeedNodeMain { public class SeedNodeMain {
@ -13,6 +17,12 @@ public class SeedNodeMain {
public static void main(String[] args) throws NoSuchAlgorithmException { public static void main(String[] args) throws NoSuchAlgorithmException {
Security.addProvider(new BouncyCastleProvider()); Security.addProvider(new BouncyCastleProvider());
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("SeedNodeMain")
.setDaemon(true)
.build();
UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory));
SeedNode seedNode = new SeedNode(); SeedNode seedNode = new SeedNode();
seedNode.processArgs(args); seedNode.processArgs(args);
seedNode.createAndStartP2PService(); seedNode.createAndStartP2PService();

View file

@ -20,6 +20,7 @@ package io.bitsquare.trade.offer;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.bitsquare.btc.TradeWalletService; import io.bitsquare.btc.TradeWalletService;
import io.bitsquare.btc.WalletService; import io.bitsquare.btc.WalletService;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.KeyRing; import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.handlers.ErrorMessageHandler; import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler; import io.bitsquare.common.handlers.ResultHandler;
@ -167,8 +168,8 @@ public class OpenOfferManager {
TimerTask timerTask = new TimerTask() { TimerTask timerTask = new TimerTask() {
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("RepublishOffers-" + String.valueOf(new Random().nextInt(1000))); Thread.currentThread().setName("RepublishOffers-" + new Random().nextInt(1000));
rePublishOffers(); UserThread.execute(() -> rePublishOffers());
try { try {
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();

View file

@ -16,7 +16,7 @@ import io.bitsquare.crypto.EncryptionService;
import io.bitsquare.crypto.SealedAndSignedMessage; import io.bitsquare.crypto.SealedAndSignedMessage;
import io.bitsquare.p2p.messaging.*; import io.bitsquare.p2p.messaging.*;
import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.routing.Neighbor; import io.bitsquare.p2p.routing.Peer;
import io.bitsquare.p2p.routing.Routing; import io.bitsquare.p2p.routing.Routing;
import io.bitsquare.p2p.routing.RoutingListener; import io.bitsquare.p2p.routing.RoutingListener;
import io.bitsquare.p2p.seed.SeedNodesRepository; import io.bitsquare.p2p.seed.SeedNodesRepository;
@ -67,7 +67,7 @@ public class P2PService {
private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new ConcurrentHashMap<>(); private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new ConcurrentHashMap<>();
private volatile boolean shutDownInProgress; private volatile boolean shutDownInProgress;
private List<Address> seedNodeAddresses; private List<Address> seedNodeAddresses;
private List<Address> connectedSeedNodes = new CopyOnWriteArrayList<>(); private Set<Address> connectedSeedNodes = new HashSet<>();
private Set<Address> authenticatedPeerAddresses = new HashSet<>(); private Set<Address> authenticatedPeerAddresses = new HashSet<>();
private boolean authenticatedToFirstPeer; private boolean authenticatedToFirstPeer;
private boolean allDataReceived; private boolean allDataReceived;
@ -245,17 +245,17 @@ public class P2PService {
routing.addRoutingListener(new RoutingListener() { routing.addRoutingListener(new RoutingListener() {
@Override @Override
public void onFirstNeighborAdded(Neighbor neighbor) { public void onFirstPeerAdded(Peer peer) {
log.trace("onFirstNeighbor " + neighbor.toString()); log.trace("onFirstPeer " + peer.toString());
} }
@Override @Override
public void onNeighborAdded(Neighbor neighbor) { public void onPeerAdded(Peer peer) {
} }
@Override @Override
public void onNeighborRemoved(Address address) { public void onPeerRemoved(Address address) {
} }
@ -599,8 +599,9 @@ public class P2PService {
sendGetAllDataMessageTimer.schedule(new TimerTask() { sendGetAllDataMessageTimer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("SendGetAllDataMessageTimer-" + new Random().nextInt(1000));
try { try {
sendGetAllDataMessage(remainingSeedNodeAddresses); UserThread.execute(() -> sendGetAllDataMessage(remainingSeedNodeAddresses));
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
log.error("Executing task failed. " + t.getMessage()); log.error("Executing task failed. " + t.getMessage());

View file

@ -3,6 +3,7 @@ package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import io.bitsquare.common.ByteArrayUtils; import io.bitsquare.common.ByteArrayUtils;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.Utils; import io.bitsquare.p2p.Utils;
@ -21,41 +22,42 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.*; import java.util.concurrent.*;
/**
* Connection is created by the server thread or by send message from NetworkNode.
* All handlers are called on User thread.
* Shared data between InputHandler thread and that
*/
public class Connection { public class Connection {
private static final Logger log = LoggerFactory.getLogger(Connection.class); private static final Logger log = LoggerFactory.getLogger(Connection.class);
private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data
private static final int MAX_ILLEGAL_REQUESTS = 5; private static final int MAX_ILLEGAL_REQUESTS = 5;
private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min. private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min.
private InputHandler inputHandler;
public static int getMaxMsgSize() { public static int getMaxMsgSize() {
return MAX_MSG_SIZE; return MAX_MSG_SIZE;
} }
private final Socket socket;
private final int port; private final int port;
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
private final String uid; private final String uid;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final Map<IllegalRequest, Integer> illegalRequests = new ConcurrentHashMap<>(); // set in init
private final ExecutorService executorService; private ObjectOutputStream objectOutputStream;
private ObjectOutputStream out; // holder of state shared between InputHandler and Connection
private ObjectInputStream in; private SharedSpace sharedSpace;
// mutable data, set from other threads but not changed internally.
@Nullable @Nullable
private Address peerAddress; private Address peerAddress;
private boolean isAuthenticated;
private volatile boolean stopped; private volatile boolean stopped;
private volatile boolean shutDownInProgress; private volatile boolean shutDownInProgress;
private volatile boolean inputHandlerStopped;
private volatile Date lastActivityDate;
//TODO got java.util.zip.DataFormatException: invalid distance too far back //TODO got java.util.zip.DataFormatException: invalid distance too far back
// java.util.zip.DataFormatException: invalid literal/lengths set // java.util.zip.DataFormatException: invalid literal/lengths set
// use GZIPInputStream but problems with blocking // use GZIPInputStream but problems with blocking
private boolean useCompression = false; boolean useCompression = false;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -63,24 +65,14 @@ public class Connection {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) {
this.socket = socket;
port = socket.getLocalPort(); port = socket.getLocalPort();
this.messageListener = messageListener;
this.connectionListener = connectionListener;
uid = UUID.randomUUID().toString(); uid = UUID.randomUUID().toString();
final ThreadFactory threadFactory = new ThreadFactoryBuilder() init(socket, messageListener, connectionListener);
.setNameFormat("Connection-" + socket.getLocalPort())
.setDaemon(true)
.build();
executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
init();
} }
private void init() { private void init(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) {
sharedSpace = new SharedSpace(this, socket, messageListener, connectionListener, useCompression);
try { try {
socket.setSoTimeout(SOCKET_TIMEOUT); socket.setSoTimeout(SOCKET_TIMEOUT);
// Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block // Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block
@ -88,35 +80,34 @@ public class Connection {
// When you construct an ObjectInputStream, in the constructor the class attempts to read a header that // When you construct an ObjectInputStream, in the constructor the class attempts to read a header that
// the associated ObjectOutputStream on the other end of the connection has written. // the associated ObjectOutputStream on the other end of the connection has written.
// It will not return until that header has been read. // It will not return until that header has been read.
if (useCompression) { objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
out = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
in = new ObjectInputStream(socket.getInputStream());
} else { // We create a thread for handling inputStream data
out = new ObjectOutputStream(socket.getOutputStream()); inputHandler = new InputHandler(sharedSpace, objectInputStream, port);
in = new ObjectInputStream(socket.getInputStream()); executorService.submit(inputHandler);
}
executorService.submit(new InputHandler());
} catch (IOException e) { } catch (IOException e) {
handleConnectionException(e); sharedSpace.handleConnectionException(e);
} }
lastActivityDate = new Date(); sharedSpace.updateLastActivityDate();
log.trace("\nNew connection created " + this.toString()); log.trace("\nNew connection created " + this.toString());
connectionListener.onConnection(this); UserThread.execute(() -> connectionListener.onConnection(this));
} }
public void onAuthenticationComplete(Address peerAddress, Connection connection) {
isAuthenticated = true; ///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public synchronized void setAuthenticated(Address peerAddress, Connection connection) {
this.peerAddress = peerAddress; this.peerAddress = peerAddress;
connectionListener.onPeerAddressAuthenticated(peerAddress, connection); UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection));
}
public boolean isStopped() {
return stopped;
} }
public void sendMessage(Message message) { public void sendMessage(Message message) {
// That method we get called form user thread
if (!stopped) { if (!stopped) {
try { try {
log.trace("writeObject " + message + " on connection with port " + port); log.trace("writeObject " + message + " on connection with port " + port);
@ -132,28 +123,22 @@ public class Connection {
// log.trace("Write object data size: " + ByteArrayUtils.objectToByteArray(message).length); // log.trace("Write object data size: " + ByteArrayUtils.objectToByteArray(message).length);
objectToWrite = message; objectToWrite = message;
} }
out.writeObject(objectToWrite); objectOutputStream.writeObject(objectToWrite);
out.flush(); objectOutputStream.flush();
lastActivityDate = new Date(); sharedSpace.updateLastActivityDate();
} }
} catch (IOException e) { } catch (IOException e) {
handleConnectionException(e); // an exception lead to a shutdown
sharedSpace.handleConnectionException(e);
} }
} else { } else {
connectionListener.onDisconnect(ConnectionListener.Reason.ALREADY_CLOSED, Connection.this); UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(ConnectionListener.Reason.ALREADY_CLOSED, this));
} }
} }
public void reportIllegalRequest(IllegalRequest illegalRequest) { public void reportIllegalRequest(IllegalRequest illegalRequest) {
log.warn("We got reported an illegal request " + illegalRequest); sharedSpace.reportIllegalRequest(illegalRequest);
int prevCounter = illegalRequests.get(illegalRequest);
if (prevCounter > illegalRequest.limit) {
log.warn("We close connection as we received too many illegal requests.\n" + illegalRequests.toString());
shutDown();
} else {
illegalRequests.put(illegalRequest, ++prevCounter);
}
} }
@ -162,22 +147,26 @@ public class Connection {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Nullable @Nullable
public Address getPeerAddress() { public synchronized Address getPeerAddress() {
return peerAddress; return peerAddress;
} }
public Date getLastActivityDate() { public Date getLastActivityDate() {
return lastActivityDate; return sharedSpace.getLastActivityDate();
} }
public boolean isAuthenticated() { public synchronized boolean isAuthenticated() {
return isAuthenticated; return peerAddress != null;
} }
public String getUid() { public String getUid() {
return uid; return uid;
} }
public boolean isStopped() {
return stopped;
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// ShutDown // ShutDown
@ -191,7 +180,7 @@ public class Connection {
shutDown(true, null); shutDown(true, null);
} }
private void shutDown(boolean sendCloseConnectionMessage) { void shutDown(boolean sendCloseConnectionMessage) {
shutDown(sendCloseConnectionMessage, null); shutDown(sendCloseConnectionMessage, null);
} }
@ -201,17 +190,18 @@ public class Connection {
+ "\npeerAddress=" + peerAddress + "\npeerAddress=" + peerAddress
+ "\nobjectId=" + getObjectId() + "\nobjectId=" + getObjectId()
+ "\nuid=" + getUid() + "\nuid=" + getUid()
+ "\nisAuthenticated=" + isAuthenticated + "\nisAuthenticated=" + isAuthenticated()
+ "\nsocket.getPort()=" + socket.getPort() + "\nsocket.getPort()=" + sharedSpace.getSocket().getPort()
+ "\n\n"); + "\n\n");
log.debug("ShutDown " + this.getObjectId()); log.debug("ShutDown " + this.getObjectId());
log.debug("ShutDown connection requested. Connection=" + this.toString()); log.debug("ShutDown connection requested. Connection=" + this.toString());
if (!stopped) { if (!stopped) {
stopped = true; stopped = true;
inputHandler.stop();
shutDownInProgress = true; shutDownInProgress = true;
inputHandlerStopped = true; UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(ConnectionListener.Reason.SHUT_DOWN, this));
connectionListener.onDisconnect(ConnectionListener.Reason.SHUT_DOWN, Connection.this);
if (sendCloseConnectionMessage) { if (sendCloseConnectionMessage) {
sendMessage(new CloseConnectionMessage()); sendMessage(new CloseConnectionMessage());
@ -220,7 +210,7 @@ public class Connection {
} }
try { try {
socket.close(); sharedSpace.getSocket().close();
} catch (SocketException e) { } catch (SocketException e) {
log.trace("SocketException at shutdown might be expected " + e.getMessage()); log.trace("SocketException at shutdown might be expected " + e.getMessage());
} catch (IOException e) { } catch (IOException e) {
@ -238,24 +228,6 @@ public class Connection {
} }
} }
private void handleConnectionException(Exception e) {
if (e instanceof SocketException) {
if (socket.isClosed())
connectionListener.onDisconnect(ConnectionListener.Reason.SOCKET_CLOSED, Connection.this);
else
connectionListener.onDisconnect(ConnectionListener.Reason.RESET, Connection.this);
} else if (e instanceof SocketTimeoutException) {
connectionListener.onDisconnect(ConnectionListener.Reason.TIMEOUT, Connection.this);
} else if (e instanceof EOFException) {
connectionListener.onDisconnect(ConnectionListener.Reason.PEER_DISCONNECTED, Connection.this);
} else {
log.info("Exception at connection with port " + socket.getLocalPort());
e.printStackTrace();
connectionListener.onDisconnect(ConnectionListener.Reason.UNKNOWN, Connection.this);
}
shutDown(false);
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
@ -265,8 +237,7 @@ public class Connection {
Connection that = (Connection) o; Connection that = (Connection) o;
if (port != that.port) return false; if (port != that.port) return false;
if (uid != null ? !uid.equals(that.uid) : that.uid != null) return false; return !(uid != null ? !uid.equals(that.uid) : that.uid != null);
return !(peerAddress != null ? !peerAddress.equals(that.peerAddress) : that.peerAddress != null);
} }
@ -274,21 +245,20 @@ public class Connection {
public int hashCode() { public int hashCode() {
int result = port; int result = port;
result = 31 * result + (uid != null ? uid.hashCode() : 0); result = 31 * result + (uid != null ? uid.hashCode() : 0);
result = 31 * result + (peerAddress != null ? peerAddress.hashCode() : 0);
return result; return result;
} }
@Override @Override
public String toString() { public String toString() {
return "Connection{" + return "Connection{" +
"objectId=" + getObjectId() + "port=" + port +
", uid=" + uid + ", uid='" + uid + '\'' +
", port=" + port + ", objectId='" + getObjectId() + '\'' +
", isAuthenticated=" + isAuthenticated + ", sharedSpace=" + sharedSpace.toString() +
", peerAddress=" + peerAddress + ", peerAddress=" + peerAddress +
", lastActivityDate=" + lastActivityDate +
", stopped=" + stopped + ", stopped=" + stopped +
", inputHandlerStopped=" + inputHandlerStopped + ", shutDownInProgress=" + shutDownInProgress +
", useCompression=" + useCompression +
'}'; '}';
} }
@ -297,51 +267,179 @@ public class Connection {
} }
///////////////////////////////////////////////////////////////////////////////////////////
// SharedSpace
///////////////////////////////////////////////////////////////////////////////////////////
/**
* Holds all shared data between Connection and InputHandler
*/
private static class SharedSpace {
private static final Logger log = LoggerFactory.getLogger(SharedSpace.class);
private final Connection connection;
private final Socket socket;
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
private final boolean useCompression;
private final Map<IllegalRequest, Integer> illegalRequests = new ConcurrentHashMap<>();
// mutable
private Date lastActivityDate;
public SharedSpace(Connection connection, Socket socket, MessageListener messageListener,
ConnectionListener connectionListener, boolean useCompression) {
this.connection = connection;
this.socket = socket;
this.messageListener = messageListener;
this.connectionListener = connectionListener;
this.useCompression = useCompression;
}
public synchronized void updateLastActivityDate() {
lastActivityDate = new Date();
}
public synchronized Date getLastActivityDate() {
return lastActivityDate;
}
public void reportIllegalRequest(IllegalRequest illegalRequest) {
log.warn("We got reported an illegal request " + illegalRequest);
int prevCounter = illegalRequests.get(illegalRequest);
if (prevCounter > illegalRequest.maxTolerance) {
log.warn("We close connection as we received too many illegal requests.\n" + illegalRequests.toString());
connection.shutDown(false);
} else {
illegalRequests.put(illegalRequest, ++prevCounter);
}
}
public void handleConnectionException(Exception e) {
if (e instanceof SocketException) {
if (socket.isClosed())
UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.SOCKET_CLOSED, connection));
else
UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.RESET, connection));
} else if (e instanceof SocketTimeoutException) {
UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.TIMEOUT, connection));
} else if (e instanceof EOFException) {
UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.PEER_DISCONNECTED, connection));
} else {
log.info("Exception at connection with port " + socket.getLocalPort());
e.printStackTrace();
UserThread.execute(() -> connectionListener.onDisconnect(ConnectionListener.Reason.UNKNOWN, connection));
}
connection.shutDown(false);
}
public void onMessage(Message message) {
UserThread.execute(() -> messageListener.onMessage(message, connection));
}
public boolean useCompression() {
return useCompression;
}
public void shutDown(boolean sendCloseConnectionMessage) {
connection.shutDown(sendCloseConnectionMessage);
}
public ConnectionListener getConnectionListener() {
return connectionListener;
}
public Socket getSocket() {
return socket;
}
public String getConnectionId() {
return connection.getObjectId();
}
@Override
public String toString() {
return "SharedSpace{" +
", socket=" + socket +
", useCompression=" + useCompression +
", illegalRequests=" + illegalRequests +
", lastActivityDate=" + lastActivityDate +
'}';
}
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// InputHandler // InputHandler
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private class InputHandler implements Runnable { private static class InputHandler implements Runnable {
private static final Logger log = LoggerFactory.getLogger(InputHandler.class);
private final SharedSpace sharedSpace;
private final ObjectInputStream objectInputStream;
private final int port;
private final ExecutorService executorService;
private volatile boolean stopped;
public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, int port) {
this.sharedSpace = sharedSpace;
this.objectInputStream = objectInputStream;
this.port = port;
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("InputHandler-onMessage-" + port)
.setDaemon(true)
.build();
executorService = new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory);
}
public void stop() {
stopped = true;
Utils.shutDownExecutorService(executorService);
}
@Override @Override
public void run() { public void run() {
try { try {
Thread.currentThread().setName("InputHandler-" + socket.getLocalPort()); Thread.currentThread().setName("InputHandler-" + port);
while (!inputHandlerStopped) { while (!stopped && !Thread.currentThread().isInterrupted()) {
try { try {
log.trace("InputHandler waiting for incoming messages connection=" + Connection.this.getObjectId()); log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionId());
Object rawInputObject = in.readObject(); Object rawInputObject = objectInputStream.readObject();
log.trace("New data arrived at inputHandler of connection=" + Connection.this.toString() log.trace("New data arrived at inputHandler of connection=" + sharedSpace.getConnectionId()
+ " rawInputObject " + rawInputObject); + " rawInputObject " + rawInputObject);
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size <= MAX_MSG_SIZE) { if (size <= getMaxMsgSize()) {
Serializable serializable = null; Serializable serializable = null;
if (useCompression) { if (sharedSpace.useCompression()) {
if (rawInputObject instanceof byte[]) { if (rawInputObject instanceof byte[]) {
byte[] compressedObjectAsBytes = (byte[]) rawInputObject; byte[] compressedObjectAsBytes = (byte[]) rawInputObject;
size = compressedObjectAsBytes.length; size = compressedObjectAsBytes.length;
//log.trace("Read object compressed data size: " + size); //log.trace("Read object compressed data size: " + size);
serializable = Utils.decompress(compressedObjectAsBytes); serializable = Utils.decompress(compressedObjectAsBytes);
} else { } else {
reportIllegalRequest(IllegalRequest.InvalidDataType); sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);
} }
} else { } else {
if (rawInputObject instanceof Serializable) { if (rawInputObject instanceof Serializable) {
serializable = (Serializable) rawInputObject; serializable = (Serializable) rawInputObject;
} else { } else {
reportIllegalRequest(IllegalRequest.InvalidDataType); sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);
} }
} }
//log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length); //log.trace("Read object decompressed data size: " + ByteArrayUtils.objectToByteArray(serializable).length);
// compressed size might be bigger theoretically so we check again after decompression // compressed size might be bigger theoretically so we check again after decompression
if (size <= MAX_MSG_SIZE) { if (size <= getMaxMsgSize()) {
if (serializable instanceof Message) { if (serializable instanceof Message) {
lastActivityDate = new Date(); sharedSpace.updateLastActivityDate();
Message message = (Message) serializable; Message message = (Message) serializable;
if (message instanceof CloseConnectionMessage) { if (message instanceof CloseConnectionMessage) {
inputHandlerStopped = true; stopped = true;
shutDown(false); sharedSpace.shutDown(false);
} else { } else {
Task task = new Task() { Task task = new Task() {
@Override @Override
@ -351,7 +449,7 @@ public class Connection {
}; };
executorService.submit(() -> { executorService.submit(() -> {
try { try {
messageListener.onMessage(message, Connection.this); sharedSpace.onMessage(message);
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
log.error("Executing task failed. " + t.getMessage()); log.error("Executing task failed. " + t.getMessage());
@ -359,19 +457,19 @@ public class Connection {
}); });
} }
} else { } else {
reportIllegalRequest(IllegalRequest.InvalidDataType); sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);
} }
} else { } else {
log.error("Received decompressed data exceeds max. msg size."); log.error("Received decompressed data exceeds max. msg size.");
reportIllegalRequest(IllegalRequest.MaxSizeExceeded); sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
} }
} else { } else {
log.error("Received compressed data exceeds max. msg size."); log.error("Received compressed data exceeds max. msg size.");
reportIllegalRequest(IllegalRequest.MaxSizeExceeded); sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
} }
} catch (IOException | ClassNotFoundException e) { } catch (IOException | ClassNotFoundException e) {
inputHandlerStopped = true; stopped = true;
handleConnectionException(e); sharedSpace.handleConnectionException(e);
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -379,5 +477,14 @@ public class Connection {
log.error("Executing task failed. " + t.getMessage()); log.error("Executing task failed. " + t.getMessage());
} }
} }
@Override
public String toString() {
return "InputHandler{" +
"sharedSpace=" + sharedSpace +
", port=" + port +
", stopped=" + stopped +
'}';
}
} }
} }

View file

@ -5,9 +5,9 @@ public enum IllegalRequest {
NotAuthenticated(2), NotAuthenticated(2),
InvalidDataType(2); InvalidDataType(2);
public final int limit; public final int maxTolerance;
IllegalRequest(int limit) { IllegalRequest(int maxTolerance) {
this.limit = limit; this.maxTolerance = maxTolerance;
} }
} }

View file

@ -3,6 +3,7 @@ package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.*;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
import io.nucleo.net.HiddenServiceDescriptor; import io.nucleo.net.HiddenServiceDescriptor;
import io.nucleo.net.TorNode; import io.nucleo.net.TorNode;
@ -14,9 +15,8 @@ import java.io.IOException;
import java.net.BindException; import java.net.BindException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.Callable; import java.util.Random;
import java.util.concurrent.Executors; import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
public class LocalhostNetworkNode extends NetworkNode { public class LocalhostNetworkNode extends NetworkNode {
@ -47,7 +47,11 @@ public class LocalhostNetworkNode extends NetworkNode {
public void start(@Nullable SetupListener setupListener) { public void start(@Nullable SetupListener setupListener) {
if (setupListener != null) addSetupListener(setupListener); if (setupListener != null) addSetupListener(setupListener);
executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("NetworkNode-" + port)
.setDaemon(true)
.build();
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory));
//Tor delay simulation //Tor delay simulation
createTorNode(torNode -> { createTorNode(torNode -> {
@ -89,6 +93,7 @@ public class LocalhostNetworkNode extends NetworkNode {
private void createTorNode(final Consumer<TorNode> resultHandler) { private void createTorNode(final Consumer<TorNode> resultHandler) {
Callable<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> task = () -> { Callable<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> task = () -> {
Thread.currentThread().setName("CreateTorNode-" + new Random().nextInt(1000));
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
if (simulateTorDelayTorNode > 0) if (simulateTorDelayTorNode > 0)
Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS); Uninterruptibles.sleepUninterruptibly(simulateTorDelayTorNode, TimeUnit.MILLISECONDS);
@ -102,7 +107,7 @@ public class LocalhostNetworkNode extends NetworkNode {
ListenableFuture<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> future = executorService.submit(task); ListenableFuture<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> future = executorService.submit(task);
Futures.addCallback(future, new FutureCallback<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>>() { Futures.addCallback(future, new FutureCallback<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>>() {
public void onSuccess(TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode) { public void onSuccess(TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode) {
resultHandler.accept(torNode); UserThread.execute(() -> resultHandler.accept(torNode));
} }
public void onFailure(Throwable throwable) { public void onFailure(Throwable throwable) {
@ -113,6 +118,7 @@ public class LocalhostNetworkNode extends NetworkNode {
private void createHiddenService(final Consumer<HiddenServiceDescriptor> resultHandler) { private void createHiddenService(final Consumer<HiddenServiceDescriptor> resultHandler) {
Callable<HiddenServiceDescriptor> task = () -> { Callable<HiddenServiceDescriptor> task = () -> {
Thread.currentThread().setName("CreateHiddenService-" + new Random().nextInt(1000));
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
if (simulateTorDelayHiddenService > 0) if (simulateTorDelayHiddenService > 0)
Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS); Uninterruptibles.sleepUninterruptibly(simulateTorDelayHiddenService, TimeUnit.MILLISECONDS);
@ -126,7 +132,7 @@ public class LocalhostNetworkNode extends NetworkNode {
ListenableFuture<HiddenServiceDescriptor> future = executorService.submit(task); ListenableFuture<HiddenServiceDescriptor> future = executorService.submit(task);
Futures.addCallback(future, new FutureCallback<HiddenServiceDescriptor>() { Futures.addCallback(future, new FutureCallback<HiddenServiceDescriptor>() {
public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) { public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) {
resultHandler.accept(hiddenServiceDescriptor); UserThread.execute(() -> resultHandler.accept(hiddenServiceDescriptor));
} }
public void onFailure(Throwable throwable) { public void onFailure(Throwable throwable) {

View file

@ -1,6 +1,7 @@
package io.bitsquare.p2p.network; package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.*;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -11,10 +12,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.HashSet; import java.util.*;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -24,8 +22,8 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private static final Logger log = LoggerFactory.getLogger(NetworkNode.class); private static final Logger log = LoggerFactory.getLogger(NetworkNode.class);
protected final int port; protected final int port;
private final List<Connection> outBoundConnections = new CopyOnWriteArrayList<>(); private final Set<Connection> outBoundConnections = Collections.synchronizedSet(new HashSet<>());
private final List<Connection> inBoundConnections = new CopyOnWriteArrayList<>(); private final Set<Connection> inBoundConnections = Collections.synchronizedSet(new HashSet<>());
private final List<MessageListener> messageListeners = new CopyOnWriteArrayList<>(); private final List<MessageListener> messageListeners = new CopyOnWriteArrayList<>();
private final List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<>(); private final List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<>();
protected final List<SetupListener> setupListeners = new CopyOnWriteArrayList<>(); protected final List<SetupListener> setupListeners = new CopyOnWriteArrayList<>();
@ -55,14 +53,11 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
public SettableFuture<Connection> sendMessage(@NotNull Address peerAddress, Message message) { public SettableFuture<Connection> sendMessage(@NotNull Address peerAddress, Message message) {
log.trace("sendMessage message=" + message); log.trace("sendMessage message=" + message);
checkNotNull(peerAddress, "peerAddress must not be null"); checkNotNull(peerAddress, "peerAddress must not be null");
final SettableFuture<Connection> resultFuture = SettableFuture.create();
Callable<Connection> task = () -> { Optional<Connection> outboundConnectionOptional = findOutboundConnection(peerAddress);
try {
Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress);
Optional<Connection> outboundConnectionOptional = getOutboundConnection(peerAddress);
Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null; Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null;
if (connection != null)
log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid());
if (connection != null && connection.isStopped()) { if (connection != null && connection.isStopped()) {
log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid()); log.trace("We have a connection which is already stopped in outBoundConnections. Connection.uid=" + connection.getUid());
@ -71,62 +66,44 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
} }
if (connection == null) { if (connection == null) {
Optional<Connection> inboundConnectionOptional = getInboundConnection(peerAddress); Optional<Connection> inboundConnectionOptional = findInboundConnection(peerAddress);
if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get(); if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get();
if (connection != null) if (connection != null)
log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid()); log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid());
} }
if (connection == null) { if (connection != null) {
return sendMessage(connection, message);
} else {
final SettableFuture<Connection> resultFuture = SettableFuture.create();
Callable<Connection> task = () -> {
Connection newConnection;
try {
Thread.currentThread().setName("Outgoing-connection-to-" + peerAddress);
log.trace("We have not found any connection for that peerAddress. " +
"We will create a new outbound connection.");
try { try {
Socket socket = getSocket(peerAddress); // can take a while when using tor Socket socket = getSocket(peerAddress); // can take a while when using tor
connection = new Connection(socket, newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this);
(message1, connection1) -> NetworkNode.this.onMessage(message1, connection1), outBoundConnections.add(newConnection);
new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
NetworkNode.this.onConnection(connection);
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection);
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
log.trace("onDisconnect at outgoing connection to peerAddress " + peerAddress);
NetworkNode.this.onDisconnect(reason, connection);
}
@Override
public void onError(Throwable throwable) {
NetworkNode.this.onError(throwable);
}
});
if (!outBoundConnections.contains(connection))
outBoundConnections.add(connection);
else
log.error("We have already that connection in our list. That must not happen. "
+ outBoundConnections + " / connection=" + connection);
log.info("\n\nNetworkNode created new outbound connection:" log.info("\n\nNetworkNode created new outbound connection:"
+ "\npeerAddress=" + peerAddress.port + "\npeerAddress=" + peerAddress.port
+ "\nconnection.uid=" + connection.getUid() + "\nconnection.uid=" + newConnection.getUid()
+ "\nmessage=" + message + "\nmessage=" + message
+ "\n\n"); + "\n\n");
} catch (Throwable t) { } catch (Throwable t) {
resultFuture.setException(t); UserThread.execute(() -> resultFuture.setException(t));
return null; return null;
} }
}
connection.sendMessage(message); newConnection.sendMessage(message);
return connection; return newConnection;
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
log.error("Executing task failed. " + t.getMessage()); log.error("Executing task failed. " + t.getMessage());
UserThread.execute(() -> resultFuture.setException(t));
throw t; throw t;
} }
}; };
@ -134,40 +111,30 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
ListenableFuture<Connection> future = executorService.submit(task); ListenableFuture<Connection> future = executorService.submit(task);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
resultFuture.set(connection); UserThread.execute(() -> resultFuture.set(connection));
} }
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
resultFuture.setException(throwable); UserThread.execute(() -> resultFuture.setException(throwable));
} }
}); });
return resultFuture; return resultFuture;
} }
private Optional<Connection> getOutboundConnection(Address peerAddress) {
return outBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}
private Optional<Connection> getInboundConnection(Address peerAddress) {
return inBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
} }
public SettableFuture<Connection> sendMessage(Connection connection, Message message) { public SettableFuture<Connection> sendMessage(Connection connection, Message message) {
final SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = executorService.submit(() -> { ListenableFuture<Connection> future = executorService.submit(() -> {
connection.sendMessage(message); connection.sendMessage(message);
return connection; return connection;
}); });
final SettableFuture<Connection> resultFuture = SettableFuture.create();
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
resultFuture.set(connection); UserThread.execute(() -> resultFuture.set(connection));
} }
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
resultFuture.setException(throwable); UserThread.execute(() -> resultFuture.setException(throwable));
} }
}); });
return resultFuture; return resultFuture;
@ -191,8 +158,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
getAllConnections().stream().forEach(e -> e.shutDown()); getAllConnections().stream().forEach(e -> e.shutDown());
log.info("NetworkNode shutdown complete"); log.info("NetworkNode shutdown complete");
if (shutDownCompleteHandler != null) new Thread(shutDownCompleteHandler).start(); if (shutDownCompleteHandler != null) UserThread.execute(() -> shutDownCompleteHandler.run());
;
} }
} }
@ -220,7 +186,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override @Override
public void onConnection(Connection connection) { public void onConnection(Connection connection) {
connectionListeners.stream().forEach(e -> e.onConnection(connection)); connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onConnection(connection)));
} }
@Override @Override
@ -228,7 +194,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
log.trace("onAuthenticationComplete peerAddress=" + peerAddress); log.trace("onAuthenticationComplete peerAddress=" + peerAddress);
log.trace("onAuthenticationComplete connection=" + connection); log.trace("onAuthenticationComplete connection=" + connection);
connectionListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection)); connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onPeerAddressAuthenticated(peerAddress, connection)));
} }
@Override @Override
@ -237,12 +203,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
log.trace("onDisconnect connection " + connection + ", peerAddress= " + peerAddress); log.trace("onDisconnect connection " + connection + ", peerAddress= " + peerAddress);
outBoundConnections.remove(connection); outBoundConnections.remove(connection);
inBoundConnections.remove(connection); inBoundConnections.remove(connection);
connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection)); connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onDisconnect(reason, connection)));
} }
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
connectionListeners.stream().forEach(e -> e.onError(throwable)); connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onError(throwable)));
} }
@ -260,7 +226,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override @Override
public void onMessage(Message message, Connection connection) { public void onMessage(Message message, Connection connection) {
messageListeners.stream().forEach(e -> e.onMessage(message, connection)); messageListeners.stream().forEach(e -> UserThread.execute(() -> e.onMessage(message, connection)));
} }
@ -269,13 +235,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
protected void startServer(ServerSocket serverSocket) { protected void startServer(ServerSocket serverSocket) {
server = new Server(serverSocket, (message, connection) -> { server = new Server(serverSocket,
NetworkNode.this.onMessage(message, connection); (message, connection) -> NetworkNode.this.onMessage(message, connection),
}, new ConnectionListener() { new ConnectionListener() {
@Override @Override
public void onConnection(Connection connection) { public void onConnection(Connection connection) {
// we still have not authenticated so put it to the temp list // we still have not authenticated so put it to the temp list
if (!inBoundConnections.contains(connection))
inBoundConnections.add(connection); inBoundConnections.add(connection);
NetworkNode.this.onConnection(connection); NetworkNode.this.onConnection(connection);
} }
@ -301,6 +266,16 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
executorService.submit(server); executorService.submit(server);
} }
private Optional<Connection> findOutboundConnection(Address peerAddress) {
return outBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}
private Optional<Connection> findInboundConnection(Address peerAddress) {
return inBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}
abstract protected Socket getSocket(Address peerAddress) throws IOException; abstract protected Socket getSocket(Address peerAddress) throws IOException;
@Nullable @Nullable

View file

@ -7,8 +7,8 @@ import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.util.List; import java.util.HashSet;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.Set;
public class Server implements Runnable { public class Server implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Server.class); private static final Logger log = LoggerFactory.getLogger(Server.class);
@ -16,7 +16,7 @@ public class Server implements Runnable {
private final ServerSocket serverSocket; private final ServerSocket serverSocket;
private final MessageListener messageListener; private final MessageListener messageListener;
private final ConnectionListener connectionListener; private final ConnectionListener connectionListener;
private final List<Connection> connections = new CopyOnWriteArrayList<>(); private final Set<Connection> connections = new HashSet<>();
private volatile boolean stopped; private volatile boolean stopped;
@ -30,10 +30,11 @@ public class Server implements Runnable {
public void run() { public void run() {
try { try {
Thread.currentThread().setName("Server-" + serverSocket.getLocalPort()); Thread.currentThread().setName("Server-" + serverSocket.getLocalPort());
while (!stopped) {
try { try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
log.info("Ready to accept new clients on port " + serverSocket.getLocalPort()); log.info("Ready to accept new clients on port " + serverSocket.getLocalPort());
final Socket socket = serverSocket.accept(); final Socket socket = serverSocket.accept();
if (!stopped) {
log.info("Accepted new client on port " + socket.getLocalPort()); log.info("Accepted new client on port " + socket.getLocalPort());
Connection connection = new Connection(socket, messageListener, connectionListener); Connection connection = new Connection(socket, messageListener, connectionListener);
log.info("\n\nServer created new inbound connection:" log.info("\n\nServer created new inbound connection:"
@ -43,12 +44,14 @@ public class Server implements Runnable {
+ "\n\n"); + "\n\n");
log.info("Server created new socket with port " + socket.getPort()); log.info("Server created new socket with port " + socket.getPort());
if (!stopped)
connections.add(connection); connections.add(connection);
}
}
} catch (IOException e) { } catch (IOException e) {
if (!stopped) if (!stopped)
e.printStackTrace(); e.printStackTrace();
} }
}
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
log.error("Executing task failed. " + t.getMessage()); log.error("Executing task failed. " + t.getMessage());
@ -67,8 +70,9 @@ public class Server implements Runnable {
log.warn("SocketException at shutdown might be expected " + e.getMessage()); log.warn("SocketException at shutdown might be expected " + e.getMessage());
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
log.error("Exception at shutdown. " + e.getMessage());
} finally { } finally {
log.debug("Server shutdown complete"); log.info("Server shutdown complete");
} }
} }
} }

View file

@ -3,10 +3,9 @@ package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.*;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager; import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Utils; import io.bitsquare.p2p.Utils;
import io.bitsquare.p2p.network.messages.SelfTestMessage;
import io.nucleo.net.HiddenServiceDescriptor; import io.nucleo.net.HiddenServiceDescriptor;
import io.nucleo.net.TorNode; import io.nucleo.net.TorNode;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -16,14 +15,10 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random; import java.util.Random;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer; import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
@ -34,7 +29,6 @@ public class TorNetworkNode extends NetworkNode {
private static final Random random = new Random(); private static final Random random = new Random();
private static final long TIMEOUT = 5000; private static final long TIMEOUT = 5000;
private static final long SELF_TEST_INTERVAL = 10 * 60 * 1000;
private static final int MAX_ERRORS_BEFORE_RESTART = 3; private static final int MAX_ERRORS_BEFORE_RESTART = 3;
private static final int MAX_RESTART_ATTEMPTS = 3; private static final int MAX_RESTART_ATTEMPTS = 3;
private static final int WAIT_BEFORE_RESTART = 2000; private static final int WAIT_BEFORE_RESTART = 2000;
@ -43,21 +37,13 @@ public class TorNetworkNode extends NetworkNode {
private final File torDir; private final File torDir;
private TorNode torNode; private TorNode torNode;
private HiddenServiceDescriptor hiddenServiceDescriptor; private HiddenServiceDescriptor hiddenServiceDescriptor;
private Timer shutDownTimeoutTimer, selfTestTimer, selfTestTimeoutTimer; private Timer shutDownTimeoutTimer;
private TimerTask selfTestTimeoutTask, selfTestTask;
private AtomicBoolean selfTestRunning = new AtomicBoolean(false);
private long nonce; private long nonce;
private int errorCounter; private int errorCounter;
private int restartCounter; private int restartCounter;
private Runnable shutDownCompleteHandler; private Runnable shutDownCompleteHandler;
private boolean torShutDownComplete, networkNodeShutDownDoneComplete; private boolean torShutDownComplete, networkNodeShutDownDoneComplete;
static {
try {
new Socks5Proxy("", 0);
} catch (UnknownHostException e) {
}
}
// ///////////////////////////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
@ -67,77 +53,6 @@ public class TorNetworkNode extends NetworkNode {
super(port); super(port);
this.torDir = torDir; this.torDir = torDir;
init();
}
private void init() {
selfTestTimeoutTask = new TimerTask() {
@Override
public void run() {
try {
log.error("A timeout occurred at self test");
stopSelfTestTimer();
selfTestFailed();
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
};
selfTestTask = new TimerTask() {
@Override
public void run() {
try {
stopTimeoutTimer();
if (selfTestRunning.get()) {
log.debug("running self test");
selfTestTimeoutTimer = new Timer();
selfTestTimeoutTimer.schedule(selfTestTimeoutTask, TIMEOUT);
// might be interrupted by timeout task
if (selfTestRunning.get()) {
nonce = random.nextLong();
log.trace("send msg with nonce " + nonce);
try {
SettableFuture<Connection> future = sendMessage(new Address(hiddenServiceDescriptor.getFullAddress()), new SelfTestMessage(nonce));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Sending self test message succeeded");
}
@Override
public void onFailure(Throwable throwable) {
log.error("Error at sending self test message. Exception = " + throwable);
stopTimeoutTimer();
throwable.printStackTrace();
selfTestFailed();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
};
addMessageListener((message, connection) -> {
if (message instanceof SelfTestMessage) {
if (((SelfTestMessage) message).nonce == nonce) {
runSelfTest();
} else {
log.error("Nonce not matching our challenge. That should never happen.");
selfTestFailed();
}
}
});
} }
@ -151,13 +66,17 @@ public class TorNetworkNode extends NetworkNode {
addSetupListener(setupListener); addSetupListener(setupListener);
// executorService might have been shutdown before a restart, so we create a new one // executorService might have been shutdown before a restart, so we create a new one
executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("NetworkNode-" + port)
.setDaemon(true)
.build();
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(5, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), threadFactory));
// Create the tor node (takes about 6 sec.) // Create the tor node (takes about 6 sec.)
createTorNode(torDir, torNode -> { createTorNode(torDir, torNode -> {
TorNetworkNode.this.torNode = torNode; TorNetworkNode.this.torNode = torNode;
setupListeners.stream().forEach(e -> e.onTorNodeReady()); setupListeners.stream().forEach(e -> UserThread.execute(() -> e.onTorNodeReady()));
// Create Hidden Service (takes about 40 sec.) // Create Hidden Service (takes about 40 sec.)
createHiddenService(torNode, port, hiddenServiceDescriptor -> { createHiddenService(torNode, port, hiddenServiceDescriptor -> {
@ -166,10 +85,7 @@ public class TorNetworkNode extends NetworkNode {
startServer(hiddenServiceDescriptor.getServerSocket()); startServer(hiddenServiceDescriptor.getServerSocket());
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
setupListeners.stream().forEach(e -> e.onHiddenServiceReady()); setupListeners.stream().forEach(e -> UserThread.execute(() -> e.onHiddenServiceReady()));
// we are ready. so we start our periodic self test if our HS is available
// startSelfTest();
}); });
}); });
} }
@ -188,13 +104,11 @@ public class TorNetworkNode extends NetworkNode {
this.shutDownCompleteHandler = shutDownCompleteHandler; this.shutDownCompleteHandler = shutDownCompleteHandler;
checkNotNull(executorService, "executorService must not be null"); checkNotNull(executorService, "executorService must not be null");
selfTestRunning.set(false);
stopSelfTestTimer();
shutDownTimeoutTimer = new Timer(); shutDownTimeoutTimer = new Timer();
shutDownTimeoutTimer.schedule(new TimerTask() { shutDownTimeoutTimer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("ShutDownTimeoutTimer-" + new Random().nextInt(1000));
log.error("A timeout occurred at shutDown"); log.error("A timeout occurred at shutDown");
shutDownExecutorService(); shutDownExecutorService();
} }
@ -252,14 +166,14 @@ public class TorNetworkNode extends NetworkNode {
@Override @Override
public void onSuccess(Object o) { public void onSuccess(Object o) {
log.info("Shutdown completed"); log.info("Shutdown completed");
new Thread(shutDownCompleteHandler).start(); UserThread.execute(() -> shutDownCompleteHandler.run());
} }
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(Throwable throwable) {
throwable.printStackTrace(); throwable.printStackTrace();
log.error("Shutdown executorService failed with exception: " + throwable.getMessage()); log.error("Shutdown executorService failed with exception: " + throwable.getMessage());
new Thread(shutDownCompleteHandler).start(); UserThread.execute(() -> shutDownCompleteHandler.run());
} }
}); });
} }
@ -284,6 +198,7 @@ public class TorNetworkNode extends NetworkNode {
private void createTorNode(final File torDir, final Consumer<TorNode> resultHandler) { private void createTorNode(final File torDir, final Consumer<TorNode> resultHandler) {
Callable<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> task = () -> { Callable<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> task = () -> {
Thread.currentThread().setName("CreateTorNode-" + new Random().nextInt(1000));
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
if (torDir.mkdirs()) if (torDir.mkdirs())
log.trace("Created directory for tor"); log.trace("Created directory for tor");
@ -314,6 +229,7 @@ public class TorNetworkNode extends NetworkNode {
private void createHiddenService(final TorNode torNode, final int port, private void createHiddenService(final TorNode torNode, final int port,
final Consumer<HiddenServiceDescriptor> resultHandler) { final Consumer<HiddenServiceDescriptor> resultHandler) {
Callable<HiddenServiceDescriptor> task = () -> { Callable<HiddenServiceDescriptor> task = () -> {
Thread.currentThread().setName("CreateHiddenService-" + new Random().nextInt(1000));
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
log.debug("Create hidden service"); log.debug("Create hidden service");
HiddenServiceDescriptor hiddenServiceDescriptor = torNode.createHiddenService(port); HiddenServiceDescriptor hiddenServiceDescriptor = torNode.createHiddenService(port);
@ -339,43 +255,6 @@ public class TorNetworkNode extends NetworkNode {
} }
// /////////////////////////////////////////////////////////////////////////////////////////
// Self test
// /////////////////////////////////////////////////////////////////////////////////////////
private void startSelfTest() {
selfTestRunning.set(true);
//addListener(messageListener);
runSelfTest();
}
private void runSelfTest() {
stopSelfTestTimer();
selfTestTimer = new Timer();
selfTestTimer.schedule(selfTestTask, SELF_TEST_INTERVAL);
}
private void stopSelfTestTimer() {
stopTimeoutTimer();
if (selfTestTimer != null)
selfTestTimer.cancel();
}
private void stopTimeoutTimer() {
if (selfTestTimeoutTimer != null)
selfTestTimeoutTimer.cancel();
}
private void selfTestFailed() {
errorCounter++;
log.warn("Self test failed. Already " + errorCounter + " failure(s). Max. errors before restart: "
+ MAX_ERRORS_BEFORE_RESTART);
if (errorCounter >= MAX_ERRORS_BEFORE_RESTART)
restartTor();
else
runSelfTest();
}
@Override @Override
protected Socket getSocket(Address peerAddress) throws IOException { protected Socket getSocket(Address peerAddress) throws IOException {
checkArgument(peerAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address"); checkArgument(peerAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address");

View file

@ -4,13 +4,13 @@ import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
public abstract class AuthenticationListener implements RoutingListener { public abstract class AuthenticationListener implements RoutingListener {
public void onFirstNeighborAdded(Neighbor neighbor) { public void onFirstPeerAdded(Peer peer) {
} }
public void onNeighborAdded(Neighbor neighbor) { public void onPeerAdded(Peer peer) {
} }
public void onNeighborRemoved(Address address) { public void onPeerRemoved(Address address) {
} }
abstract public void onConnectionAuthenticated(Connection connection); abstract public void onConnectionAuthenticated(Connection connection);

View file

@ -9,14 +9,14 @@ import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Random; import java.util.Random;
public class Neighbor implements Serializable { public class Peer implements Serializable {
private static final Logger log = LoggerFactory.getLogger(Neighbor.class); private static final Logger log = LoggerFactory.getLogger(Peer.class);
public final Connection connection; public final Connection connection;
public final Address address; public final Address address;
private long pingNonce; private long pingNonce;
public Neighbor(Connection connection) { public Peer(Connection connection) {
this.connection = connection; this.connection = connection;
this.address = connection.getPeerAddress(); this.address = connection.getPeerAddress();
pingNonce = new Random().nextLong(); pingNonce = new Random().nextLong();
@ -43,16 +43,16 @@ public class Neighbor implements Serializable {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (!(o instanceof Neighbor)) return false; if (!(o instanceof Peer)) return false;
Neighbor neighbor = (Neighbor) o; Peer peer = (Peer) o;
return !(address != null ? !address.equals(neighbor.address) : neighbor.address != null); return !(address != null ? !address.equals(peer.address) : peer.address != null);
} }
@Override @Override
public String toString() { public String toString() {
return "Neighbor{" + return "Peer{" +
"address=" + address + "address=" + address +
", pingNonce=" + pingNonce + ", pingNonce=" + pingNonce +
'}'; '}';

View file

@ -38,8 +38,8 @@ public class Routing {
private final List<Address> seedNodes; private final List<Address> seedNodes;
private final Map<Address, Long> nonceMap = new ConcurrentHashMap<>(); private final Map<Address, Long> nonceMap = new ConcurrentHashMap<>();
private final List<RoutingListener> routingListeners = new CopyOnWriteArrayList<>(); private final List<RoutingListener> routingListeners = new CopyOnWriteArrayList<>();
private final Map<Address, Neighbor> connectedNeighbors = new ConcurrentHashMap<>(); private final Map<Address, Peer> authenticatedPeers = new ConcurrentHashMap<>();
private final Set<Address> reportedNeighborAddresses = Collections.synchronizedSet(new HashSet<>()); private final Set<Address> reportedPeerAddresses = Collections.synchronizedSet(new HashSet<>());
private final Map<Address, Runnable> authenticationCompleteHandlers = new ConcurrentHashMap<>(); private final Map<Address, Runnable> authenticationCompleteHandlers = new ConcurrentHashMap<>();
private final Timer maintenanceTimer = new Timer(); private final Timer maintenanceTimer = new Timer();
private final ExecutorService executorService; private final ExecutorService executorService;
@ -87,7 +87,7 @@ public class Routing {
public void onDisconnect(Reason reason, Connection connection) { public void onDisconnect(Reason reason, Connection connection) {
// only removes authenticated nodes // only removes authenticated nodes
if (connection.isAuthenticated()) if (connection.isAuthenticated())
removeNeighbor(connection.getPeerAddress()); removePeer(connection.getPeerAddress());
} }
@Override @Override
@ -116,9 +116,12 @@ public class Routing {
maintenanceTimer.scheduleAtFixedRate(new TimerTask() { maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("RoutingMaintenanceTimer-" + new Random().nextInt(1000));
try { try {
UserThread.execute(() -> {
disconnectOldConnections(); disconnectOldConnections();
pingNeighbors(); pingPeers();
});
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
log.error("Executing task failed. " + t.getMessage()); log.error("Executing task failed. " + t.getMessage());
@ -141,10 +144,10 @@ public class Routing {
} }
} }
private void pingNeighbors() { private void pingPeers() {
log.trace("pingNeighbors"); log.trace("pingPeers");
List<Neighbor> connectedNeighborsList = new ArrayList<>(connectedNeighbors.values()); List<Peer> connectedPeersList = new ArrayList<>(authenticatedPeers.values());
connectedNeighborsList.stream() connectedPeersList.stream()
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY) .filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
.forEach(e -> { .forEach(e -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce())); SettableFuture<Connection> future = networkNode.sendMessage(e.connection, new PingMessage(e.getPingNonce()));
@ -157,7 +160,7 @@ public class Routing {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("PingMessage sending failed " + throwable.getMessage()); log.info("PingMessage sending failed " + throwable.getMessage());
removeNeighbor(e.address); removePeer(e.address);
} }
}); });
Uninterruptibles.sleepUninterruptibly(new Random().nextInt(5000) + 5000, TimeUnit.MILLISECONDS); Uninterruptibles.sleepUninterruptibly(new Random().nextInt(5000) + 5000, TimeUnit.MILLISECONDS);
@ -180,25 +183,25 @@ public class Routing {
} }
public void broadcast(BroadcastMessage message, @Nullable Address sender) { public void broadcast(BroadcastMessage message, @Nullable Address sender) {
log.trace("Broadcast message to " + connectedNeighbors.values().size() + " neighbors."); log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers.");
log.trace("message = " + message); log.trace("message = " + message);
printConnectedNeighborsMap(); printConnectedPeersMap();
connectedNeighbors.values().stream() authenticatedPeers.values().stream()
.filter(e -> !e.address.equals(sender)) .filter(e -> !e.address.equals(sender))
.forEach(neighbor -> { .forEach(peer -> {
log.trace("Broadcast message from " + getAddress() + " to " + neighbor.address + "."); log.trace("Broadcast message from " + getAddress() + " to " + peer.address + ".");
SettableFuture<Connection> future = networkNode.sendMessage(neighbor.address, message); SettableFuture<Connection> future = networkNode.sendMessage(peer.address, message);
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.trace("Broadcast from " + getAddress() + " to " + neighbor.address + " succeeded."); log.trace("Broadcast from " + getAddress() + " to " + peer.address + " succeeded.");
} }
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("Broadcast failed. " + throwable.getMessage()); log.info("Broadcast failed. " + throwable.getMessage());
removeNeighbor(neighbor.address); removePeer(peer.address);
} }
}); });
}); });
@ -220,18 +223,18 @@ public class Routing {
routingListeners.remove(routingListener); routingListeners.remove(routingListener);
} }
public Map<Address, Neighbor> getConnectedNeighbors() { public Map<Address, Peer> getAuthenticatedPeers() {
return connectedNeighbors; return authenticatedPeers;
} }
// Use ArrayList not List as we need it serializable // Use ArrayList not List as we need it serializable
public ArrayList<Address> getAllNeighborAddresses() { public ArrayList<Address> getAllPeerAddresses() {
ArrayList<Address> allNeighborAddresses = new ArrayList<>(reportedNeighborAddresses); ArrayList<Address> allPeerAddresses = new ArrayList<>(reportedPeerAddresses);
allNeighborAddresses.addAll(connectedNeighbors.values().stream() allPeerAddresses.addAll(authenticatedPeers.values().stream()
.map(e -> e.address).collect(Collectors.toList())); .map(e -> e.address).collect(Collectors.toList()));
// remove own address and seed nodes // remove own address and seed nodes
allNeighborAddresses.remove(getAddress()); allPeerAddresses.remove(getAddress());
return allNeighborAddresses; return allPeerAddresses;
} }
@ -244,11 +247,11 @@ public class Routing {
// node1: close connection // node1: close connection
// node1 -> node2 ChallengeMessage on new connection // node1 -> node2 ChallengeMessage on new connection
// node2: authentication to node1 done if nonce ok // node2: authentication to node1 done if nonce ok
// node2 -> node1 GetNeighborsMessage // node2 -> node1 GetPeersMessage
// node1: authentication to node2 done if nonce ok // node1: authentication to node2 done if nonce ok
// node1 -> node2 NeighborsMessage // node1 -> node2 PeersMessage
public void startAuthentication(List<Address> connectedSeedNodes) { public void startAuthentication(Set<Address> connectedSeedNodes) {
connectedSeedNodes.forEach(connectedSeedNode -> { connectedSeedNodes.forEach(connectedSeedNode -> {
executorService.submit(() -> { executorService.submit(() -> {
try { try {
@ -267,7 +270,7 @@ public class Routing {
log.info("We try to authenticate to a random seed node. " + address); log.info("We try to authenticate to a random seed node. " + address);
startAuthTs = System.currentTimeMillis(); startAuthTs = System.currentTimeMillis();
final boolean[] alreadyConnected = {false}; final boolean[] alreadyConnected = {false};
connectedNeighbors.values().stream().forEach(e -> { authenticatedPeers.values().stream().forEach(e -> {
remainingSeedNodes.remove(e.address); remainingSeedNodes.remove(e.address);
if (address.equals(e.address)) if (address.equals(e.address))
alreadyConnected[0] = true; alreadyConnected[0] = true;
@ -362,11 +365,11 @@ public class Routing {
boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress); boolean verified = verifyNonceAndAuthenticatePeerAddress(challengeMessage.requesterNonce, peerAddress);
if (verified) { if (verified) {
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new GetNeighborsMessage(getAddress(), challengeMessage.challengerNonce, getAllNeighborAddresses())); new GetPeersMessage(getAddress(), challengeMessage.challengerNonce, getAllPeerAddresses()));
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.trace("GetNeighborsMessage sent successfully from " + getAddress() + " to " + peerAddress); log.trace("GetPeersMessage sent successfully from " + getAddress() + " to " + peerAddress);
/* // we wait to get the success to reduce the time span of the moment of /* // we wait to get the success to reduce the time span of the moment of
// authentication at both sides of the connection // authentication at both sides of the connection
@ -375,52 +378,52 @@ public class Routing {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("GetNeighborsMessage sending failed " + throwable.getMessage()); log.info("GetPeersMessage sending failed " + throwable.getMessage());
removeNeighbor(peerAddress); removePeer(peerAddress);
} }
}); });
} else { } else {
log.warn("verifyNonceAndAuthenticatePeerAddress failed. challengeMessage=" + challengeMessage + " / nonceMap=" + tempNonceMap); log.warn("verifyNonceAndAuthenticatePeerAddress failed. challengeMessage=" + challengeMessage + " / nonceMap=" + tempNonceMap);
} }
} else if (message instanceof GetNeighborsMessage) { } else if (message instanceof GetPeersMessage) {
GetNeighborsMessage getNeighborsMessage = (GetNeighborsMessage) message; GetPeersMessage getPeersMessage = (GetPeersMessage) message;
Address peerAddress = getNeighborsMessage.address; Address peerAddress = getPeersMessage.address;
log.trace("GetNeighborsMessage from " + peerAddress + " at " + getAddress()); log.trace("GetPeersMessage from " + peerAddress + " at " + getAddress());
boolean verified = verifyNonceAndAuthenticatePeerAddress(getNeighborsMessage.challengerNonce, peerAddress); boolean verified = verifyNonceAndAuthenticatePeerAddress(getPeersMessage.challengerNonce, peerAddress);
if (verified) { if (verified) {
setAuthenticated(connection, peerAddress); setAuthenticated(connection, peerAddress);
purgeReportedNeighbors(); purgeReportedPeers();
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, SettableFuture<Connection> future = networkNode.sendMessage(peerAddress,
new NeighborsMessage(getAddress(), getAllNeighborAddresses())); new PeersMessage(getAddress(), getAllPeerAddresses()));
log.trace("sent NeighborsMessage to " + peerAddress + " from " + getAddress() log.trace("sent PeersMessage to " + peerAddress + " from " + getAddress()
+ " with allNeighbors=" + getAllNeighborAddresses()); + " with allPeers=" + getAllPeerAddresses());
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.trace("NeighborsMessage sent successfully from " + getAddress() + " to " + peerAddress); log.trace("PeersMessage sent successfully from " + getAddress() + " to " + peerAddress);
} }
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("NeighborsMessage sending failed " + throwable.getMessage()); log.info("PeersMessage sending failed " + throwable.getMessage());
removeNeighbor(peerAddress); removePeer(peerAddress);
} }
}); });
// now we add the reported neighbors to our own set // now we add the reported peers to our own set
ArrayList<Address> neighborAddresses = ((GetNeighborsMessage) message).neighborAddresses; ArrayList<Address> peerAddresses = ((GetPeersMessage) message).peerAddresses;
log.trace("Received neighbors: " + neighborAddresses); log.trace("Received peers: " + peerAddresses);
// remove ourselves // remove ourselves
addToReportedNeighbors(neighborAddresses, connection); addToReportedPeers(peerAddresses, connection);
} }
} else if (message instanceof NeighborsMessage) { } else if (message instanceof PeersMessage) {
NeighborsMessage neighborsMessage = (NeighborsMessage) message; PeersMessage peersMessage = (PeersMessage) message;
Address peerAddress = neighborsMessage.address; Address peerAddress = peersMessage.address;
log.trace("NeighborsMessage from " + peerAddress + " at " + getAddress()); log.trace("PeersMessage from " + peerAddress + " at " + getAddress());
ArrayList<Address> neighborAddresses = neighborsMessage.neighborAddresses; ArrayList<Address> peerAddresses = peersMessage.peerAddresses;
log.trace("Received neighbors: " + neighborAddresses); log.trace("Received peers: " + peerAddresses);
// remove ourselves // remove ourselves
addToReportedNeighbors(neighborAddresses, connection); addToReportedPeers(peerAddresses, connection);
// we wait until the handshake is completed before setting the authenticate flag // we wait until the handshake is completed before setting the authenticate flag
// authentication at both sides of the connection // authentication at both sides of the connection
@ -435,58 +438,58 @@ public class Routing {
if (authenticationCompleteHandler != null) if (authenticationCompleteHandler != null)
authenticationCompleteHandler.run(); authenticationCompleteHandler.run();
authenticateToNextRandomNeighbor(); authenticateToNextRandomPeer();
} }
} }
private void addToReportedNeighbors(ArrayList<Address> neighborAddresses, Connection connection) { private void addToReportedPeers(ArrayList<Address> peerAddresses, Connection connection) {
log.trace("addToReportedNeighbors"); log.trace("addToReportedPeers");
// we disconnect misbehaving nodes trying to send too many neighbors // we disconnect misbehaving nodes trying to send too many peers
// reported neighbors include the peers connected neighbors which is normally max. 8 but we give some headroom // reported peers include the peers connected peers which is normally max. 8 but we give some headroom
// for safety // for safety
if (neighborAddresses.size() > 1100) { if (peerAddresses.size() > 1100) {
connection.shutDown(); connection.shutDown();
} else { } else {
neighborAddresses.remove(getAddress()); peerAddresses.remove(getAddress());
reportedNeighborAddresses.addAll(neighborAddresses); reportedPeerAddresses.addAll(peerAddresses);
purgeReportedNeighbors(); purgeReportedPeers();
} }
} }
private void purgeReportedNeighbors() { private void purgeReportedPeers() {
log.trace("purgeReportedNeighbors"); log.trace("purgeReportedPeers");
int all = getAllNeighborAddresses().size(); int all = getAllPeerAddresses().size();
if (all > 1000) { if (all > 1000) {
int diff = all - 100; int diff = all - 100;
List<Address> list = getNotConnectedNeighborAddresses(); List<Address> list = getNotConnectedPeerAddresses();
for (int i = 0; i < diff; i++) { for (int i = 0; i < diff; i++) {
Address toRemove = list.remove(new Random().nextInt(list.size())); Address toRemove = list.remove(new Random().nextInt(list.size()));
reportedNeighborAddresses.remove(toRemove); reportedPeerAddresses.remove(toRemove);
} }
} }
} }
private List<Address> getNotConnectedNeighborAddresses() { private List<Address> getNotConnectedPeerAddresses() {
ArrayList<Address> list = new ArrayList<>(getAllNeighborAddresses()); ArrayList<Address> list = new ArrayList<>(getAllPeerAddresses());
log.debug("## getNotConnectedNeighborAddresses "); log.debug("## getNotConnectedPeerAddresses ");
log.debug("## reportedNeighborsList=" + list); log.debug("## reportedPeersList=" + list);
connectedNeighbors.values().stream().forEach(e -> list.remove(e.address)); authenticatedPeers.values().stream().forEach(e -> list.remove(e.address));
log.debug("## connectedNeighbors=" + connectedNeighbors); log.debug("## connectedPeers=" + authenticatedPeers);
log.debug("## reportedNeighborsList=" + list); log.debug("## reportedPeersList=" + list);
return list; return list;
} }
private void authenticateToNextRandomNeighbor() { private void authenticateToNextRandomPeer() {
executorService.submit(() -> { executorService.submit(() -> {
try { try {
Uninterruptibles.sleepUninterruptibly(new Random().nextInt(200) + 200, TimeUnit.MILLISECONDS); Uninterruptibles.sleepUninterruptibly(new Random().nextInt(200) + 200, TimeUnit.MILLISECONDS);
if (getConnectedNeighbors().size() <= MAX_CONNECTIONS) { if (getAuthenticatedPeers().size() <= MAX_CONNECTIONS) {
Address randomNotConnectedNeighborAddress = getRandomNotConnectedNeighborAddress(); Address randomNotConnectedPeerAddress = getRandomNotConnectedPeerAddress();
if (randomNotConnectedNeighborAddress != null) { if (randomNotConnectedPeerAddress != null) {
log.info("We try to build an authenticated connection to a random neighbor. " + randomNotConnectedNeighborAddress); log.info("We try to build an authenticated connection to a random peer. " + randomNotConnectedPeerAddress);
authenticateToPeer(randomNotConnectedNeighborAddress, null, () -> authenticateToNextRandomNeighbor()); authenticateToPeer(randomNotConnectedPeerAddress, null, () -> authenticateToNextRandomPeer());
} else { } else {
log.info("No more neighbors available for connecting."); log.info("No more peers available for connecting.");
} }
} else { } else {
log.info("We have already enough connections."); log.info("We have already enough connections.");
@ -515,7 +518,7 @@ public class Routing {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("send IdMessage failed. " + throwable.getMessage()); log.info("send IdMessage failed. " + throwable.getMessage());
removeNeighbor(address); removePeer(address);
if (faultHandler != null) faultHandler.run(); if (faultHandler != null) faultHandler.run();
} }
}); });
@ -545,18 +548,18 @@ public class Routing {
+ "\npeerAddress= " + peerAddress + "\npeerAddress= " + peerAddress
+ "\n############################################################\n"); + "\n############################################################\n");
connection.onAuthenticationComplete(peerAddress, connection); connection.setAuthenticated(peerAddress, connection);
Neighbor neighbor = new Neighbor(connection); Peer peer = new Peer(connection);
addConnectedNeighbor(peerAddress, neighbor); addAuthenticatedPeer(peerAddress, peer);
routingListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection)); routingListeners.stream().forEach(e -> e.onConnectionAuthenticated(connection));
log.debug("\n### setAuthenticated post connection " + connection); log.debug("\n### setAuthenticated post connection " + connection);
} }
private Address getRandomNotConnectedNeighborAddress() { private Address getRandomNotConnectedPeerAddress() {
List<Address> list = getNotConnectedNeighborAddresses(); List<Address> list = getNotConnectedPeerAddresses();
if (list.size() > 0) { if (list.size() > 0) {
Collections.shuffle(list); Collections.shuffle(list);
return list.get(0); return list.get(0);
@ -583,14 +586,14 @@ public class Routing {
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("PongMessage sending failed " + throwable.getMessage()); log.info("PongMessage sending failed " + throwable.getMessage());
removeNeighbor(connection.getPeerAddress()); removePeer(connection.getPeerAddress());
} }
}); });
} else if (message instanceof PongMessage) { } else if (message instanceof PongMessage) {
Neighbor neighbor = connectedNeighbors.get(connection.getPeerAddress()); Peer peer = authenticatedPeers.get(connection.getPeerAddress());
if (neighbor != null) { if (peer != null) {
if (((PongMessage) message).nonce != neighbor.getPingNonce()) { if (((PongMessage) message).nonce != peer.getPingNonce()) {
removeNeighbor(neighbor.address); removePeer(peer.address);
log.warn("PongMessage invalid: self/peer " + getAddress() + "/" + connection.getPeerAddress()); log.warn("PongMessage invalid: self/peer " + getAddress() + "/" + connection.getPeerAddress());
} }
} }
@ -599,41 +602,41 @@ public class Routing {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Neighbors // Peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void removeNeighbor(@Nullable Address peerAddress) { private void removePeer(@Nullable Address peerAddress) {
reportedNeighborAddresses.remove(peerAddress); reportedPeerAddresses.remove(peerAddress);
Neighbor disconnectedNeighbor; Peer disconnectedPeer;
disconnectedNeighbor = connectedNeighbors.remove(peerAddress); disconnectedPeer = authenticatedPeers.remove(peerAddress);
if (disconnectedNeighbor != null) if (disconnectedPeer != null)
UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onNeighborRemoved(peerAddress))); UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)));
log.trace("removeNeighbor [post]"); log.trace("removePeer [post]");
printConnectedNeighborsMap(); printConnectedPeersMap();
printReportedNeighborsMap(); printReportedPeersMap();
log.trace("removeNeighbor nonceMap=" + nonceMap + " / peerAddress=" + peerAddress); log.trace("removePeer nonceMap=" + nonceMap + " / peerAddress=" + peerAddress);
nonceMap.remove(peerAddress); nonceMap.remove(peerAddress);
} }
private void addConnectedNeighbor(Address address, Neighbor neighbor) { private void addAuthenticatedPeer(Address address, Peer peer) {
boolean firstNeighborAdded; boolean firstPeerAdded;
connectedNeighbors.put(address, neighbor); authenticatedPeers.put(address, peer);
firstNeighborAdded = connectedNeighbors.size() == 1; firstPeerAdded = authenticatedPeers.size() == 1;
UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onNeighborAdded(neighbor))); UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onPeerAdded(peer)));
if (firstNeighborAdded) if (firstPeerAdded)
UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onFirstNeighborAdded(neighbor))); UserThread.execute(() -> routingListeners.stream().forEach(e -> e.onFirstPeerAdded(peer)));
if (connectedNeighbors.size() > MAX_CONNECTIONS) if (authenticatedPeers.size() > MAX_CONNECTIONS)
disconnectOldConnections(); disconnectOldConnections();
log.trace("addConnectedNeighbor [post]"); log.trace("addConnectedPeer [post]");
printConnectedNeighborsMap(); printConnectedPeersMap();
} }
private Address getAddress() { private Address getAddress() {
@ -645,18 +648,18 @@ public class Routing {
// Utils // Utils
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void printConnectedNeighborsMap() { public void printConnectedPeersMap() {
StringBuilder result = new StringBuilder("\nConnected neighbors for node " + getAddress() + ":"); StringBuilder result = new StringBuilder("\nConnected peers for node " + getAddress() + ":");
connectedNeighbors.values().stream().forEach(e -> { authenticatedPeers.values().stream().forEach(e -> {
result.append("\n\t" + e.address); result.append("\n\t" + e.address);
}); });
result.append("\n"); result.append("\n");
log.info(result.toString()); log.info(result.toString());
} }
public void printReportedNeighborsMap() { public void printReportedPeersMap() {
StringBuilder result = new StringBuilder("\nReported neighborAddresses for node " + getAddress() + ":"); StringBuilder result = new StringBuilder("\nReported peerAddresses for node " + getAddress() + ":");
reportedNeighborAddresses.stream().forEach(e -> { reportedPeerAddresses.stream().forEach(e -> {
result.append("\n\t" + e); result.append("\n\t" + e);
}); });
result.append("\n"); result.append("\n");

View file

@ -4,11 +4,11 @@ import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.Connection;
public interface RoutingListener { public interface RoutingListener {
void onFirstNeighborAdded(Neighbor neighbor); void onFirstPeerAdded(Peer peer);
void onNeighborAdded(Neighbor neighbor); void onPeerAdded(Peer peer);
void onNeighborRemoved(Address address); void onPeerRemoved(Address address);
// TODO remove // TODO remove
void onConnectionAuthenticated(Connection connection); void onConnectionAuthenticated(Connection connection);

View file

@ -5,26 +5,26 @@ import io.bitsquare.p2p.Address;
import java.util.ArrayList; import java.util.ArrayList;
public final class GetNeighborsMessage implements AuthenticationMessage { public final class GetPeersMessage implements AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address; public final Address address;
public final long challengerNonce; public final long challengerNonce;
public final ArrayList<Address> neighborAddresses; public final ArrayList<Address> peerAddresses;
public GetNeighborsMessage(Address address, long challengerNonce, ArrayList<Address> neighborAddresses) { public GetPeersMessage(Address address, long challengerNonce, ArrayList<Address> peerAddresses) {
this.address = address; this.address = address;
this.challengerNonce = challengerNonce; this.challengerNonce = challengerNonce;
this.neighborAddresses = neighborAddresses; this.peerAddresses = peerAddresses;
} }
@Override @Override
public String toString() { public String toString() {
return "GetNeighborsMessage{" + return "GetPeersMessage{" +
"address=" + address + "address=" + address +
", challengerNonce=" + challengerNonce + ", challengerNonce=" + challengerNonce +
", neighborAddresses=" + neighborAddresses + ", peerAddresses=" + peerAddresses +
'}'; '}';
} }
} }

View file

@ -5,21 +5,21 @@ import io.bitsquare.p2p.Address;
import java.util.ArrayList; import java.util.ArrayList;
public final class NeighborsMessage implements AuthenticationMessage { public final class PeersMessage implements AuthenticationMessage {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final Address address; public final Address address;
public final ArrayList<Address> neighborAddresses; public final ArrayList<Address> peerAddresses;
public NeighborsMessage(Address address, ArrayList<Address> neighborAddresses) { public PeersMessage(Address address, ArrayList<Address> peerAddresses) {
this.address = address; this.address = address;
this.neighborAddresses = neighborAddresses; this.peerAddresses = peerAddresses;
} }
@Override @Override
public String toString() { public String toString() {
return "NeighborsMessage{" + "neighborAddresses=" + neighborAddresses + '}'; return "PeersMessage{" + "peerAddresses=" + peerAddresses + '}';
} }
} }

View file

@ -20,10 +20,7 @@ import java.io.File;
import java.math.BigInteger; import java.math.BigInteger;
import java.security.KeyPair; import java.security.KeyPair;
import java.security.PublicKey; import java.security.PublicKey;
import java.util.List; import java.util.*;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -80,9 +77,10 @@ public class ProtectedExpirableDataStorage {
} }
}); });
timer.scheduleAtFixedRate(new TimerTask() { TimerTask task = new TimerTask() {
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("RemoveExpiredEntriesTimer-" + new Random().nextInt(1000));
try { try {
log.info("removeExpiredEntries called "); log.info("removeExpiredEntries called ");
map.entrySet().stream().filter(entry -> entry.getValue().isExpired()) map.entrySet().stream().filter(entry -> entry.getValue().isExpired())
@ -92,9 +90,8 @@ public class ProtectedExpirableDataStorage {
log.error("Executing task failed. " + t.getMessage()); log.error("Executing task failed. " + t.getMessage());
} }
} }
}, };
CHECK_TTL_INTERVAL, timer.scheduleAtFixedRate(task, CHECK_TTL_INTERVAL, CHECK_TTL_INTERVAL);
CHECK_TTL_INTERVAL);
} }
@ -127,7 +124,7 @@ public class ProtectedExpirableDataStorage {
if (result) { if (result) {
map.put(hashOfPayload, protectedData); map.put(hashOfPayload, protectedData);
log.trace("Data added to our map and it will be broadcasted to our neighbors."); log.trace("Data added to our map and it will be broadcasted to our peers.");
UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData))); UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData)));
StringBuilder sb = new StringBuilder("\n\n----------------------------------------------------\n" + StringBuilder sb = new StringBuilder("\n\n----------------------------------------------------\n" +
@ -244,7 +241,7 @@ public class ProtectedExpirableDataStorage {
private void doRemoveProtectedExpirableData(ProtectedData protectedData, BigInteger hashOfPayload) { private void doRemoveProtectedExpirableData(ProtectedData protectedData, BigInteger hashOfPayload) {
map.remove(hashOfPayload); map.remove(hashOfPayload);
log.trace("Data removed from our map. We broadcast the message to our neighbors."); log.trace("Data removed from our map. We broadcast the message to our peers.");
UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData))); UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData)));
StringBuilder sb = new StringBuilder("\n\nSet after removeProtectedExpirableData:\n"); StringBuilder sb = new StringBuilder("\n\nSet after removeProtectedExpirableData:\n");

View file

@ -107,7 +107,7 @@ public class RoutingTest {
P2PService p2PService1 = seedNode1.getP2PService(); P2PService p2PService1 = seedNode1.getP2PService();
latch.await(); latch.await();
Thread.sleep(500); Thread.sleep(500);
Assert.assertEquals(0, p2PService1.getRouting().getAllNeighborAddresses().size()); Assert.assertEquals(0, p2PService1.getRouting().getAllPeerAddresses().size());
} }
@Test @Test
@ -180,8 +180,8 @@ public class RoutingTest {
}); });
P2PService p2PService2 = seedNode2.getP2PService(); P2PService p2PService2 = seedNode2.getP2PService();
latch.await(); latch.await();
Assert.assertEquals(1, p2PService1.getRouting().getAllNeighborAddresses().size()); Assert.assertEquals(1, p2PService1.getRouting().getAllPeerAddresses().size());
Assert.assertEquals(1, p2PService2.getRouting().getAllNeighborAddresses().size()); Assert.assertEquals(1, p2PService2.getRouting().getAllPeerAddresses().size());
} }
// @Test // @Test
@ -201,9 +201,9 @@ public class RoutingTest {
// node1: close connection // node1: close connection
// node1 -> node2 ChallengeMessage on new connection // node1 -> node2 ChallengeMessage on new connection
// node2: authentication to node1 done if nonce ok // node2: authentication to node1 done if nonce ok
// node2 -> node1 GetNeighborsMessage // node2 -> node1 GetPeersMessage
// node1: authentication to node2 done if nonce ok // node1: authentication to node2 done if nonce ok
// node1 -> node2 NeighborsMessage // node1 -> node2 PeersMessage
// first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2 // first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2
CountDownLatch latch1 = new CountDownLatch(2); CountDownLatch latch1 = new CountDownLatch(2);
@ -228,7 +228,7 @@ public class RoutingTest {
seedNode1.getP2PService().getRouting().removeRoutingListener(routingListener1); seedNode1.getP2PService().getRouting().removeRoutingListener(routingListener1);
seedNode2.getP2PService().getRouting().removeRoutingListener(routingListener2); seedNode2.getP2PService().getRouting().removeRoutingListener(routingListener2);
// wait until Neighbors msg finished // wait until Peers msg finished
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
// authentication: // authentication:
@ -259,7 +259,7 @@ public class RoutingTest {
}); });
latch2.await(); latch2.await();
// wait until Neighbors msg finished // wait until Peers msg finished
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
@ -282,9 +282,9 @@ public class RoutingTest {
// node1: close connection // node1: close connection
// node1 -> node2 ChallengeMessage on new connection // node1 -> node2 ChallengeMessage on new connection
// node2: authentication to node1 done if nonce ok // node2: authentication to node1 done if nonce ok
// node2 -> node1 GetNeighborsMessage // node2 -> node1 GetPeersMessage
// node1: authentication to node2 done if nonce ok // node1: authentication to node2 done if nonce ok
// node1 -> node2 NeighborsMessage // node1 -> node2 PeersMessage
// first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2 // first authentication from seedNode2 to seedNode1, then from seedNode1 to seedNode2
CountDownLatch latch1 = new CountDownLatch(2); CountDownLatch latch1 = new CountDownLatch(2);
@ -364,8 +364,8 @@ public class RoutingTest {
// total authentications at com nodes = 90, System load (nr. threads/used memory (MB)): 170/20 // total authentications at com nodes = 90, System load (nr. threads/used memory (MB)): 170/20
// total authentications at 20 nodes = 380, System load (nr. threads/used memory (MB)): 525/46 // total authentications at 20 nodes = 380, System load (nr. threads/used memory (MB)): 525/46
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {
nodes[i].getP2PService().getRouting().printConnectedNeighborsMap(); nodes[i].getP2PService().getRouting().printConnectedPeersMap();
nodes[i].getP2PService().getRouting().printReportedNeighborsMap(); nodes[i].getP2PService().getRouting().printReportedPeersMap();
} }
CountDownLatch shutDownLatch = new CountDownLatch(length); CountDownLatch shutDownLatch = new CountDownLatch(length);