From 991ab56a970537fd940e34caf4fe0acd1a7ca778 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Tue, 26 Jan 2016 20:56:16 +0100 Subject: [PATCH] Cleanup, apply code inspection suggestions --- .../bitsquare/crypto/EncryptionService.java | 4 - .../p2p/NetworkNotReadyException.java | 2 +- .../main/java/io/bitsquare/p2p/P2PModule.java | 3 - .../java/io/bitsquare/p2p/P2PService.java | 16 ++-- .../src/main/java/io/bitsquare/p2p/Utils.java | 9 +- .../io/bitsquare/p2p/network/Connection.java | 23 +++-- .../io/bitsquare/p2p/network/NetworkNode.java | 24 ++--- .../bitsquare/p2p/network/SetupListener.java | 1 + .../messages/CloseConnectionMessage.java | 2 - .../java/io/bitsquare/p2p/peers/Peer.java | 47 ---------- .../p2p/peers/PeerExchangeManager.java | 36 +++++--- .../io/bitsquare/p2p/peers/PeerManager.java | 91 ++++++++----------- .../p2p/peers/RequestDataManager.java | 23 +++-- .../peers/messages/peers/GetPeersRequest.java | 2 +- .../java/io/bitsquare/p2p/seed/SeedNode.java | 2 +- .../bitsquare/p2p/storage/P2PDataStorage.java | 2 +- .../p2p/storage/data/DataAndSeqNr.java | 25 +---- 17 files changed, 117 insertions(+), 195 deletions(-) delete mode 100644 network/src/main/java/io/bitsquare/p2p/peers/Peer.java diff --git a/network/src/main/java/io/bitsquare/crypto/EncryptionService.java b/network/src/main/java/io/bitsquare/crypto/EncryptionService.java index c53556ca97..f194b4e1aa 100644 --- a/network/src/main/java/io/bitsquare/crypto/EncryptionService.java +++ b/network/src/main/java/io/bitsquare/crypto/EncryptionService.java @@ -20,15 +20,11 @@ package io.bitsquare.crypto; import io.bitsquare.common.crypto.*; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.messaging.DecryptedMsgWithPubKey; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.inject.Inject; import java.security.KeyPair; public class EncryptionService { - private static final Logger log = LoggerFactory.getLogger(EncryptionService.class); - private final KeyRing keyRing; @Inject diff --git a/network/src/main/java/io/bitsquare/p2p/NetworkNotReadyException.java b/network/src/main/java/io/bitsquare/p2p/NetworkNotReadyException.java index 96758522c1..ae2d647def 100644 --- a/network/src/main/java/io/bitsquare/p2p/NetworkNotReadyException.java +++ b/network/src/main/java/io/bitsquare/p2p/NetworkNotReadyException.java @@ -1,6 +1,6 @@ package io.bitsquare.p2p; -public class NetworkNotReadyException extends RuntimeException { +class NetworkNotReadyException extends RuntimeException { public NetworkNotReadyException() { super("You must have bootstrapped before adding data to the P2P network."); diff --git a/network/src/main/java/io/bitsquare/p2p/P2PModule.java b/network/src/main/java/io/bitsquare/p2p/P2PModule.java index bb7aa66b2f..3818da82b1 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PModule.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PModule.java @@ -22,8 +22,6 @@ import com.google.inject.name.Names; import io.bitsquare.app.AppModule; import io.bitsquare.app.ProgramArguments; import io.bitsquare.p2p.seed.SeedNodesRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.core.env.Environment; import java.io.File; @@ -32,7 +30,6 @@ import static com.google.inject.name.Names.named; public class P2PModule extends AppModule { - private static final Logger log = LoggerFactory.getLogger(P2PModule.class); public P2PModule(Environment env) { super(env); diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 884331deff..346d8b105e 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -48,11 +48,11 @@ import static com.google.common.base.Preconditions.checkNotNull; public class P2PService implements SetupListener, MessageListener, ConnectionListener, RequestDataManager.Listener, HashMapChangedListener { private static final Logger log = LoggerFactory.getLogger(P2PService.class); - protected final SeedNodesRepository seedNodesRepository; - protected final int port; - protected final File torDir; - protected final Optional optionalEncryptionService; - protected final Optional optionalKeyRing; + private final SeedNodesRepository seedNodesRepository; + private final int port; + private final File torDir; + private final Optional optionalEncryptionService; + private final Optional optionalKeyRing; // set in init private NetworkNode networkNode; @@ -102,7 +102,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis init(useLocalhost, networkId, storageDir); } - protected void init(boolean useLocalhost, int networkId, File storageDir) { + private void init(boolean useLocalhost, int networkId, File storageDir) { Log.traceCall(); connectionNodeAddressListener = (observable, oldValue, newValue) -> { @@ -633,10 +633,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis decryptedMailboxListeners.add(listener); } - public void removeDecryptedMailboxListener(DecryptedMailboxListener listener) { - decryptedMailboxListeners.remove(listener); - } - public void addP2PServiceListener(P2PServiceListener listener) { p2pServiceListeners.add(listener); } diff --git a/network/src/main/java/io/bitsquare/p2p/Utils.java b/network/src/main/java/io/bitsquare/p2p/Utils.java index 6a7ac1c1dd..69d8b50675 100644 --- a/network/src/main/java/io/bitsquare/p2p/Utils.java +++ b/network/src/main/java/io/bitsquare/p2p/Utils.java @@ -1,8 +1,6 @@ package io.bitsquare.p2p; import io.bitsquare.common.ByteArrayUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -14,7 +12,6 @@ import java.util.zip.Deflater; import java.util.zip.Inflater; public class Utils { - private static final Logger log = LoggerFactory.getLogger(Utils.class); public static int findFreeSystemPort() { try { @@ -49,9 +46,9 @@ public class Utils { return bos.toByteArray(); } - private static byte[] decompress(byte[] compressedData, int offset, int length) { + private static byte[] decompress(byte[] compressedData, int length) { Inflater inflater = new Inflater(); - inflater.setInput(compressedData, offset, length); + inflater.setInput(compressedData, 0, length); ByteArrayOutputStream bos = new ByteArrayOutputStream(length); byte[] buf = new byte[8192]; while (!inflater.finished()) { @@ -73,7 +70,7 @@ public class Utils { } public static Serializable decompress(byte[] compressedData) { - return (Serializable) ByteArrayUtils.byteArrayToObject(decompress(compressedData, 0, compressedData.length)); + return (Serializable) ByteArrayUtils.byteArrayToObject(decompress(compressedData, compressedData.length)); } } diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 5b26977da9..c494643ed3 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -82,15 +82,15 @@ public class Connection implements MessageListener { // use GZIPInputStream but problems with blocking private final boolean useCompression = false; private PeerType peerType; - private ObjectProperty nodeAddressProperty = new SimpleObjectProperty<>(); + private final ObjectProperty nodeAddressProperty = new SimpleObjectProperty<>(); /////////////////////////////////////////////////////////////////////////////////////////// // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, - @Nullable NodeAddress peersNodeAddress) { + Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, + @Nullable NodeAddress peersNodeAddress) { Log.traceCall(); this.socket = socket; this.messageListener = messageListener; @@ -168,10 +168,11 @@ public class Connection implements MessageListener { } Object objectToWrite; + //noinspection ConstantConditions if (useCompression) { byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message); // log.trace("Write object uncompressed data size: " + messageAsBytes.length); - byte[] compressed = Utils.compress(message); + @SuppressWarnings("UnnecessaryLocalVariable") byte[] compressed = Utils.compress(message); //log.trace("Write object compressed data size: " + compressed.length); objectToWrite = compressed; } else { @@ -194,6 +195,7 @@ public class Connection implements MessageListener { } } + @SuppressWarnings("unused") public void reportIllegalRequest(IllegalRequest illegalRequest) { Log.traceCall(); sharedModel.reportIllegalRequest(illegalRequest); @@ -221,7 +223,7 @@ public class Connection implements MessageListener { this.peerType = peerType; } - public synchronized void setPeersNodeAddress(NodeAddress peerNodeAddress) { + private synchronized void setPeersNodeAddress(NodeAddress peerNodeAddress) { Log.traceCall(peerNodeAddress.toString()); checkNotNull(peerNodeAddress, "peerAddress must not be null"); peersNodeAddressOptional = Optional.of(peerNodeAddress); @@ -390,6 +392,7 @@ public class Connection implements MessageListener { '}'; } + @SuppressWarnings("unused") public String printDetails() { return "Connection{" + "peerAddress=" + peersNodeAddressOptional + @@ -458,7 +461,7 @@ public class Connection implements MessageListener { "illegalRequest={}\n" + "illegalRequests={}", violations, illegalRequest, illegalRequests.toString()); log.debug("connection={}" + this); - shutDown(false); + shutDown(); } else { illegalRequests.put(illegalRequest, ++violations); } @@ -486,14 +489,14 @@ public class Connection implements MessageListener { e.printStackTrace(); } - shutDown(false); + shutDown(); } - public void shutDown(boolean sendCloseConnectionMessage) { + public void shutDown() { Log.traceCall(); if (!stopped) { stopped = true; - connection.shutDown(sendCloseConnectionMessage); + connection.shutDown(false); } } @@ -617,7 +620,7 @@ public class Connection implements MessageListener { if (message instanceof CloseConnectionMessage) { log.info("CloseConnectionMessage received on connection {}", connection); stopped = true; - sharedModel.shutDown(false); + sharedModel.shutDown(); } else if (!stopped) { // First a seed node gets a message form a peer (PreliminaryDataRequest using // AnonymousMessage interface) which does not has its hidden service diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index b8c2435dda..250e39292a 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -19,7 +19,6 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; @@ -28,15 +27,13 @@ import static com.google.common.base.Preconditions.checkNotNull; public abstract class NetworkNode implements MessageListener, ConnectionListener { private static final Logger log = LoggerFactory.getLogger(NetworkNode.class); - private static final int CREATE_SOCKET_TIMEOUT = 10 * 1000; // 10 sec. - - protected final int servicePort; + final int servicePort; private final CopyOnWriteArraySet inBoundConnections = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet messageListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet connectionListeners = new CopyOnWriteArraySet<>(); - protected final CopyOnWriteArraySet setupListeners = new CopyOnWriteArraySet<>(); - protected ListeningExecutorService executorService; + final CopyOnWriteArraySet setupListeners = new CopyOnWriteArraySet<>(); + ListeningExecutorService executorService; private Server server; private volatile boolean shutDownInProgress; @@ -48,7 +45,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - public NetworkNode(int servicePort) { + NetworkNode(int servicePort) { Log.traceCall(); this.servicePort = servicePort; } @@ -57,11 +54,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener // API /////////////////////////////////////////////////////////////////////////////////////////// - public void start() { - Log.traceCall(); - start(null); - } - abstract public void start(@Nullable SetupListener setupListener); public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddress, Message message) { @@ -114,7 +106,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener outboundConnection.sendMessage(message); return outboundConnection; } catch (Throwable throwable) { - if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) { + if (!(throwable instanceof ConnectException || throwable instanceof IOException)) { throwable.printStackTrace(); log.error("Executing task failed. " + throwable.getMessage()); } @@ -203,7 +195,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener // SetupListener /////////////////////////////////////////////////////////////////////////////////////////// - public void addSetupListener(SetupListener setupListener) { + void addSetupListener(SetupListener setupListener) { Log.traceCall(); boolean isNewEntry = setupListeners.add(setupListener); if (!isNewEntry) @@ -287,12 +279,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener // Protected /////////////////////////////////////////////////////////////////////////////////////////// - protected void createExecutorService() { + void createExecutorService() { Log.traceCall(); executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 20, 50, 120L); } - protected void startServer(ServerSocket serverSocket) { + void startServer(ServerSocket serverSocket) { Log.traceCall(); ConnectionListener startServerConnectionListener = new ConnectionListener() { @Override diff --git a/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java b/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java index bb4bc5ef17..2275d76fa6 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java +++ b/network/src/main/java/io/bitsquare/p2p/network/SetupListener.java @@ -5,5 +5,6 @@ public interface SetupListener { void onHiddenServicePublished(); + @SuppressWarnings("unused") void onSetupFailed(Throwable throwable); } diff --git a/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java b/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java index b5a88d1dff..cf2852336a 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java +++ b/network/src/main/java/io/bitsquare/p2p/network/messages/CloseConnectionMessage.java @@ -2,14 +2,12 @@ package io.bitsquare.p2p.network.messages; import io.bitsquare.app.Version; import io.bitsquare.p2p.Message; -import io.bitsquare.p2p.NodeAddress; public final class CloseConnectionMessage implements Message { // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private final int networkId = Version.getNetworkId(); - public NodeAddress peerNodeAddress; public CloseConnectionMessage() { } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/Peer.java b/network/src/main/java/io/bitsquare/p2p/peers/Peer.java deleted file mode 100644 index fe0265844b..0000000000 --- a/network/src/main/java/io/bitsquare/p2p/peers/Peer.java +++ /dev/null @@ -1,47 +0,0 @@ -package io.bitsquare.p2p.peers; - -import io.bitsquare.p2p.NodeAddress; -import io.bitsquare.p2p.network.Connection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Random; - -public class Peer { - private static final Logger log = LoggerFactory.getLogger(Peer.class); - - public final Connection connection; - public final NodeAddress nodeAddress; - public final long pingNonce; - - public Peer(Connection connection, NodeAddress nodeAddress) { - this.connection = connection; - this.nodeAddress = nodeAddress; - - pingNonce = new Random().nextLong(); - } - - @Override - public int hashCode() { - return nodeAddress != null ? nodeAddress.hashCode() : 0; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Peer)) return false; - - Peer peer = (Peer) o; - - return !(nodeAddress != null ? !nodeAddress.equals(peer.nodeAddress) : peer.nodeAddress != null); - } - - @Override - public String toString() { - return "Peer{" + - "address=" + nodeAddress + - ", pingNonce=" + pingNonce + - ", connection=" + connection + - '}'; - } -} diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java index 59ff6fd6d7..5c0c25708c 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java @@ -174,8 +174,9 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener private void handleError(NodeAddress nodeAddress, List remainingNodeAddresses) { Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); + stopTimeoutTimer(); - //peerManager.removePeer(nodeAddress); + if (!remainingNodeAddresses.isEmpty()) { log.info("There are remaining nodes available for requesting peers. " + "We will try getReportedPeers again."); @@ -185,8 +186,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener "That is expected if no other node is online.\n" + "We will try to use reported peers (if no available we use persisted peers) " + "and try again to request peers from our seed nodes after a random pause."); - requestReportedPeersAfterDelayTimer = UserThread.runAfter(() -> - continueWithMorePeers(), + requestReportedPeersAfterDelayTimer = UserThread.runAfter(this::continueWithMorePeers, 10, TimeUnit.SECONDS); } } @@ -201,18 +201,14 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener Log.traceCall(); if (!peerManager.hasSufficientConnections()) { // We want to keep it sorted but avoid duplicates - List list = new ArrayList<>(peerManager.getNodeAddressesOfReportedPeers().stream() - .filter(e -> !networkNode.getNodeAddressesOfConfirmedConnections().contains(e)) - .collect(Collectors.toSet())); - list.addAll(peerManager.getNodeAddressesOfPersistedPeers().stream() - .filter(e -> !list.contains(e) && - !networkNode.getNodeAddressesOfConfirmedConnections().contains(e)) - .collect(Collectors.toSet())); + List list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>())); + list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list)); list.addAll(seedNodeAddresses.stream() .filter(e -> !list.contains(e) && - !networkNode.getNodeAddressesOfConfirmedConnections().contains(e) && - !e.equals(networkNode.getNodeAddress())) + !peerManager.isSelf(e) && + !peerManager.isConfirmed(e)) .collect(Collectors.toSet())); + log.trace("Sorted and filtered list: list=" + list); if (!list.isEmpty()) { NodeAddress nextCandidate = list.get(0); list.remove(nextCandidate); @@ -225,6 +221,20 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener } } + // sorted by most recent lastActivityDate + private List getFilteredAndSortedList(Set set, List list) { + return set.stream() + .filter(e -> !list.contains(e.nodeAddress) && + !peerManager.isSeedNode(e) && + !peerManager.isSelf(e) && + !peerManager.isConfirmed(e)) + .collect(Collectors.toList()) + .stream() + .sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate)) + .map(e -> e.nodeAddress) + .collect(Collectors.toList()); + } + // we check if we have at least one seed node connected private void checkForSeedNode() { @@ -243,7 +253,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener private HashSet getReportedPeersHashSet(NodeAddress receiverNodeAddress) { return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream() .filter(e -> !peerManager.isSeedNode(e) && - !e.nodeAddress.equals(networkNode.getNodeAddress()) && + !peerManager.isSelf(e) && !e.nodeAddress.equals(receiverNodeAddress) ) .collect(Collectors.toSet())); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 5f8cdfde7f..46211692e9 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -74,12 +74,12 @@ public class PeerManager implements ConnectionListener, MessageListener { }; } - protected void createDbStorage(File storageDir) { + private void createDbStorage(File storageDir) { dbStorage = new Storage<>(storageDir); initPersistedPeers(); } - protected void initPersistedPeers() { + private void initPersistedPeers() { if (dbStorage != null) { HashSet persistedPeers = dbStorage.initAndGetPersisted("persistedPeers"); if (persistedPeers != null) { @@ -148,7 +148,7 @@ public class PeerManager implements ConnectionListener, MessageListener { // Check seed node connections /////////////////////////////////////////////////////////////////////////////////////////// - protected boolean checkMaxConnections(int limit) { + private boolean checkMaxConnections(int limit) { Log.traceCall(); stopCheckMaxConnectionsTimer(); removeSuperfluousSeedNodes(); @@ -205,7 +205,7 @@ public class PeerManager implements ConnectionListener, MessageListener { } } - protected void removeSuperfluousSeedNodes() { + private void removeSuperfluousSeedNodes() { Set allConnections = networkNode.getAllConnections(); if (allConnections.size() > MAX_CONNECTIONS_EXTENDED_1) { List candidates = allConnections.stream() @@ -228,11 +228,7 @@ public class PeerManager implements ConnectionListener, MessageListener { // Reported peers /////////////////////////////////////////////////////////////////////////////////////////// - public void removeReportedPeer(NodeAddress nodeAddress) { - removeReportedPeer(new ReportedPeer(nodeAddress)); - } - - public void removeReportedPeer(ReportedPeer reportedPeer) { + private void removeReportedPeer(ReportedPeer reportedPeer) { reportedPeers.remove(reportedPeer); printReportedPeers(); } @@ -241,13 +237,6 @@ public class PeerManager implements ConnectionListener, MessageListener { return reportedPeers; } - public Set getNodeAddressesOfReportedPeers() { - return reportedPeers.stream().map(e -> e.nodeAddress) - .filter(e -> !isSeedNode(e) && - !e.equals(networkNode.getNodeAddress())) - .collect(Collectors.toSet()); - } - public void addToReportedPeers(HashSet reportedPeersToAdd, Connection connection) { Log.traceCall("reportedPeersToAdd = " + reportedPeersToAdd); // we disconnect misbehaving nodes trying to send too many peers @@ -317,12 +306,22 @@ public class PeerManager implements ConnectionListener, MessageListener { printReportedPeers(); } + private void printReportedPeers() { + if (!reportedPeers.isEmpty()) { + StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + + "Reported peers for node " + networkNode.getNodeAddress() + ":"); + reportedPeers.stream().forEach(e -> result.append("\n").append(e)); + result.append("\n------------------------------------------------------------\n"); + log.info(result.toString()); + } + } + /////////////////////////////////////////////////////////////////////////////////////////// // Persisted peers /////////////////////////////////////////////////////////////////////////////////////////// - public void removeFromPersistedPeers(ReportedPeer reportedPeer) { + private void removeFromPersistedPeers(ReportedPeer reportedPeer) { if (persistedPeers.contains(reportedPeer)) { persistedPeers.remove(reportedPeer); @@ -331,21 +330,10 @@ public class PeerManager implements ConnectionListener, MessageListener { } } - public void removeFromPersistedPeers(NodeAddress peerNodeAddress) { - removeFromPersistedPeers(new ReportedPeer(peerNodeAddress)); - } - - public HashSet getPersistedPeers() { + public Set getPersistedPeers() { return persistedPeers; } - public Set getNodeAddressesOfPersistedPeers() { - return persistedPeers.stream().map(e -> e.nodeAddress) - .filter(e -> !isSeedNode(e) && - !e.equals(networkNode.getNodeAddress())) - .collect(Collectors.toSet()); - } - /////////////////////////////////////////////////////////////////////////////////////////// // Misc @@ -355,25 +343,12 @@ public class PeerManager implements ConnectionListener, MessageListener { return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS; } - public void removePeer(NodeAddress nodeAddress) { - removeReportedPeer(nodeAddress); - removeFromPersistedPeers(nodeAddress); - } - public Set getConnectedAndReportedPeers() { Set result = new HashSet<>(reportedPeers); result.addAll(getConnectedPeers()); return result; } - public Set getConnectedPeers() { - // networkNode.getConfirmedConnections includes: - // filter(connection -> connection.getPeersNodeAddressOptional().isPresent()) - return networkNode.getConfirmedConnections().stream() - .map(c -> new ReportedPeer(c.getPeersNodeAddressOptional().get(), c.getLastActivityDate())) - .collect(Collectors.toSet()); - } - public boolean isSeedNode(ReportedPeer reportedPeer) { return seedNodeAddresses.contains(reportedPeer.nodeAddress); } @@ -386,6 +361,22 @@ public class PeerManager implements ConnectionListener, MessageListener { return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get()); } + public boolean isSelf(ReportedPeer reportedPeer) { + return isSelf(reportedPeer.nodeAddress); + } + + public boolean isSelf(NodeAddress nodeAddress) { + return nodeAddress.equals(networkNode.getNodeAddress()); + } + + public boolean isConfirmed(ReportedPeer reportedPeer) { + return isConfirmed(reportedPeer.nodeAddress); + } + + public boolean isConfirmed(NodeAddress nodeAddress) { + return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Private @@ -415,6 +406,13 @@ public class PeerManager implements ConnectionListener, MessageListener { return list.remove(new Random().nextInt(list.size())); } + private Set getConnectedPeers() { + // networkNode.getConfirmedConnections includes: + // filter(connection -> connection.getPeersNodeAddressOptional().isPresent()) + return networkNode.getConfirmedConnections().stream() + .map(c -> new ReportedPeer(c.getPeersNodeAddressOptional().get(), c.getLastActivityDate())) + .collect(Collectors.toSet()); + } private void stopCheckMaxConnectionsTimer() { if (checkMaxConnectionsTimer != null) { @@ -433,13 +431,4 @@ public class PeerManager implements ConnectionListener, MessageListener { } } - private void printReportedPeers() { - if (!reportedPeers.isEmpty()) { - StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + - "Reported peers for node " + networkNode.getNodeAddress() + ":"); - reportedPeers.stream().forEach(e -> result.append("\n").append(e)); - result.append("\n------------------------------------------------------------\n"); - log.info(result.toString()); - } - } } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java index 820601f87b..eb3f18b4c3 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java @@ -195,7 +195,6 @@ public class RequestDataManager implements MessageListener { private void handleError(NodeAddress nodeAddress, List remainingNodeAddresses) { Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); stopTimeoutTimer(); - //peerManager.removePeer(nodeAddress); if (!remainingNodeAddresses.isEmpty()) { log.info("There are remaining nodes available for requesting data. " + @@ -221,12 +220,9 @@ public class RequestDataManager implements MessageListener { // we got from the other seed node contacted but we still have not requested the initial // data set List list = new ArrayList<>(seedNodeAddresses); - list.addAll(peerManager.getNodeAddressesOfReportedPeers().stream() - .filter(e -> !list.contains(e)) - .collect(Collectors.toSet())); - list.addAll(peerManager.getNodeAddressesOfPersistedPeers().stream() - .filter(e -> !list.contains(e)) - .collect(Collectors.toSet())); + list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list)); + list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list)); + log.trace("Sorted and filtered list: list=" + list); if (!list.isEmpty()) { NodeAddress nextCandidate = list.get(0); list.remove(nextCandidate); @@ -241,6 +237,19 @@ public class RequestDataManager implements MessageListener { } } + // sorted by most recent lastActivityDate + private List getFilteredAndSortedList(Set set, List list) { + return set.stream() + .filter(e -> !list.contains(e.nodeAddress) && + !peerManager.isSeedNode(e) && + !peerManager.isSelf(e.nodeAddress)) + .collect(Collectors.toList()) + .stream() + .sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate)) + .map(e -> e.nodeAddress) + .collect(Collectors.toList()); + } + private void stopRequestDataTimer() { if (requestDataAfterDelayTimer != null) { requestDataAfterDelayTimer.cancel(); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java index cd8fe090d4..667eedc7b9 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/messages/peers/GetPeersRequest.java @@ -11,7 +11,7 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender // That object is sent over the wire, so we need to take care of version compatibility. private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; - private NodeAddress senderNodeAddress; + private final NodeAddress senderNodeAddress; public final HashSet reportedPeers; public GetPeersRequest(NodeAddress senderNodeAddress, HashSet reportedPeers) { diff --git a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java index 2e7470da60..a208c1cdf3 100644 --- a/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java +++ b/network/src/main/java/io/bitsquare/p2p/seed/SeedNode.java @@ -145,7 +145,7 @@ public class SeedNode { return seedNodeP2PService; } - public void shutDown() { + private void shutDown() { Log.traceCall(); shutDown(null); } diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index 985bcb1e21..1d6bd0a130 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -45,7 +45,7 @@ public class P2PDataStorage implements MessageListener { private final CopyOnWriteArraySet hashMapChangedListeners = new CopyOnWriteArraySet<>(); private HashMap sequenceNumberMap = new HashMap<>(); private final Storage storage; - protected final ScheduledThreadPoolExecutor removeExpiredEntriesExecutor; + private final ScheduledThreadPoolExecutor removeExpiredEntriesExecutor; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor diff --git a/network/src/main/java/io/bitsquare/p2p/storage/data/DataAndSeqNr.java b/network/src/main/java/io/bitsquare/p2p/storage/data/DataAndSeqNr.java index ae70b7c0ab..5d5b210c2a 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/data/DataAndSeqNr.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/data/DataAndSeqNr.java @@ -3,31 +3,12 @@ package io.bitsquare.p2p.storage.data; import java.io.Serializable; public class DataAndSeqNr implements Serializable { - public final Serializable data; - public final int sequenceNumber; + // data are only used for getting cryptographic hash from both values + private final Serializable data; + private final int sequenceNumber; public DataAndSeqNr(Serializable data, int sequenceNumber) { this.data = data; this.sequenceNumber = sequenceNumber; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof DataAndSeqNr)) return false; - - DataAndSeqNr that = (DataAndSeqNr) o; - - //noinspection SimplifiableIfStatement - if (sequenceNumber != that.sequenceNumber) return false; - return !(data != null ? !data.equals(that.data) : that.data != null); - - } - - @Override - public int hashCode() { - int result = data != null ? data.hashCode() : 0; - result = 31 * result + sequenceNumber; - return result; - } }