From b81e263c24478ac911f1df0ade9f81c4d8415583 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 9 Nov 2015 16:16:43 +0100 Subject: [PATCH] Cleanup listeners --- .../bitsquare/common/crypto/PubKeyRing.java | 4 +- .../io/bitsquare/btc/TradeWalletService.java | 4 +- .../buyer/CreateAndSignDepositTxAsBuyer.java | 8 +- .../java/io/bitsquare/p2p/P2PService.java | 201 +++++++-------- .../io/bitsquare/p2p/network/Connection.java | 95 ++++---- .../io/bitsquare/p2p/network/NetworkNode.java | 113 +++++---- .../java/io/bitsquare/p2p/network/Server.java | 4 +- .../bitsquare/p2p/network/SetupListener.java | 1 - .../p2p/peers/AuthenticationHandshake.java | 228 ++++++++++-------- .../p2p/peers/AuthenticationListener.java | 1 + .../io/bitsquare/p2p/peers/PeerGroup.java | 161 +++++++------ .../ProtectedExpirableDataStorage.java | 84 +++---- .../java/io/bitsquare/p2p/P2PServiceTest.java | 2 +- 13 files changed, 467 insertions(+), 439 deletions(-) diff --git a/common/src/main/java/io/bitsquare/common/crypto/PubKeyRing.java b/common/src/main/java/io/bitsquare/common/crypto/PubKeyRing.java index 61740ca5c2..40529b40cc 100644 --- a/common/src/main/java/io/bitsquare/common/crypto/PubKeyRing.java +++ b/common/src/main/java/io/bitsquare/common/crypto/PubKeyRing.java @@ -99,8 +99,8 @@ public class PubKeyRing implements Serializable { @Override public String toString() { return "PubKeyRing{" + - "\n\nsignaturePubKey=\n" + Util.pubKeyToString(getSignaturePubKey()) + - "\n\nencryptionPubKey=\n" + Util.pubKeyToString(getEncryptionPubKey()) + + "\n\nsignaturePubKey.hashCode()=\n" + signaturePubKey.hashCode() + + "\n\nencryptionPubKey.hashCode()=\n" + encryptionPubKey.hashCode() + '}'; } diff --git a/core/src/main/java/io/bitsquare/btc/TradeWalletService.java b/core/src/main/java/io/bitsquare/btc/TradeWalletService.java index 6773842008..9663ea169e 100644 --- a/core/src/main/java/io/bitsquare/btc/TradeWalletService.java +++ b/core/src/main/java/io/bitsquare/btc/TradeWalletService.java @@ -818,6 +818,7 @@ public class TradeWalletService { // We need to recreate the transaction otherwise we get a null pointer... Transaction result = new Transaction(params, transaction.bitcoinSerialize()); + result.getConfidence(Context.get()).setSource(TransactionConfidence.Source.SELF); log.trace("transaction " + result.toString()); if (wallet != null) @@ -835,6 +836,7 @@ public class TradeWalletService { // We need to recreate the tx otherwise we get a null pointer... Transaction transaction = new Transaction(params, serializedTransaction); + transaction.getConfidence(Context.get()).setSource(TransactionConfidence.Source.NETWORK); log.trace("transaction " + transaction.toString()); if (wallet != null) @@ -941,9 +943,9 @@ public class TradeWalletService { transaction.addInput(p2SHMultiSigOutput); transaction.addOutput(buyerPayoutAmount, new Address(params, buyerAddressString)); transaction.addOutput(sellerPayoutAmount, new Address(params, sellerAddressString)); - transaction.setLockTime(lockTime); // When using lockTime we need to set sequenceNumber to 0 transaction.getInputs().stream().forEach(i -> i.setSequenceNumber(0)); + transaction.setLockTime(lockTime); return transaction; } diff --git a/core/src/main/java/io/bitsquare/trade/protocol/trade/tasks/buyer/CreateAndSignDepositTxAsBuyer.java b/core/src/main/java/io/bitsquare/trade/protocol/trade/tasks/buyer/CreateAndSignDepositTxAsBuyer.java index 64424c61c2..b8e05d33bb 100644 --- a/core/src/main/java/io/bitsquare/trade/protocol/trade/tasks/buyer/CreateAndSignDepositTxAsBuyer.java +++ b/core/src/main/java/io/bitsquare/trade/protocol/trade/tasks/buyer/CreateAndSignDepositTxAsBuyer.java @@ -44,10 +44,10 @@ public class CreateAndSignDepositTxAsBuyer extends TradeTask { Coin buyerInputAmount = FeePolicy.SECURITY_DEPOSIT.add(FeePolicy.TX_FEE); Coin msOutputAmount = buyerInputAmount.add(FeePolicy.SECURITY_DEPOSIT).add(trade.getTradeAmount()); - log.debug("getContractAsJson"); - log.debug("----------"); - log.debug(trade.getContractAsJson()); - log.debug("----------"); + log.info("\n\n------------------------------------------------------------\n" + + "Contract as json\n" + + trade.getContractAsJson() + + "\n------------------------------------------------------------\n"); byte[] contractHash = Hash.getHash(trade.getContractAsJson()); trade.setContractHash(contractHash); diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 16d1182267..c3c092f3ab 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -49,7 +49,7 @@ import static com.google.common.base.Preconditions.checkNotNull; /** * Represents our node in the P2P network */ -public class P2PService implements SetupListener { +public class P2PService implements SetupListener, MessageListener, ConnectionListener, PeerListener { private static final Logger log = LoggerFactory.getLogger(P2PService.class); private final SeedNodesRepository seedNodesRepository; @@ -126,94 +126,9 @@ public class P2PService implements SetupListener { dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir); - networkNode.addConnectionListener(new ConnectionListener() { - @Override - public void onConnection(Connection connection) { - Log.traceCall(); - } - - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - Log.traceCall(); - checkArgument(peerAddress.equals(connection.getPeerAddress()), - "peerAddress must match connection.getPeerAddress()"); - authenticatedPeerAddresses.add(peerAddress); - authenticated.set(true); - - dataStorage.setAuthenticated(); - p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated()); - } - - @Override - public void onDisconnect(Reason reason, Connection connection) { - Log.traceCall(); - if (connection.isAuthenticated()) - authenticatedPeerAddresses.remove(connection.getPeerAddress()); - } - - @Override - public void onError(Throwable throwable) { - Log.traceCall(); - log.error("onError self/ConnectionException " + networkNode.getAddress() + "/" + throwable); - } - }); - - networkNode.addMessageListener((message, connection) -> { - Log.traceCall(); - if (message instanceof GetDataRequest) { - log.trace("Received GetDataSetMessage: " + message); - networkNode.sendMessage(connection, new GetDataResponse(getDataSet())); - } else if (message instanceof GetDataResponse) { - GetDataResponse getDataResponse = (GetDataResponse) message; - HashSet set = getDataResponse.set; - if (!set.isEmpty()) { - // we keep that connection open as the bootstrapping peer will use that for the authentication - // as we are not authenticated yet the data adding will not be broadcasted - set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); - } else { - log.trace("Received DataSetMessage: Empty data set"); - } - setRequestingDataCompleted(); - } else if (message instanceof SealedAndSignedMessage) { - if (encryptionService != null) { - try { - SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message; - DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify( - sealedAndSignedMessage.sealedAndSigned); - decryptedMailListeners.stream().forEach( - e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress())); - } catch (CryptoException e) { - log.info("Decryption of SealedAndSignedMessage failed. " + - "That is expected if the message is not intended for us."); - } - } - } - }); - - peerGroup.addPeerListener(new PeerListener() { - @Override - public void onFirstAuthenticatePeer(Peer peer) { - Log.traceCall(); - log.trace("onFirstAuthenticatePeer " + peer); - sendGetAllDataMessageAfterAuthentication(peer); - - } - - @Override - public void onPeerAdded(Peer peer) { - Log.traceCall(); - } - - @Override - public void onPeerRemoved(Address address) { - Log.traceCall(); - } - - @Override - public void onConnectionAuthenticated(Connection connection) { - Log.traceCall(); - } - }); + networkNode.addConnectionListener(this); + networkNode.addMessageListener(this); + peerGroup.addPeerListener(this); dataStorage.addHashMapChangedListener(new HashMapChangedListener() { @Override @@ -244,6 +159,104 @@ public class P2PService implements SetupListener { }); } + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { + Log.traceCall(); + if (message instanceof GetDataRequest) { + log.info("Received GetDataSetMessage: " + message); + networkNode.sendMessage(connection, new GetDataResponse(getDataSet())); + } else if (message instanceof GetDataResponse) { + GetDataResponse getDataResponse = (GetDataResponse) message; + log.info("Received GetDataResponse: " + message); + HashSet set = getDataResponse.set; + if (!set.isEmpty()) { + // we keep that connection open as the bootstrapping peer will use that for the authentication + // as we are not authenticated yet the data adding will not be broadcasted + set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress())); + } else { + log.trace("Received DataSetMessage: Empty data set"); + } + setRequestingDataCompleted(); + } else if (message instanceof SealedAndSignedMessage) { + if (encryptionService != null) { + try { + SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message; + DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify( + sealedAndSignedMessage.sealedAndSigned); + log.info("Received SealedAndSignedMessage and decrypted it: " + decryptedMsgWithPubKey); + decryptedMailListeners.stream().forEach( + e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress())); + } catch (CryptoException e) { + log.info("Decryption of SealedAndSignedMessage failed. " + + "That is expected if the message is not intended for us."); + } + } + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // ConnectionListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onConnection(Connection connection) { + } + + @Override + public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + Log.traceCall(); + checkArgument(peerAddress.equals(connection.getPeerAddress()), + "peerAddress must match connection.getPeerAddress()"); + authenticatedPeerAddresses.add(peerAddress); + authenticated.set(true); + + p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated()); + } + + @Override + public void onDisconnect(Reason reason, Connection connection) { + Log.traceCall(); + if (connection.isAuthenticated()) + authenticatedPeerAddresses.remove(connection.getPeerAddress()); + } + + @Override + public void onError(Throwable throwable) { + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // PeerListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onFirstAuthenticatePeer(Peer peer) { + Log.traceCall(); + log.trace("onFirstAuthenticatePeer " + peer); + sendGetAllDataMessageAfterAuthentication(peer); + + } + + @Override + public void onPeerAdded(Peer peer) { + Log.traceCall(); + } + + @Override + public void onPeerRemoved(Address address) { + Log.traceCall(); + } + + @Override + public void onConnectionAuthenticated(Connection connection) { + Log.traceCall(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // SetupListener implementation @@ -579,16 +592,6 @@ public class P2PService implements SetupListener { // Listeners /////////////////////////////////////////////////////////////////////////////////////////// - public void addMessageListener(MessageListener messageListener) { - Log.traceCall(); - networkNode.addMessageListener(messageListener); - } - - public void removeMessageListener(MessageListener messageListener) { - Log.traceCall(); - networkNode.removeMessageListener(messageListener); - } - public void addDecryptedMailListener(DecryptedMailListener listener) { Log.traceCall(); decryptedMailListeners.add(listener); diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 65825fd1b5..57885633b7 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit; * All handlers are called on User thread. * Shared data between InputHandler thread and that */ -public class Connection { +public class Connection implements MessageListener { private static final Logger log = LoggerFactory.getLogger(Connection.class); private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data private static final int MAX_ILLEGAL_REQUESTS = 5; @@ -40,11 +40,16 @@ public class Connection { private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min. private InputHandler inputHandler; private volatile boolean isAuthenticated; + private String connectionId; public static int getMaxMsgSize() { return MAX_MSG_SIZE; } + private final Socket socket; + private final MessageListener messageListener; + private final ConnectionListener connectionListener; + private final String portInfo; private final String uid; private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); @@ -72,19 +77,23 @@ public class Connection { /////////////////////////////////////////////////////////////////////////////////////////// public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { + this.socket = socket; + this.messageListener = messageListener; + this.connectionListener = connectionListener; + Log.traceCall(); uid = UUID.randomUUID().toString(); if (socket.getLocalPort() == 0) portInfo = "port=" + socket.getPort(); else portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort(); - - init(socket, messageListener, connectionListener); + + init(); } - private void init(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) { + private void init() { Log.traceCall(); - sharedSpace = new SharedSpace(this, socket, messageListener, connectionListener, useCompression); + sharedSpace = new SharedSpace(this, socket); try { socket.setSoTimeout(SOCKET_TIMEOUT); // Need to access first the ObjectOutputStream otherwise the ObjectInputStream would block @@ -97,7 +106,7 @@ public class Connection { // We create a thread for handling inputStream data - inputHandler = new InputHandler(sharedSpace, objectInputStream, portInfo); + inputHandler = new InputHandler(sharedSpace, objectInputStream, portInfo, this, useCompression); singleThreadExecutor.submit(inputHandler); } catch (IOException e) { sharedSpace.handleConnectionException(e); @@ -117,11 +126,10 @@ public class Connection { // Called form UserThread public void setAuthenticated(Address peerAddress, Connection connection) { Log.traceCall(); - synchronized (peerAddress) { - this.peerAddress = peerAddress; - } + this.peerAddress = peerAddress; isAuthenticated = true; - sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection); + if (!stopped) + connectionListener.onPeerAddressAuthenticated(peerAddress, connection); } // Called form various threads @@ -178,6 +186,18 @@ public class Connection { this.peerAddress = peerAddress; } + + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + // Only get non - CloseConnectionMessage messages + @Override + public void onMessage(Message message, Connection connection) { + // connection is null as we get called from InputHandler, which does not hold a reference to Connection + UserThread.execute(() -> messageListener.onMessage(message, this)); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Getters /////////////////////////////////////////////////////////////////////////////////////////// @@ -231,14 +251,14 @@ public class Connection { private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) { Log.traceCall(this.toString()); if (!stopped) { - log.info("\n\n############################################################\n" + + log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "ShutDown connection:" + "\npeerAddress=" + peerAddress + "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort() + "/" + sharedSpace.getSocket().getPort() + "\nobjectId=" + objectId + " / uid=" + uid + "\nisAuthenticated=" + isAuthenticated - + "\n############################################################\n"); + + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); log.trace("ShutDown connection requested. Connection=" + this.toString()); @@ -275,7 +295,7 @@ public class Connection { shutDownReason = ConnectionListener.Reason.SHUT_DOWN; final ConnectionListener.Reason finalShutDownReason = shutDownReason; // keep UserThread.execute as its not clear if that is called from a non-UserThread - UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(finalShutDownReason, this)); + UserThread.execute(() -> connectionListener.onDisconnect(finalShutDownReason, this)); try { sharedSpace.getSocket().close(); @@ -289,6 +309,7 @@ public class Connection { log.debug("Connection shutdown complete " + this.toString()); // keep UserThread.execute as its not clear if that is called from a non-UserThread + if (shutDownCompleteHandler != null) UserThread.execute(shutDownCompleteHandler); } @@ -330,6 +351,11 @@ public class Connection { '}'; } + public String getConnectionId() { + return connectionId; + } + + /////////////////////////////////////////////////////////////////////////////////////////// // SharedSpace /////////////////////////////////////////////////////////////////////////////////////////// @@ -343,9 +369,6 @@ public class Connection { private final Connection connection; private final Socket socket; - private final MessageListener messageListener; - private final ConnectionListener connectionListener; - private final boolean useCompression; private final ConcurrentHashMap illegalRequests = new ConcurrentHashMap<>(); // mutable @@ -353,14 +376,10 @@ public class Connection { private volatile boolean stopped; private ConnectionListener.Reason shutDownReason; - public SharedSpace(Connection connection, Socket socket, MessageListener messageListener, - ConnectionListener connectionListener, boolean useCompression) { + public SharedSpace(Connection connection, Socket socket) { Log.traceCall(); this.connection = connection; this.socket = socket; - this.messageListener = messageListener; - this.connectionListener = connectionListener; - this.useCompression = useCompression; } public synchronized void updateLastActivityDate() { @@ -387,7 +406,7 @@ public class Connection { public void handleConnectionException(Exception e) { Log.traceCall(e.toString()); - log.warn("Exception might be expected: " + e.toString()); + log.debug("Exception might be expected: " + e.toString()); if (e instanceof SocketException) { if (socket.isClosed()) shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED; @@ -409,34 +428,18 @@ public class Connection { } } - public void onMessage(Message message) { - //Log.traceCall(); - UserThread.execute(() -> messageListener.onMessage(message, connection)); - } - - public boolean useCompression() { - //Log.traceCall(); - return useCompression; - } - public void shutDown(boolean sendCloseConnectionMessage) { Log.traceCall(); connection.shutDown(sendCloseConnectionMessage); } - public synchronized ConnectionListener getConnectionListener() { - // Log.traceCall(); - return connectionListener; - } public synchronized Socket getSocket() { - //Log.traceCall(); return socket; } public String getConnectionId() { - //Log.traceCall(); - return connection.objectId; + return connection.getConnectionId(); } public void stop() { @@ -445,7 +448,6 @@ public class Connection { } public synchronized ConnectionListener.Reason getShutDownReason() { - //Log.traceCall(); return shutDownReason; } @@ -453,12 +455,10 @@ public class Connection { public String toString() { return "SharedSpace{" + ", socket=" + socket + - ", useCompression=" + useCompression + ", illegalRequests=" + illegalRequests + ", lastActivityDate=" + lastActivityDate + '}'; } - } @@ -473,13 +473,18 @@ public class Connection { private final SharedSpace sharedSpace; private final ObjectInputStream objectInputStream; private final String portInfo; + private final MessageListener messageListener; + private final boolean useCompression; + private volatile boolean stopped; - public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo) { + public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo, MessageListener messageListener, boolean useCompression) { + this.useCompression = useCompression; Log.traceCall(); this.sharedSpace = sharedSpace; this.objectInputStream = objectInputStream; this.portInfo = portInfo; + this.messageListener = messageListener; } public void stop() { @@ -502,7 +507,7 @@ public class Connection { int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; if (size <= getMaxMsgSize()) { Serializable serializable = null; - if (sharedSpace.useCompression()) { + if (useCompression) { if (rawInputObject instanceof byte[]) { byte[] compressedObjectAsBytes = (byte[]) rawInputObject; size = compressedObjectAsBytes.length; @@ -529,7 +534,7 @@ public class Connection { stopped = true; sharedSpace.shutDown(false); } else if (!stopped) { - sharedSpace.onMessage(message); + messageListener.onMessage(message, null); } } else { sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType); diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index f4345c75ce..2bf57b3a29 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -34,8 +34,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener protected final CopyOnWriteArraySet setupListeners = new CopyOnWriteArraySet<>(); protected ListeningExecutorService executorService; private Server server; - private volatile boolean shutDownInProgress; + private ConnectionListener startServerConnectionListener; + private volatile boolean shutDownInProgress; // accessed from different threads private final CopyOnWriteArraySet outBoundConnections = new CopyOnWriteArraySet<>(); @@ -97,12 +98,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener newConnection.setPeerAddress(peerAddress); outBoundConnections.add(newConnection); - log.info("\n\n############################################################\n" + + log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "NetworkNode created new outbound connection:" + "\npeerAddress=" + peerAddress + "\nconnection.uid=" + newConnection.getUid() + "\nmessage=" + message - + "\n############################################################\n"); + + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); newConnection.sendMessage(message); return newConnection; // can take a while when using tor @@ -196,19 +197,9 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// - // ConnectionListener + // ConnectionListener implementation /////////////////////////////////////////////////////////////////////////////////////////// - public void addConnectionListener(ConnectionListener connectionListener) { - Log.traceCall(); - connectionListeners.add(connectionListener); - } - - public void removeConnectionListener(ConnectionListener connectionListener) { - Log.traceCall(); - connectionListeners.remove(connectionListener); - } - @Override public void onConnection(Connection connection) { Log.traceCall(); @@ -241,9 +232,30 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// - // MessageListener + // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// + @Override + public void onMessage(Message message, Connection connection) { + Log.traceCall(); + messageListeners.stream().forEach(e -> e.onMessage(message, connection)); + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Listeners + /////////////////////////////////////////////////////////////////////////////////////////// + + public void addConnectionListener(ConnectionListener connectionListener) { + Log.traceCall(); + connectionListeners.add(connectionListener); + } + + public void removeConnectionListener(ConnectionListener connectionListener) { + Log.traceCall(); + connectionListeners.remove(connectionListener); + } + public void addMessageListener(MessageListener messageListener) { Log.traceCall(); messageListeners.add(messageListener); @@ -254,12 +266,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener messageListeners.remove(messageListener); } - @Override - public void onMessage(Message message, Connection connection) { - Log.traceCall(); - messageListeners.stream().forEach(e -> e.onMessage(message, connection)); - } - /////////////////////////////////////////////////////////////////////////////////////////// // Protected @@ -272,39 +278,40 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener protected void startServer(ServerSocket serverSocket) { Log.traceCall(); + startServerConnectionListener = new ConnectionListener() { + @Override + public void onConnection(Connection connection) { + Log.traceCall(); + // we still have not authenticated so put it to the temp list + inBoundConnections.add(connection); + NetworkNode.this.onConnection(connection); + } + + @Override + public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + Log.traceCall(); + NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); + } + + @Override + public void onDisconnect(Reason reason, Connection connection) { + Log.traceCall(); + Address peerAddress = connection.getPeerAddress(); + log.trace("onDisconnect at incoming connection to peerAddress (or connection) " + + ((peerAddress == null) ? connection : peerAddress)); + inBoundConnections.remove(connection); + NetworkNode.this.onDisconnect(reason, connection); + } + + @Override + public void onError(Throwable throwable) { + Log.traceCall(); + NetworkNode.this.onError(throwable); + } + }; server = new Server(serverSocket, - (message, connection) -> NetworkNode.this.onMessage(message, connection), - new ConnectionListener() { - @Override - public void onConnection(Connection connection) { - Log.traceCall(); - // we still have not authenticated so put it to the temp list - inBoundConnections.add(connection); - NetworkNode.this.onConnection(connection); - } - - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - Log.traceCall(); - NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection); - } - - @Override - public void onDisconnect(Reason reason, Connection connection) { - Log.traceCall(); - Address peerAddress = connection.getPeerAddress(); - log.trace("onDisconnect at incoming connection to peerAddress (or connection) " - + ((peerAddress == null) ? connection : peerAddress)); - inBoundConnections.remove(connection); - NetworkNode.this.onDisconnect(reason, connection); - } - - @Override - public void onError(Throwable throwable) { - Log.traceCall(); - NetworkNode.this.onError(throwable); - } - }); + NetworkNode.this, + startServerConnectionListener); executorService.submit(server); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Server.java b/network/src/main/java/io/bitsquare/p2p/network/Server.java index 73acf79da1..eeddf8b947 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Server.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Server.java @@ -45,12 +45,12 @@ class Server implements Runnable { log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort()); Connection connection = new Connection(socket, messageListener, connectionListener); - log.info("\n\n############################################################\n" + + log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "Server created new inbound connection:" + "\nlocalPort/port=" + serverSocket.getLocalPort() + "/" + socket.getPort() + "\nconnection.uid=" + connection.getUid() - + "\n############################################################\n"); + + "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n"); if (!stopped) connections.add(connection); diff --git a/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java b/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java index d46b4a6c6b..bb4bc5ef17 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java +++ b/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java @@ -1,6 +1,5 @@ package io.bitsquare.p2p.network; - public interface SetupListener { void onTorNodeReady(); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java index b5192d773f..aacf7ba6bd 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationHandshake.java @@ -7,6 +7,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.bitsquare.app.Log; import io.bitsquare.common.util.Tuple2; import io.bitsquare.p2p.Address; +import io.bitsquare.p2p.Message; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.MessageListener; import io.bitsquare.p2p.network.NetworkNode; @@ -28,7 +29,7 @@ import java.util.*; // node1: authentication to node2 done if nonce ok // node1 -> node2 PeersMessage -public class AuthenticationHandshake { +public class AuthenticationHandshake implements MessageListener { private static final Logger log = LoggerFactory.getLogger(AuthenticationHandshake.class); private final NetworkNode networkNode; @@ -39,7 +40,11 @@ public class AuthenticationHandshake { private long startAuthTs; private long nonce = 0; private boolean stopped; - private MessageListener messageListener; + + + /////////////////////////////////////////////////////////////////////////////////////////// + // Constructor + /////////////////////////////////////////////////////////////////////////////////////////// public AuthenticationHandshake(NetworkNode networkNode, PeerGroup peerGroup, Address myAddress) { Log.traceCall(); @@ -47,26 +52,105 @@ public class AuthenticationHandshake { this.peerGroup = peerGroup; this.myAddress = myAddress; - setupMessageListener(); + networkNode.addMessageListener(this); } - private void onFault(@NotNull Throwable throwable) { + + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { Log.traceCall(); - cleanup(); - resultFuture.setException(throwable); + if (message instanceof AuthenticationMessage) { + if (message instanceof AuthenticationResponse) { + // Requesting peer + AuthenticationResponse authenticationResponse = (AuthenticationResponse) message; + Address peerAddress = authenticationResponse.address; + log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress); + boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce; + if (verified) { + connection.setPeerAddress(peerAddress); + SettableFuture future = networkNode.sendMessage(peerAddress, + new GetPeersAuthRequest(myAddress, authenticationResponse.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses()))); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("GetPeersMessage sent successfully from " + myAddress + " to " + peerAddress); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("GetPeersMessage sending failed " + throwable.getMessage()); + onFault(throwable); + } + }); + } else { + log.warn("verify nonce failed. challengeMessage=" + authenticationResponse + " / nonce=" + nonce); + onFault(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce)); + } + } else if (message instanceof GetPeersAuthRequest) { + // Responding peer + GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message; + Address peerAddress = getPeersAuthRequest.address; + log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress); + boolean verified = nonce != 0 && nonce == getPeersAuthRequest.challengerNonce; + if (verified) { + // we add the reported peers to our own set + HashSet
peerAddresses = getPeersAuthRequest.peerAddresses; + log.trace("Received peers: " + peerAddresses); + peerGroup.addToReportedPeers(peerAddresses, connection); + + SettableFuture future = networkNode.sendMessage(peerAddress, + new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses()))); + log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress + + " with allPeers=" + peerGroup.getAllPeerAddresses()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("PeersMessage sent successfully from " + myAddress + " to " + peerAddress); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("PeersMessage sending failed " + throwable.getMessage()); + onFault(throwable); + } + }); + + log.info("AuthenticationComplete: Peer with address " + peerAddress + + " authenticated (" + connection.objectId + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms."); + + onSuccess(connection); + } else { + log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce); + onFault(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce)); + } + } else if (message instanceof GetPeersAuthResponse) { + // Requesting peer + GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message; + Address peerAddress = getPeersAuthResponse.address; + log.trace("PeersMessage from " + peerAddress + " at " + myAddress); + HashSet
peerAddresses = getPeersAuthResponse.peerAddresses; + log.trace("Received peers: " + peerAddresses); + peerGroup.addToReportedPeers(peerAddresses, connection); + + // we wait until the handshake is completed before setting the authenticate flag + // authentication at both sides of the connection + log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress + + " authenticated (" + connection.objectId + "). Took " + + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); + + onSuccess(connection); + } + } } - private void onSuccess(Connection connection) { - Log.traceCall(); - cleanup(); - resultFuture.set(connection); - } - - private void cleanup() { - Log.traceCall(); - stopped = true; - networkNode.removeMessageListener(messageListener); - } + /////////////////////////////////////////////////////////////////////////////////////////// + // API + /////////////////////////////////////////////////////////////////////////////////////////// public SettableFuture requestAuthenticationToPeer(Address peerAddress) { Log.traceCall(); @@ -178,97 +262,10 @@ public class AuthenticationHandshake { return resultFuture; } - private void setupMessageListener() { - Log.traceCall(); - messageListener = (message, connection) -> { - Log.traceCall(); - if (message instanceof AuthenticationMessage) { - if (message instanceof AuthenticationResponse) { - // Requesting peer - AuthenticationResponse authenticationResponse = (AuthenticationResponse) message; - Address peerAddress = authenticationResponse.address; - log.trace("ChallengeMessage from " + peerAddress + " at " + myAddress); - boolean verified = nonce != 0 && nonce == authenticationResponse.requesterNonce; - if (verified) { - connection.setPeerAddress(peerAddress); - SettableFuture future = networkNode.sendMessage(peerAddress, - new GetPeersAuthRequest(myAddress, authenticationResponse.challengerNonce, new HashSet<>(peerGroup.getAllPeerAddresses()))); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("GetPeersMessage sent successfully from " + myAddress + " to " + peerAddress); - } - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("GetPeersMessage sending failed " + throwable.getMessage()); - onFault(throwable); - } - }); - } else { - log.warn("verify nonce failed. challengeMessage=" + authenticationResponse + " / nonce=" + nonce); - onFault(new Exception("Verify nonce failed. challengeMessage=" + authenticationResponse + " / nonceMap=" + nonce)); - } - } else if (message instanceof GetPeersAuthRequest) { - // Responding peer - GetPeersAuthRequest getPeersAuthRequest = (GetPeersAuthRequest) message; - Address peerAddress = getPeersAuthRequest.address; - log.trace("GetPeersMessage from " + peerAddress + " at " + myAddress); - boolean verified = nonce != 0 && nonce == getPeersAuthRequest.challengerNonce; - if (verified) { - // we add the reported peers to our own set - HashSet
peerAddresses = getPeersAuthRequest.peerAddresses; - log.trace("Received peers: " + peerAddresses); - peerGroup.addToReportedPeers(peerAddresses, connection); - - SettableFuture future = networkNode.sendMessage(peerAddress, - new GetPeersAuthResponse(myAddress, new HashSet<>(peerGroup.getAllPeerAddresses()))); - log.trace("sent PeersMessage to " + peerAddress + " from " + myAddress - + " with allPeers=" + peerGroup.getAllPeerAddresses()); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("PeersMessage sent successfully from " + myAddress + " to " + peerAddress); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("PeersMessage sending failed " + throwable.getMessage()); - onFault(throwable); - } - }); - - log.info("\n\nAuthenticationComplete: Peer with address " + peerAddress - + " authenticated (" + connection.objectId + "). Took " - + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); - - onSuccess(connection); - } else { - log.warn("verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce); - onFault(new Exception("Verify nonce failed. getPeersMessage=" + getPeersAuthRequest + " / nonce=" + nonce)); - } - } else if (message instanceof GetPeersAuthResponse) { - // Requesting peer - GetPeersAuthResponse getPeersAuthResponse = (GetPeersAuthResponse) message; - Address peerAddress = getPeersAuthResponse.address; - log.trace("PeersMessage from " + peerAddress + " at " + myAddress); - HashSet
peerAddresses = getPeersAuthResponse.peerAddresses; - log.trace("Received peers: " + peerAddresses); - peerGroup.addToReportedPeers(peerAddresses, connection); - - // we wait until the handshake is completed before setting the authenticate flag - // authentication at both sides of the connection - log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress - + " authenticated (" + connection.objectId + "). Took " - + (System.currentTimeMillis() - startAuthTs) + " ms. \n\n"); - - onSuccess(connection); - } - } - }; - - networkNode.addMessageListener(messageListener); - } + /////////////////////////////////////////////////////////////////////////////////////////// + // Private + /////////////////////////////////////////////////////////////////////////////////////////// private void authenticateToNextRandomPeer(Set
remainingAddresses) { Log.traceCall(); @@ -303,4 +300,21 @@ public class AuthenticationHandshake { return nonce; } + private void onFault(@NotNull Throwable throwable) { + Log.traceCall(); + cleanup(); + resultFuture.setException(throwable); + } + + private void onSuccess(Connection connection) { + Log.traceCall(); + cleanup(); + resultFuture.set(connection); + } + + private void cleanup() { + Log.traceCall(); + stopped = true; + networkNode.removeMessageListener(this); + } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java index 6894208476..e66c96cc3e 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/AuthenticationListener.java @@ -3,6 +3,7 @@ package io.bitsquare.p2p.peers; import io.bitsquare.p2p.Address; import io.bitsquare.p2p.network.Connection; +//TODO used only in unittests yet public abstract class AuthenticationListener implements PeerListener { public void onFirstAuthenticatePeer(Peer peer) { } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java index af868d7688..2dc4334679 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerGroup.java @@ -8,6 +8,7 @@ import io.bitsquare.common.UserThread; import io.bitsquare.common.util.Tuple2; import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; +import io.bitsquare.p2p.Message; import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.ConnectionListener; import io.bitsquare.p2p.network.MessageListener; @@ -27,7 +28,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; // Run in UserThread -public class PeerGroup { +public class PeerGroup implements MessageListener, ConnectionListener { private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); static int simulateAuthTorNode = 0; @@ -70,49 +71,53 @@ public class PeerGroup { this.networkNode = networkNode; this.seedNodeAddresses = seeds; - init(networkNode); - } - - private void init(NetworkNode networkNode) { - Log.traceCall(); - networkNode.addMessageListener((message, connection) -> { - if (message instanceof MaintenanceMessage) - processMaintenanceMessage((MaintenanceMessage) message, connection); - else if (message instanceof AuthenticationRequest) { - processAuthenticationRequest(networkNode, (AuthenticationRequest) message, connection); - } - }); - - networkNode.addConnectionListener(new ConnectionListener() { - @Override - public void onConnection(Connection connection) { - Log.traceCall(); - } - - @Override - public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { - Log.traceCall(); - } - - @Override - public void onDisconnect(Reason reason, Connection connection) { - Log.traceCall(); - log.debug("onDisconnect connection=" + connection + " / reason=" + reason); - // only removes authenticated nodes - if (connection.isAuthenticated()) - removePeer(connection.getPeerAddress()); - } - - @Override - public void onError(Throwable throwable) { - Log.traceCall(); - } - }); + networkNode.addMessageListener(this); + networkNode.addConnectionListener(this); setupMaintenanceTimer(); } + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { + Log.traceCall(); + if (message instanceof MaintenanceMessage) + processMaintenanceMessage((MaintenanceMessage) message, connection); + else if (message instanceof AuthenticationRequest) { + processAuthenticationRequest(networkNode, (AuthenticationRequest) message, connection); + } + } + + + /////////////////////////////////////////////////////////////////////////////////////////// + // ConnectionListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onConnection(Connection connection) { + } + + @Override + public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) { + } + + @Override + public void onDisconnect(Reason reason, Connection connection) { + log.debug("onDisconnect connection=" + connection + " / reason=" + reason); + // only removes authenticated nodes + if (connection.isAuthenticated()) + removePeer(connection.getPeerAddress()); + } + + @Override + public void onError(Throwable throwable) { + } + + /////////////////////////////////////////////////////////////////////////////////////////// // API /////////////////////////////////////////////////////////////////////////////////////////// @@ -122,6 +127,35 @@ public class PeerGroup { seedNodeAddresses.remove(mySeedNodeAddress); } + public void broadcast(BroadcastMessage message, @Nullable Address sender) { + Log.traceCall("Sender " + sender + ". Message " + message.toString()); + if (authenticatedPeers.values().size() > 0) { + log.info("Broadcast message to {} peers. Message:", authenticatedPeers.values().size(), message); + // TODO add randomized timing? + authenticatedPeers.values().stream() + .filter(e -> !e.address.equals(sender)) + .forEach(peer -> { + log.trace("Broadcast message from " + getMyAddress() + " to " + peer.address + "."); + SettableFuture future = networkNode.sendMessage(peer.address, message); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Connection connection) { + log.trace("Broadcast from " + getMyAddress() + " to " + peer.address + " succeeded."); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + log.info("Broadcast failed. " + throwable.getMessage()); + removePeer(peer.address); + } + }); + }); + } else { + log.trace("Message {} not broadcasted because we are not authenticated yet. " + + "That is expected at startup.", message); + } + } + public void shutDown() { Log.traceCall(); if (!shutDownInProgress) { @@ -131,32 +165,6 @@ public class PeerGroup { } } - public void broadcast(BroadcastMessage message, @Nullable Address sender) { - Log.traceCall(); - log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers."); - log.trace("message = " + message); - - // TODO add randomized timing? - authenticatedPeers.values().stream() - .filter(e -> !e.address.equals(sender)) - .forEach(peer -> { - log.trace("Broadcast message from " + getMyAddress() + " to " + peer.address + "."); - SettableFuture future = networkNode.sendMessage(peer.address, message); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Connection connection) { - log.trace("Broadcast from " + getMyAddress() + " to " + peer.address + " succeeded."); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - log.info("Broadcast failed. " + throwable.getMessage()); - removePeer(peer.address); - } - }); - }); - } - /////////////////////////////////////////////////////////////////////////////////////////// // Authentication to seed node @@ -536,16 +544,6 @@ public class PeerGroup { // Listeners /////////////////////////////////////////////////////////////////////////////////////////// - public void addMessageListener(MessageListener messageListener) { - Log.traceCall(); - networkNode.addMessageListener(messageListener); - } - - public void removeMessageListener(MessageListener messageListener) { - Log.traceCall(); - networkNode.removeMessageListener(messageListener); - } - public void addPeerListener(PeerListener peerListener) { Log.traceCall(); peerListeners.add(peerListener); @@ -579,6 +577,10 @@ public class PeerGroup { return seedNodeAddresses; } + public NetworkNode getNetworkNode() { + return networkNode; + } + /////////////////////////////////////////////////////////////////////////////////////////// // Reported peers @@ -675,19 +677,20 @@ public class PeerGroup { public void printAuthenticatedPeers() { Log.traceCall(); - StringBuilder result = new StringBuilder("\n\n############################################################\n" + + StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + "Authenticated peers for node " + getMyAddress() + ":"); authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address)); - result.append("\n############################################################\n"); + result.append("\n------------------------------------------------------------\n"); log.info(result.toString()); } public void printReportedPeers() { Log.traceCall(); - StringBuilder result = new StringBuilder("\n\n############################################################\n" + + StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + "Reported peers for node " + getMyAddress() + ":"); reportedPeerAddresses.stream().forEach(e -> result.append("\n").append(e)); - result.append("\n############################################################\n"); + result.append("\n------------------------------------------------------------\n"); log.info(result.toString()); } + } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java index 1d9e5a744d..378f55c961 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/ProtectedExpirableDataStorage.java @@ -8,8 +8,11 @@ import io.bitsquare.common.crypto.Hash; import io.bitsquare.common.crypto.Sig; import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Address; +import io.bitsquare.p2p.Message; +import io.bitsquare.p2p.network.Connection; import io.bitsquare.p2p.network.IllegalRequest; import io.bitsquare.p2p.network.MessageListener; +import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.peers.PeerGroup; import io.bitsquare.p2p.storage.data.*; import io.bitsquare.p2p.storage.messages.*; @@ -29,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; // Run in UserThread -public class ProtectedExpirableDataStorage { +public class ProtectedExpirableDataStorage implements MessageListener { private static final Logger log = LoggerFactory.getLogger(ProtectedExpirableDataStorage.class); @VisibleForTesting @@ -40,7 +43,6 @@ public class ProtectedExpirableDataStorage { private final CopyOnWriteArraySet hashMapChangedListeners = new CopyOnWriteArraySet<>(); private ConcurrentHashMap sequenceNumberMap = new ConcurrentHashMap<>(); private final Storage storage; - private boolean authenticated; private final Timer timer = new Timer(); private volatile boolean shutDownInProgress; @@ -65,25 +67,8 @@ public class ProtectedExpirableDataStorage { sequenceNumberMap = persisted; } - addMessageListener((message, connection) -> { - Log.traceCall("onMessage: Message=" + message); - if (message instanceof DataMessage) { - if (connection.isAuthenticated()) { - log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection); - if (message instanceof AddDataMessage) { - add(((AddDataMessage) message).data, connection.getPeerAddress()); - } else if (message instanceof RemoveDataMessage) { - remove(((RemoveDataMessage) message).data, connection.getPeerAddress()); - } else if (message instanceof RemoveMailboxDataMessage) { - removeMailboxData(((RemoveMailboxDataMessage) message).data, connection.getPeerAddress()); - } - } else { - log.warn("Connection is not authenticated yet. We don't accept storage operations form non-authenticated nodes."); - log.warn("Connection = " + connection); - connection.reportIllegalRequest(IllegalRequest.NotAuthenticated); - } - } - }); + NetworkNode networkNode = peerGroup.getNetworkNode(); + networkNode.addMessageListener(this); timer.scheduleAtFixedRate(new TimerTask() { @Override @@ -107,6 +92,31 @@ public class ProtectedExpirableDataStorage { .forEach(entry -> map.remove(entry.getKey())); } + /////////////////////////////////////////////////////////////////////////////////////////// + // MessageListener implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void onMessage(Message message, Connection connection) { + Log.traceCall("Message=" + message); + if (message instanceof DataMessage) { + if (connection.isAuthenticated()) { + log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection); + if (message instanceof AddDataMessage) { + add(((AddDataMessage) message).data, connection.getPeerAddress()); + } else if (message instanceof RemoveDataMessage) { + remove(((RemoveDataMessage) message).data, connection.getPeerAddress()); + } else if (message instanceof RemoveMailboxDataMessage) { + removeMailboxData(((RemoveMailboxDataMessage) message).data, connection.getPeerAddress()); + } + } else { + log.warn("Connection is not authenticated yet. We don't accept storage operations form non-authenticated nodes."); + log.warn("Connection = " + connection); + connection.reportIllegalRequest(IllegalRequest.NotAuthenticated); + } + } + } + /////////////////////////////////////////////////////////////////////////////////////////// // API @@ -121,11 +131,6 @@ public class ProtectedExpirableDataStorage { } } - public void setAuthenticated() { - Log.traceCall(); - this.authenticated = true; - } - public boolean add(ProtectedData protectedData, @Nullable Address sender) { Log.traceCall(); BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload); @@ -143,10 +148,10 @@ public class ProtectedExpirableDataStorage { log.trace("Data added to our map and it will be broadcasted to our peers."); hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData)); - StringBuilder sb = new StringBuilder("\n\n############################################################\n"); + StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n"); sb.append("Data set after addProtectedExpirableData:"); map.values().stream().forEach(e -> sb.append("\n").append(e.toString()).append("\n")); - sb.append("\n############################################################\n"); + sb.append("\n------------------------------------------------------------\n"); log.info(sb.toString()); if (!containsKey) @@ -211,7 +216,6 @@ public class ProtectedExpirableDataStorage { } public Map getMap() { - //Log.traceCall(); return map; } @@ -252,11 +256,6 @@ public class ProtectedExpirableDataStorage { hashMapChangedListeners.add(hashMapChangedListener); } - private void addMessageListener(MessageListener messageListener) { - Log.traceCall(); - peerGroup.addMessageListener(messageListener); - } - /////////////////////////////////////////////////////////////////////////////////////////// // Private @@ -268,11 +267,11 @@ public class ProtectedExpirableDataStorage { log.trace("Data removed from our map. We broadcast the message to our peers."); hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData)); - StringBuilder sb = new StringBuilder("\n\n############################################################\n" + + StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n" + "Data set after removeProtectedExpirableData:"); map.values().stream().forEach(e -> sb.append("\n").append(e.toString())); - sb.append("\n############################################################\n"); - log.trace(sb.toString()); + sb.append("\n------------------------------------------------------------\n"); + log.info(sb.toString()); } private boolean isSequenceNrValid(ProtectedData data, BigInteger hashOfData) { @@ -352,17 +351,12 @@ public class ProtectedExpirableDataStorage { } private void broadcast(BroadcastMessage message, @Nullable Address sender) { - Log.traceCall(); - if (authenticated) { - peerGroup.broadcast(message, sender); - log.trace("Broadcast message " + message); - } else { - log.trace("Broadcast not allowed because we are not authenticated yet. That is normal after received AllDataMessage at startup."); - } + Log.traceCall(message.toString()); + peerGroup.broadcast(message, sender); } private BigInteger getHashAsBigInteger(ExpirablePayload payload) { - //Log.traceCall(); return new BigInteger(Hash.getHash(payload)); } + } diff --git a/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java b/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java index fe29e5b2aa..48340b8cfc 100644 --- a/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java +++ b/network/src/test/java/io/bitsquare/p2p/P2PServiceTest.java @@ -282,7 +282,7 @@ public class P2PServiceTest { // send to online peer CountDownLatch latch2 = new CountDownLatch(2); MockMailboxMessage mockMessage = new MockMailboxMessage("MockMailboxMessage", p2PService2.getAddress()); - p2PService2.addMessageListener((message, connection) -> { + p2PService2.getNetworkNode().addMessageListener((message, connection) -> { log.trace("message " + message); if (message instanceof SealedAndSignedMessage) { try {