Cleanup, apply code inspection suggestions

This commit is contained in:
Manfred Karrer 2016-01-26 20:56:16 +01:00
parent 991a4350ac
commit 991ab56a97
17 changed files with 117 additions and 195 deletions

View file

@ -20,15 +20,11 @@ package io.bitsquare.crypto;
import io.bitsquare.common.crypto.*; import io.bitsquare.common.crypto.*;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.messaging.DecryptedMsgWithPubKey; import io.bitsquare.p2p.messaging.DecryptedMsgWithPubKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject; import javax.inject.Inject;
import java.security.KeyPair; import java.security.KeyPair;
public class EncryptionService { public class EncryptionService {
private static final Logger log = LoggerFactory.getLogger(EncryptionService.class);
private final KeyRing keyRing; private final KeyRing keyRing;
@Inject @Inject

View file

@ -1,6 +1,6 @@
package io.bitsquare.p2p; package io.bitsquare.p2p;
public class NetworkNotReadyException extends RuntimeException { class NetworkNotReadyException extends RuntimeException {
public NetworkNotReadyException() { public NetworkNotReadyException() {
super("You must have bootstrapped before adding data to the P2P network."); super("You must have bootstrapped before adding data to the P2P network.");

View file

@ -22,8 +22,6 @@ import com.google.inject.name.Names;
import io.bitsquare.app.AppModule; import io.bitsquare.app.AppModule;
import io.bitsquare.app.ProgramArguments; import io.bitsquare.app.ProgramArguments;
import io.bitsquare.p2p.seed.SeedNodesRepository; import io.bitsquare.p2p.seed.SeedNodesRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import java.io.File; import java.io.File;
@ -32,7 +30,6 @@ import static com.google.inject.name.Names.named;
public class P2PModule extends AppModule { public class P2PModule extends AppModule {
private static final Logger log = LoggerFactory.getLogger(P2PModule.class);
public P2PModule(Environment env) { public P2PModule(Environment env) {
super(env); super(env);

View file

@ -48,11 +48,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class P2PService implements SetupListener, MessageListener, ConnectionListener, RequestDataManager.Listener, HashMapChangedListener { public class P2PService implements SetupListener, MessageListener, ConnectionListener, RequestDataManager.Listener, HashMapChangedListener {
private static final Logger log = LoggerFactory.getLogger(P2PService.class); private static final Logger log = LoggerFactory.getLogger(P2PService.class);
protected final SeedNodesRepository seedNodesRepository; private final SeedNodesRepository seedNodesRepository;
protected final int port; private final int port;
protected final File torDir; private final File torDir;
protected final Optional<EncryptionService> optionalEncryptionService; private final Optional<EncryptionService> optionalEncryptionService;
protected final Optional<KeyRing> optionalKeyRing; private final Optional<KeyRing> optionalKeyRing;
// set in init // set in init
private NetworkNode networkNode; private NetworkNode networkNode;
@ -102,7 +102,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
init(useLocalhost, networkId, storageDir); init(useLocalhost, networkId, storageDir);
} }
protected void init(boolean useLocalhost, int networkId, File storageDir) { private void init(boolean useLocalhost, int networkId, File storageDir) {
Log.traceCall(); Log.traceCall();
connectionNodeAddressListener = (observable, oldValue, newValue) -> { connectionNodeAddressListener = (observable, oldValue, newValue) -> {
@ -633,10 +633,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
decryptedMailboxListeners.add(listener); decryptedMailboxListeners.add(listener);
} }
public void removeDecryptedMailboxListener(DecryptedMailboxListener listener) {
decryptedMailboxListeners.remove(listener);
}
public void addP2PServiceListener(P2PServiceListener listener) { public void addP2PServiceListener(P2PServiceListener listener) {
p2pServiceListeners.add(listener); p2pServiceListeners.add(listener);
} }

View file

@ -1,8 +1,6 @@
package io.bitsquare.p2p; package io.bitsquare.p2p;
import io.bitsquare.common.ByteArrayUtils; import io.bitsquare.common.ByteArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -14,7 +12,6 @@ import java.util.zip.Deflater;
import java.util.zip.Inflater; import java.util.zip.Inflater;
public class Utils { public class Utils {
private static final Logger log = LoggerFactory.getLogger(Utils.class);
public static int findFreeSystemPort() { public static int findFreeSystemPort() {
try { try {
@ -49,9 +46,9 @@ public class Utils {
return bos.toByteArray(); return bos.toByteArray();
} }
private static byte[] decompress(byte[] compressedData, int offset, int length) { private static byte[] decompress(byte[] compressedData, int length) {
Inflater inflater = new Inflater(); Inflater inflater = new Inflater();
inflater.setInput(compressedData, offset, length); inflater.setInput(compressedData, 0, length);
ByteArrayOutputStream bos = new ByteArrayOutputStream(length); ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
byte[] buf = new byte[8192]; byte[] buf = new byte[8192];
while (!inflater.finished()) { while (!inflater.finished()) {
@ -73,7 +70,7 @@ public class Utils {
} }
public static Serializable decompress(byte[] compressedData) { public static Serializable decompress(byte[] compressedData) {
return (Serializable) ByteArrayUtils.byteArrayToObject(decompress(compressedData, 0, compressedData.length)); return (Serializable) ByteArrayUtils.byteArrayToObject(decompress(compressedData, compressedData.length));
} }
} }

View file

@ -82,15 +82,15 @@ public class Connection implements MessageListener {
// use GZIPInputStream but problems with blocking // use GZIPInputStream but problems with blocking
private final boolean useCompression = false; private final boolean useCompression = false;
private PeerType peerType; private PeerType peerType;
private ObjectProperty<NodeAddress> nodeAddressProperty = new SimpleObjectProperty<>(); private final ObjectProperty<NodeAddress> nodeAddressProperty = new SimpleObjectProperty<>();
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener,
@Nullable NodeAddress peersNodeAddress) { @Nullable NodeAddress peersNodeAddress) {
Log.traceCall(); Log.traceCall();
this.socket = socket; this.socket = socket;
this.messageListener = messageListener; this.messageListener = messageListener;
@ -168,10 +168,11 @@ public class Connection implements MessageListener {
} }
Object objectToWrite; Object objectToWrite;
//noinspection ConstantConditions
if (useCompression) { if (useCompression) {
byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message); byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message);
// log.trace("Write object uncompressed data size: " + messageAsBytes.length); // log.trace("Write object uncompressed data size: " + messageAsBytes.length);
byte[] compressed = Utils.compress(message); @SuppressWarnings("UnnecessaryLocalVariable") byte[] compressed = Utils.compress(message);
//log.trace("Write object compressed data size: " + compressed.length); //log.trace("Write object compressed data size: " + compressed.length);
objectToWrite = compressed; objectToWrite = compressed;
} else { } else {
@ -194,6 +195,7 @@ public class Connection implements MessageListener {
} }
} }
@SuppressWarnings("unused")
public void reportIllegalRequest(IllegalRequest illegalRequest) { public void reportIllegalRequest(IllegalRequest illegalRequest) {
Log.traceCall(); Log.traceCall();
sharedModel.reportIllegalRequest(illegalRequest); sharedModel.reportIllegalRequest(illegalRequest);
@ -221,7 +223,7 @@ public class Connection implements MessageListener {
this.peerType = peerType; this.peerType = peerType;
} }
public synchronized void setPeersNodeAddress(NodeAddress peerNodeAddress) { private synchronized void setPeersNodeAddress(NodeAddress peerNodeAddress) {
Log.traceCall(peerNodeAddress.toString()); Log.traceCall(peerNodeAddress.toString());
checkNotNull(peerNodeAddress, "peerAddress must not be null"); checkNotNull(peerNodeAddress, "peerAddress must not be null");
peersNodeAddressOptional = Optional.of(peerNodeAddress); peersNodeAddressOptional = Optional.of(peerNodeAddress);
@ -390,6 +392,7 @@ public class Connection implements MessageListener {
'}'; '}';
} }
@SuppressWarnings("unused")
public String printDetails() { public String printDetails() {
return "Connection{" + return "Connection{" +
"peerAddress=" + peersNodeAddressOptional + "peerAddress=" + peersNodeAddressOptional +
@ -458,7 +461,7 @@ public class Connection implements MessageListener {
"illegalRequest={}\n" + "illegalRequest={}\n" +
"illegalRequests={}", violations, illegalRequest, illegalRequests.toString()); "illegalRequests={}", violations, illegalRequest, illegalRequests.toString());
log.debug("connection={}" + this); log.debug("connection={}" + this);
shutDown(false); shutDown();
} else { } else {
illegalRequests.put(illegalRequest, ++violations); illegalRequests.put(illegalRequest, ++violations);
} }
@ -486,14 +489,14 @@ public class Connection implements MessageListener {
e.printStackTrace(); e.printStackTrace();
} }
shutDown(false); shutDown();
} }
public void shutDown(boolean sendCloseConnectionMessage) { public void shutDown() {
Log.traceCall(); Log.traceCall();
if (!stopped) { if (!stopped) {
stopped = true; stopped = true;
connection.shutDown(sendCloseConnectionMessage); connection.shutDown(false);
} }
} }
@ -617,7 +620,7 @@ public class Connection implements MessageListener {
if (message instanceof CloseConnectionMessage) { if (message instanceof CloseConnectionMessage) {
log.info("CloseConnectionMessage received on connection {}", connection); log.info("CloseConnectionMessage received on connection {}", connection);
stopped = true; stopped = true;
sharedModel.shutDown(false); sharedModel.shutDown();
} else if (!stopped) { } else if (!stopped) {
// First a seed node gets a message form a peer (PreliminaryDataRequest using // First a seed node gets a message form a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not has its hidden service // AnonymousMessage interface) which does not has its hidden service

View file

@ -19,7 +19,6 @@ import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
@ -28,15 +27,13 @@ import static com.google.common.base.Preconditions.checkNotNull;
public abstract class NetworkNode implements MessageListener, ConnectionListener { public abstract class NetworkNode implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(NetworkNode.class); private static final Logger log = LoggerFactory.getLogger(NetworkNode.class);
private static final int CREATE_SOCKET_TIMEOUT = 10 * 1000; // 10 sec. final int servicePort;
protected final int servicePort;
private final CopyOnWriteArraySet<Connection> inBoundConnections = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<Connection> inBoundConnections = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>();
protected final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>(); final CopyOnWriteArraySet<SetupListener> setupListeners = new CopyOnWriteArraySet<>();
protected ListeningExecutorService executorService; ListeningExecutorService executorService;
private Server server; private Server server;
private volatile boolean shutDownInProgress; private volatile boolean shutDownInProgress;
@ -48,7 +45,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public NetworkNode(int servicePort) { NetworkNode(int servicePort) {
Log.traceCall(); Log.traceCall();
this.servicePort = servicePort; this.servicePort = servicePort;
} }
@ -57,11 +54,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void start() {
Log.traceCall();
start(null);
}
abstract public void start(@Nullable SetupListener setupListener); abstract public void start(@Nullable SetupListener setupListener);
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress, Message message) { public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress, Message message) {
@ -114,7 +106,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
outboundConnection.sendMessage(message); outboundConnection.sendMessage(message);
return outboundConnection; return outboundConnection;
} catch (Throwable throwable) { } catch (Throwable throwable) {
if (!(throwable instanceof ConnectException || throwable instanceof IOException || throwable instanceof TimeoutException)) { if (!(throwable instanceof ConnectException || throwable instanceof IOException)) {
throwable.printStackTrace(); throwable.printStackTrace();
log.error("Executing task failed. " + throwable.getMessage()); log.error("Executing task failed. " + throwable.getMessage());
} }
@ -203,7 +195,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
// SetupListener // SetupListener
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void addSetupListener(SetupListener setupListener) { void addSetupListener(SetupListener setupListener) {
Log.traceCall(); Log.traceCall();
boolean isNewEntry = setupListeners.add(setupListener); boolean isNewEntry = setupListeners.add(setupListener);
if (!isNewEntry) if (!isNewEntry)
@ -287,12 +279,12 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
// Protected // Protected
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
protected void createExecutorService() { void createExecutorService() {
Log.traceCall(); Log.traceCall();
executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 20, 50, 120L); executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 20, 50, 120L);
} }
protected void startServer(ServerSocket serverSocket) { void startServer(ServerSocket serverSocket) {
Log.traceCall(); Log.traceCall();
ConnectionListener startServerConnectionListener = new ConnectionListener() { ConnectionListener startServerConnectionListener = new ConnectionListener() {
@Override @Override

View file

@ -5,5 +5,6 @@ public interface SetupListener {
void onHiddenServicePublished(); void onHiddenServicePublished();
@SuppressWarnings("unused")
void onSetupFailed(Throwable throwable); void onSetupFailed(Throwable throwable);
} }

View file

@ -2,14 +2,12 @@ package io.bitsquare.p2p.network.messages;
import io.bitsquare.app.Version; import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
public final class CloseConnectionMessage implements Message { public final class CloseConnectionMessage implements Message {
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private final int networkId = Version.getNetworkId(); private final int networkId = Version.getNetworkId();
public NodeAddress peerNodeAddress;
public CloseConnectionMessage() { public CloseConnectionMessage() {
} }

View file

@ -1,47 +0,0 @@
package io.bitsquare.p2p.peers;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
public class Peer {
private static final Logger log = LoggerFactory.getLogger(Peer.class);
public final Connection connection;
public final NodeAddress nodeAddress;
public final long pingNonce;
public Peer(Connection connection, NodeAddress nodeAddress) {
this.connection = connection;
this.nodeAddress = nodeAddress;
pingNonce = new Random().nextLong();
}
@Override
public int hashCode() {
return nodeAddress != null ? nodeAddress.hashCode() : 0;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Peer)) return false;
Peer peer = (Peer) o;
return !(nodeAddress != null ? !nodeAddress.equals(peer.nodeAddress) : peer.nodeAddress != null);
}
@Override
public String toString() {
return "Peer{" +
"address=" + nodeAddress +
", pingNonce=" + pingNonce +
", connection=" + connection +
'}';
}
}

View file

@ -174,8 +174,9 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private void handleError(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) { private void handleError(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
stopTimeoutTimer(); stopTimeoutTimer();
//peerManager.removePeer(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) { if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting peers. " + log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again."); "We will try getReportedPeers again.");
@ -185,8 +186,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
"That is expected if no other node is online.\n" + "That is expected if no other node is online.\n" +
"We will try to use reported peers (if no available we use persisted peers) " + "We will try to use reported peers (if no available we use persisted peers) " +
"and try again to request peers from our seed nodes after a random pause."); "and try again to request peers from our seed nodes after a random pause.");
requestReportedPeersAfterDelayTimer = UserThread.runAfter(() -> requestReportedPeersAfterDelayTimer = UserThread.runAfter(this::continueWithMorePeers,
continueWithMorePeers(),
10, TimeUnit.SECONDS); 10, TimeUnit.SECONDS);
} }
} }
@ -201,18 +201,14 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
Log.traceCall(); Log.traceCall();
if (!peerManager.hasSufficientConnections()) { if (!peerManager.hasSufficientConnections()) {
// We want to keep it sorted but avoid duplicates // We want to keep it sorted but avoid duplicates
List<NodeAddress> list = new ArrayList<>(peerManager.getNodeAddressesOfReportedPeers().stream() List<NodeAddress> list = new ArrayList<>(getFilteredAndSortedList(peerManager.getReportedPeers(), new ArrayList<>()));
.filter(e -> !networkNode.getNodeAddressesOfConfirmedConnections().contains(e)) list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
.collect(Collectors.toSet()));
list.addAll(peerManager.getNodeAddressesOfPersistedPeers().stream()
.filter(e -> !list.contains(e) &&
!networkNode.getNodeAddressesOfConfirmedConnections().contains(e))
.collect(Collectors.toSet()));
list.addAll(seedNodeAddresses.stream() list.addAll(seedNodeAddresses.stream()
.filter(e -> !list.contains(e) && .filter(e -> !list.contains(e) &&
!networkNode.getNodeAddressesOfConfirmedConnections().contains(e) && !peerManager.isSelf(e) &&
!e.equals(networkNode.getNodeAddress())) !peerManager.isConfirmed(e))
.collect(Collectors.toSet())); .collect(Collectors.toSet()));
log.trace("Sorted and filtered list: list=" + list);
if (!list.isEmpty()) { if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0); NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate); list.remove(nextCandidate);
@ -225,6 +221,20 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
} }
} }
// sorted by most recent lastActivityDate
private List<NodeAddress> getFilteredAndSortedList(Set<ReportedPeer> set, List<NodeAddress> list) {
return set.stream()
.filter(e -> !list.contains(e.nodeAddress) &&
!peerManager.isSeedNode(e) &&
!peerManager.isSelf(e) &&
!peerManager.isConfirmed(e))
.collect(Collectors.toList())
.stream()
.sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate))
.map(e -> e.nodeAddress)
.collect(Collectors.toList());
}
// we check if we have at least one seed node connected // we check if we have at least one seed node connected
private void checkForSeedNode() { private void checkForSeedNode() {
@ -243,7 +253,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) { private HashSet<ReportedPeer> getReportedPeersHashSet(NodeAddress receiverNodeAddress) {
return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream() return new HashSet<>(peerManager.getConnectedAndReportedPeers().stream()
.filter(e -> !peerManager.isSeedNode(e) && .filter(e -> !peerManager.isSeedNode(e) &&
!e.nodeAddress.equals(networkNode.getNodeAddress()) && !peerManager.isSelf(e) &&
!e.nodeAddress.equals(receiverNodeAddress) !e.nodeAddress.equals(receiverNodeAddress)
) )
.collect(Collectors.toSet())); .collect(Collectors.toSet()));

View file

@ -74,12 +74,12 @@ public class PeerManager implements ConnectionListener, MessageListener {
}; };
} }
protected void createDbStorage(File storageDir) { private void createDbStorage(File storageDir) {
dbStorage = new Storage<>(storageDir); dbStorage = new Storage<>(storageDir);
initPersistedPeers(); initPersistedPeers();
} }
protected void initPersistedPeers() { private void initPersistedPeers() {
if (dbStorage != null) { if (dbStorage != null) {
HashSet<ReportedPeer> persistedPeers = dbStorage.initAndGetPersisted("persistedPeers"); HashSet<ReportedPeer> persistedPeers = dbStorage.initAndGetPersisted("persistedPeers");
if (persistedPeers != null) { if (persistedPeers != null) {
@ -148,7 +148,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
// Check seed node connections // Check seed node connections
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
protected boolean checkMaxConnections(int limit) { private boolean checkMaxConnections(int limit) {
Log.traceCall(); Log.traceCall();
stopCheckMaxConnectionsTimer(); stopCheckMaxConnectionsTimer();
removeSuperfluousSeedNodes(); removeSuperfluousSeedNodes();
@ -205,7 +205,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
} }
} }
protected void removeSuperfluousSeedNodes() { private void removeSuperfluousSeedNodes() {
Set<Connection> allConnections = networkNode.getAllConnections(); Set<Connection> allConnections = networkNode.getAllConnections();
if (allConnections.size() > MAX_CONNECTIONS_EXTENDED_1) { if (allConnections.size() > MAX_CONNECTIONS_EXTENDED_1) {
List<Connection> candidates = allConnections.stream() List<Connection> candidates = allConnections.stream()
@ -228,11 +228,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
// Reported peers // Reported peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void removeReportedPeer(NodeAddress nodeAddress) { private void removeReportedPeer(ReportedPeer reportedPeer) {
removeReportedPeer(new ReportedPeer(nodeAddress));
}
public void removeReportedPeer(ReportedPeer reportedPeer) {
reportedPeers.remove(reportedPeer); reportedPeers.remove(reportedPeer);
printReportedPeers(); printReportedPeers();
} }
@ -241,13 +237,6 @@ public class PeerManager implements ConnectionListener, MessageListener {
return reportedPeers; return reportedPeers;
} }
public Set<NodeAddress> getNodeAddressesOfReportedPeers() {
return reportedPeers.stream().map(e -> e.nodeAddress)
.filter(e -> !isSeedNode(e) &&
!e.equals(networkNode.getNodeAddress()))
.collect(Collectors.toSet());
}
public void addToReportedPeers(HashSet<ReportedPeer> reportedPeersToAdd, Connection connection) { public void addToReportedPeers(HashSet<ReportedPeer> reportedPeersToAdd, Connection connection) {
Log.traceCall("reportedPeersToAdd = " + reportedPeersToAdd); Log.traceCall("reportedPeersToAdd = " + reportedPeersToAdd);
// we disconnect misbehaving nodes trying to send too many peers // we disconnect misbehaving nodes trying to send too many peers
@ -317,12 +306,22 @@ public class PeerManager implements ConnectionListener, MessageListener {
printReportedPeers(); printReportedPeers();
} }
private void printReportedPeers() {
if (!reportedPeers.isEmpty()) {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Reported peers for node " + networkNode.getNodeAddress() + ":");
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Persisted peers // Persisted peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void removeFromPersistedPeers(ReportedPeer reportedPeer) { private void removeFromPersistedPeers(ReportedPeer reportedPeer) {
if (persistedPeers.contains(reportedPeer)) { if (persistedPeers.contains(reportedPeer)) {
persistedPeers.remove(reportedPeer); persistedPeers.remove(reportedPeer);
@ -331,21 +330,10 @@ public class PeerManager implements ConnectionListener, MessageListener {
} }
} }
public void removeFromPersistedPeers(NodeAddress peerNodeAddress) { public Set<ReportedPeer> getPersistedPeers() {
removeFromPersistedPeers(new ReportedPeer(peerNodeAddress));
}
public HashSet<ReportedPeer> getPersistedPeers() {
return persistedPeers; return persistedPeers;
} }
public Set<NodeAddress> getNodeAddressesOfPersistedPeers() {
return persistedPeers.stream().map(e -> e.nodeAddress)
.filter(e -> !isSeedNode(e) &&
!e.equals(networkNode.getNodeAddress()))
.collect(Collectors.toSet());
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Misc // Misc
@ -355,25 +343,12 @@ public class PeerManager implements ConnectionListener, MessageListener {
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS; return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS;
} }
public void removePeer(NodeAddress nodeAddress) {
removeReportedPeer(nodeAddress);
removeFromPersistedPeers(nodeAddress);
}
public Set<ReportedPeer> getConnectedAndReportedPeers() { public Set<ReportedPeer> getConnectedAndReportedPeers() {
Set<ReportedPeer> result = new HashSet<>(reportedPeers); Set<ReportedPeer> result = new HashSet<>(reportedPeers);
result.addAll(getConnectedPeers()); result.addAll(getConnectedPeers());
return result; return result;
} }
public Set<ReportedPeer> getConnectedPeers() {
// networkNode.getConfirmedConnections includes:
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
return networkNode.getConfirmedConnections().stream()
.map(c -> new ReportedPeer(c.getPeersNodeAddressOptional().get(), c.getLastActivityDate()))
.collect(Collectors.toSet());
}
public boolean isSeedNode(ReportedPeer reportedPeer) { public boolean isSeedNode(ReportedPeer reportedPeer) {
return seedNodeAddresses.contains(reportedPeer.nodeAddress); return seedNodeAddresses.contains(reportedPeer.nodeAddress);
} }
@ -386,6 +361,22 @@ public class PeerManager implements ConnectionListener, MessageListener {
return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get()); return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get());
} }
public boolean isSelf(ReportedPeer reportedPeer) {
return isSelf(reportedPeer.nodeAddress);
}
public boolean isSelf(NodeAddress nodeAddress) {
return nodeAddress.equals(networkNode.getNodeAddress());
}
public boolean isConfirmed(ReportedPeer reportedPeer) {
return isConfirmed(reportedPeer.nodeAddress);
}
public boolean isConfirmed(NodeAddress nodeAddress) {
return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress);
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Private // Private
@ -415,6 +406,13 @@ public class PeerManager implements ConnectionListener, MessageListener {
return list.remove(new Random().nextInt(list.size())); return list.remove(new Random().nextInt(list.size()));
} }
private Set<ReportedPeer> getConnectedPeers() {
// networkNode.getConfirmedConnections includes:
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())
return networkNode.getConfirmedConnections().stream()
.map(c -> new ReportedPeer(c.getPeersNodeAddressOptional().get(), c.getLastActivityDate()))
.collect(Collectors.toSet());
}
private void stopCheckMaxConnectionsTimer() { private void stopCheckMaxConnectionsTimer() {
if (checkMaxConnectionsTimer != null) { if (checkMaxConnectionsTimer != null) {
@ -433,13 +431,4 @@ public class PeerManager implements ConnectionListener, MessageListener {
} }
} }
private void printReportedPeers() {
if (!reportedPeers.isEmpty()) {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Reported peers for node " + networkNode.getNodeAddress() + ":");
reportedPeers.stream().forEach(e -> result.append("\n").append(e));
result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
}
} }

View file

@ -195,7 +195,6 @@ public class RequestDataManager implements MessageListener {
private void handleError(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) { private void handleError(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses); Log.traceCall("nodeAddress=" + nodeAddress + " / remainingNodeAddresses=" + remainingNodeAddresses);
stopTimeoutTimer(); stopTimeoutTimer();
//peerManager.removePeer(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) { if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " + log.info("There are remaining nodes available for requesting data. " +
@ -221,12 +220,9 @@ public class RequestDataManager implements MessageListener {
// we got from the other seed node contacted but we still have not requested the initial // we got from the other seed node contacted but we still have not requested the initial
// data set // data set
List<NodeAddress> list = new ArrayList<>(seedNodeAddresses); List<NodeAddress> list = new ArrayList<>(seedNodeAddresses);
list.addAll(peerManager.getNodeAddressesOfReportedPeers().stream() list.addAll(getFilteredAndSortedList(peerManager.getReportedPeers(), list));
.filter(e -> !list.contains(e)) list.addAll(getFilteredAndSortedList(peerManager.getPersistedPeers(), list));
.collect(Collectors.toSet())); log.trace("Sorted and filtered list: list=" + list);
list.addAll(peerManager.getNodeAddressesOfPersistedPeers().stream()
.filter(e -> !list.contains(e))
.collect(Collectors.toSet()));
if (!list.isEmpty()) { if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0); NodeAddress nextCandidate = list.get(0);
list.remove(nextCandidate); list.remove(nextCandidate);
@ -241,6 +237,19 @@ public class RequestDataManager implements MessageListener {
} }
} }
// sorted by most recent lastActivityDate
private List<NodeAddress> getFilteredAndSortedList(Set<ReportedPeer> set, List<NodeAddress> list) {
return set.stream()
.filter(e -> !list.contains(e.nodeAddress) &&
!peerManager.isSeedNode(e) &&
!peerManager.isSelf(e.nodeAddress))
.collect(Collectors.toList())
.stream()
.sorted((o1, o2) -> o2.lastActivityDate.compareTo(o1.lastActivityDate))
.map(e -> e.nodeAddress)
.collect(Collectors.toList());
}
private void stopRequestDataTimer() { private void stopRequestDataTimer() {
if (requestDataAfterDelayTimer != null) { if (requestDataAfterDelayTimer != null) {
requestDataAfterDelayTimer.cancel(); requestDataAfterDelayTimer.cancel();

View file

@ -11,7 +11,7 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender
// That object is sent over the wire, so we need to take care of version compatibility. // That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
private NodeAddress senderNodeAddress; private final NodeAddress senderNodeAddress;
public final HashSet<ReportedPeer> reportedPeers; public final HashSet<ReportedPeer> reportedPeers;
public GetPeersRequest(NodeAddress senderNodeAddress, HashSet<ReportedPeer> reportedPeers) { public GetPeersRequest(NodeAddress senderNodeAddress, HashSet<ReportedPeer> reportedPeers) {

View file

@ -145,7 +145,7 @@ public class SeedNode {
return seedNodeP2PService; return seedNodeP2PService;
} }
public void shutDown() { private void shutDown() {
Log.traceCall(); Log.traceCall();
shutDown(null); shutDown(null);
} }

View file

@ -45,7 +45,7 @@ public class P2PDataStorage implements MessageListener {
private final CopyOnWriteArraySet<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
private HashMap<ByteArray, Integer> sequenceNumberMap = new HashMap<>(); private HashMap<ByteArray, Integer> sequenceNumberMap = new HashMap<>();
private final Storage<HashMap> storage; private final Storage<HashMap> storage;
protected final ScheduledThreadPoolExecutor removeExpiredEntriesExecutor; private final ScheduledThreadPoolExecutor removeExpiredEntriesExecutor;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor

View file

@ -3,31 +3,12 @@ package io.bitsquare.p2p.storage.data;
import java.io.Serializable; import java.io.Serializable;
public class DataAndSeqNr implements Serializable { public class DataAndSeqNr implements Serializable {
public final Serializable data; // data are only used for getting cryptographic hash from both values
public final int sequenceNumber; private final Serializable data;
private final int sequenceNumber;
public DataAndSeqNr(Serializable data, int sequenceNumber) { public DataAndSeqNr(Serializable data, int sequenceNumber) {
this.data = data; this.data = data;
this.sequenceNumber = sequenceNumber; this.sequenceNumber = sequenceNumber;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DataAndSeqNr)) return false;
DataAndSeqNr that = (DataAndSeqNr) o;
//noinspection SimplifiableIfStatement
if (sequenceNumber != that.sequenceNumber) return false;
return !(data != null ? !data.equals(that.data) : that.data != null);
}
@Override
public int hashCode() {
int result = data != null ? data.hashCode() : 0;
result = 31 * result + sequenceNumber;
return result;
}
} }