diff --git a/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java b/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java index cc16132fdd..013c2ec40f 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/Broadcaster.java @@ -39,13 +39,13 @@ public class Broadcaster { receivers.stream() .filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender)) .forEach(connection -> { - log.trace("Broadcast message from " + networkNode.getNodeAddress() + " to " + + log.trace("Broadcast message to " + connection.getPeersNodeAddressOptional().get() + "."); SettableFuture future = networkNode.sendMessage(connection, message); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Connection connection) { - log.trace("Broadcast from " + networkNode.getNodeAddress() + " to " + connection + " succeeded."); + log.trace("Broadcast to " + connection + " succeeded."); listeners.stream().forEach(listener -> { listener.onBroadcasted(message); listeners.remove(listener); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java index c31bb60d63..718bd0eba6 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerExchangeManager.java @@ -116,6 +116,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener public void onFault(String errorMessage) { log.trace("PeerExchangeHandshake of outbound connection failed. {} connection= {}", errorMessage, connection); + peerManager.penalizeUnreachablePeer(connection); } }); peerExchangeHandshake.onGetPeersRequest((GetPeersRequest) message, connection); @@ -147,6 +148,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener errorMessage, nodeAddress); peerExchangeHandshakeMap.remove(nodeAddress); + peerManager.penalizeUnreachablePeer(nodeAddress); if (!remainingNodeAddresses.isEmpty()) { log.info("There are remaining nodes available for requesting peers. " + "We will try getReportedPeers again."); @@ -186,7 +188,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener Collections.shuffle(nodeAddresses); requestReportedPeersFromRandomPeer(nodeAddresses); } - + // We try to get sufficient connections by connecting to reported and persisted peers if (numberOfConnectedSeedNodes == 0) { @@ -197,16 +199,16 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener connectToMorePeers(); } - // Use all outbound connections older than 4 min. for updating reported peers and make sure we keep the connection alive + // Use all outbound connections older than 10 min. for updating reported peers and make sure we keep the connection alive // Inbound connections should be maintained be the requesting peer confirmedConnections.stream() .filter(c -> c.getPeersNodeAddressOptional().isPresent() && c instanceof OutboundConnection && - new Date().getTime() - c.getLastActivityDate().getTime() > 4 * 60 * 1000) - .forEach(c -> UserThread.runAfterRandomDelay(() -> { + new Date().getTime() - c.getLastActivityDate().getTime() > 10 * 60 * 1000) + .forEach(c -> { log.trace("Call requestReportedPeers from maintainConnections"); requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>()); - }, 3, 5)); + }); } private void connectToMorePeers() { @@ -228,6 +230,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener !peerManager.isSelf(e) && !peerManager.isConfirmed(e)) .collect(Collectors.toSet())); + log.info("Sorted and filtered list: list.size()=" + list.size()); log.trace("Sorted and filtered list: list=" + list); if (!list.isEmpty()) { NodeAddress nextCandidate = list.get(0); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index eb3f22d26f..cee4ec14dd 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -44,6 +44,7 @@ public class PeerManager implements ConnectionListener, MessageListener { private static final int MAX_REPORTED_PEERS = 1000; private static final int MAX_PERSISTED_PEERS = 500; + private static final long MAX_AGE = 14 * 24 * 60 * 60 * 1000; // max age for reported peers is 14 days private final NetworkNode networkNode; @@ -68,9 +69,13 @@ public class PeerManager implements ConnectionListener, MessageListener { createDbStorage(storageDir); connectionNodeAddressListener = (observable, oldValue, newValue) -> { + // Every time we get a new peer connected with a known address we check if we need to remove peers printConnectedPeers(); if (checkMaxConnectionsTimer == null && newValue != null) - checkMaxConnectionsTimer = UserThread.runAfter(() -> checkMaxConnections(MAX_CONNECTIONS), 3); + checkMaxConnectionsTimer = UserThread.runAfter(() -> { + removeTooOldReportedPeers(); + checkMaxConnections(MAX_CONNECTIONS); + }, 3); }; } @@ -118,7 +123,12 @@ public class PeerManager implements ConnectionListener, MessageListener { @Override public void onDisconnect(Reason reason, Connection connection) { connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener); - //connection.getPeersNodeAddressOptional().ifPresent(this::removePeer); + connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> { + ReportedPeer reportedPeer = new ReportedPeer(nodeAddress); + reportedPeers.remove(reportedPeer); + persistedPeers.add(reportedPeer); + dbStorage.queueUpForSave(persistedPeers, 5000); + }); } @Override @@ -203,6 +213,18 @@ public class PeerManager implements ConnectionListener, MessageListener { } } + private void removeTooOldReportedPeers() { + Set reportedPeersToRemove = reportedPeers.stream() + .filter(reportedPeer -> new Date().getTime() - reportedPeer.lastActivityDate.getTime() > MAX_AGE) + .collect(Collectors.toSet()); + reportedPeersToRemove.forEach(this::removeReportedPeer); + + Set persistedPeersToRemove = persistedPeers.stream() + .filter(reportedPeer -> new Date().getTime() - reportedPeer.lastActivityDate.getTime() > MAX_AGE) + .collect(Collectors.toSet()); + persistedPeersToRemove.forEach(this::removeFromPersistedPeers); + } + private void removeSuperfluousSeedNodes() { Set allConnections = networkNode.getAllConnections(); if (allConnections.size() > MAX_CONNECTIONS_EXTENDED_1) { @@ -299,7 +321,7 @@ public class PeerManager implements ConnectionListener, MessageListener { } if (dbStorage != null) - dbStorage.queueUpForSave(persistedPeers); + dbStorage.queueUpForSave(persistedPeers, 2000); } printReportedPeers(); @@ -342,6 +364,31 @@ public class PeerManager implements ConnectionListener, MessageListener { return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS; } + public void penalizeUnreachablePeer(Connection connection) { + connection.getPeersNodeAddressOptional().ifPresent(this::penalizeUnreachablePeer); + } + + public void penalizeUnreachablePeer(NodeAddress nodeAddress) { + reportedPeers.stream() + .filter(reportedPeer -> reportedPeer.nodeAddress.equals(nodeAddress)) + .findAny() + .ifPresent(this::adjustLastActivityDate); + persistedPeers.stream() + .filter(reportedPeer -> reportedPeer.nodeAddress.equals(nodeAddress)) + .findAny() + .ifPresent(reportedPeer -> { + adjustLastActivityDate(reportedPeer); + dbStorage.queueUpForSave(persistedPeers, 5000); + }); + } + + private void adjustLastActivityDate(ReportedPeer reportedPeer) { + long now = new Date().getTime(); + long diff = now - reportedPeer.lastActivityDate.getTime(); + long reduced = now - diff * 2; + reportedPeer.setLastActivityDate(new Date(reduced)); + } + public Set getConnectedAndReportedPeers() { Set result = new HashSet<>(reportedPeers); result.addAll(getConnectedPeers()); @@ -443,5 +490,4 @@ public class PeerManager implements ConnectionListener, MessageListener { log.info(result.toString()); } } - } diff --git a/network/src/main/java/io/bitsquare/p2p/peers/ReportedPeer.java b/network/src/main/java/io/bitsquare/p2p/peers/ReportedPeer.java index 529dbd9055..b5af6c9b4d 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/ReportedPeer.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/ReportedPeer.java @@ -11,7 +11,7 @@ public class ReportedPeer implements Serializable { private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION; public final NodeAddress nodeAddress; - public final Date lastActivityDate; + public Date lastActivityDate; public ReportedPeer(NodeAddress nodeAddress, Date lastActivityDate) { this.nodeAddress = nodeAddress; @@ -22,6 +22,10 @@ public class ReportedPeer implements Serializable { this(nodeAddress, null); } + public void setLastActivityDate(Date lastActivityDate) { + this.lastActivityDate = lastActivityDate; + } + // We don't use the lastActivityDate for identity @Override public boolean equals(Object o) { diff --git a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java index 6eac9cb2a8..54c6177ee7 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/RequestDataManager.java @@ -126,6 +126,7 @@ public class RequestDataManager implements MessageListener { public void onFault(String errorMessage) { log.trace("RequestDataHandshake of inbound connection failed. {} Connection= {}", errorMessage, connection); + peerManager.penalizeUnreachablePeer(connection); } }); requestDataHandshake.onDataRequest(message, connection); @@ -170,6 +171,9 @@ public class RequestDataManager implements MessageListener { public void onFault(String errorMessage) { log.trace("RequestDataHandshake of outbound connection failed. {} nodeAddress= {}", errorMessage, nodeAddress); + + peerManager.penalizeUnreachablePeer(nodeAddress); + if (!remainingNodeAddresses.isEmpty()) { log.info("There are remaining nodes available for requesting data. " + "We will try requestDataFromPeers again."); diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index ab3b4be105..0bacc97c07 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -160,14 +160,11 @@ public class P2PDataStorage implements MessageListener { storage.queueUpForSave(sequenceNumberMap, 5000); StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n"); - sb.append("Data set after addProtectedExpirableData:"); - if (map.values().size() < 10) - map.values().stream().forEach(e -> sb.append("\n").append(e.toString()).append("\n")); - else - map.values().stream().forEach(e -> sb.append("\n").append("Truncated logs:").append(map.values().size()) - .append(" entries\n").append(e.toString().substring(0, 40)).append("...\n")); + sb.append("Data set after addProtectedExpirableData (truncated)"); + map.values().stream().forEach(e -> sb.append("\n").append(e.toString().substring(0, 40)).append("...\n")); sb.append("\n------------------------------------------------------------\n"); - log.info(sb.toString()); + log.trace(sb.toString()); + log.info("Data set after addProtectedExpirableData: size=" + map.values().size()); if (rePublish || !containsKey) broadcast(new AddDataMessage(protectedData), sender); @@ -282,10 +279,12 @@ public class P2PDataStorage implements MessageListener { hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData)); StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n" + - "Data set after removeProtectedExpirableData:"); - map.values().stream().forEach(e -> sb.append("\n").append(e.toString())); + "Data set after removeProtectedExpirableData: (truncated)"); + map.values().stream().forEach(e -> sb.append("\n").append(e.toString().substring(0, 40)).append("...\n")); sb.append("\n------------------------------------------------------------\n"); - log.info(sb.toString()); + log.trace(sb.toString()); + log.info("Data set after addProtectedExpirableData: size=" + map.values().size()); + } private boolean isSequenceNrValid(ProtectedData data, ByteArray hashOfData) {