Dont store disconnected nodes with wrong version/invalid data...

This commit is contained in:
Manfred Karrer 2016-02-04 19:00:16 +01:00
parent a264fa4e0b
commit 2c8bc3053b
8 changed files with 99 additions and 68 deletions

View file

@ -39,9 +39,10 @@ public class Version {
public static int getP2PMessageVersion() { public static int getP2PMessageVersion() {
// A changed NETWORK_PROTOCOL_VERSION for the serialized objects does not trigger reliable a disconnect. // TODO investigate why a changed NETWORK_PROTOCOL_VERSION for the serialized objects does not trigger
// TODO investigate why, but java serialisation should be replaced anyway, so using one existing field // reliable a disconnect., but java serialisation should be replaced anyway, so using one existing field
// for the version is fine. // for the version is fine.
// BTC_NETWORK_ID is 0, 1 or 2, we use for changes at NETWORK_PROTOCOL_VERSION a multiplication with 10 // BTC_NETWORK_ID is 0, 1 or 2, we use for changes at NETWORK_PROTOCOL_VERSION a multiplication with 10
// to avoid conflicts: // to avoid conflicts:
// E.g. btc BTC_NETWORK_ID=1, NETWORK_PROTOCOL_VERSION=1 -> getNetworkId()=2; // E.g. btc BTC_NETWORK_ID=1, NETWORK_PROTOCOL_VERSION=1 -> getNetworkId()=2;

View file

@ -198,7 +198,7 @@ public class WalletService {
// TODO Get bitcoinj running over our tor proxy. BlockingClientManager need to be used to use the socket // TODO Get bitcoinj running over our tor proxy. BlockingClientManager need to be used to use the socket
// from jtorproxy. To get supported it via nio / netty will be harder // from jtorproxy. To get supported it via nio / netty will be harder
if (!params.getId().equals(NetworkParameters.ID_REGTEST) && useTor) if (useTor && params.getId().equals(NetworkParameters.ID_MAINNET))
walletAppKit.useTor(); walletAppKit.useTor();
// Now configure and start the appkit. This will take a second or two - we could show a temporary splash screen // Now configure and start the appkit. This will take a second or two - we could show a temporary splash screen

View file

@ -660,9 +660,12 @@ public class Connection implements MessageListener {
messageListener.onMessage(message, connection); messageListener.onMessage(message, connection);
} }
} catch (IOException | ClassNotFoundException | NoClassDefFoundError e) { } catch (ClassNotFoundException | NoClassDefFoundError e) {
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE); reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
return; return;
} catch (IOException e) {
stop();
sharedModel.handleConnectionException(e);
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();
stop(); stop();

View file

@ -17,6 +17,7 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.HashSet; import java.util.HashSet;
import java.util.Random; import java.util.Random;
import java.util.Timer; import java.util.Timer;
@ -36,7 +37,7 @@ public class PeerExchangeHandshake implements MessageListener {
public interface Listener { public interface Listener {
void onComplete(); void onComplete();
void onFault(String errorMessage); void onFault(String errorMessage, @Nullable Connection connection);
} }
@ -94,7 +95,7 @@ public class PeerExchangeHandshake implements MessageListener {
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage, null);
} }
}); });
@ -107,7 +108,7 @@ public class PeerExchangeHandshake implements MessageListener {
log.info("timeoutTimer called on " + this); log.info("timeoutTimer called on " + this);
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage, null);
}, },
20, TimeUnit.SECONDS); 20, TimeUnit.SECONDS);
} }
@ -145,7 +146,7 @@ public class PeerExchangeHandshake implements MessageListener {
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage, connection);
} }
}); });
@ -158,7 +159,7 @@ public class PeerExchangeHandshake implements MessageListener {
log.info("timeoutTimer called. this=" + this); log.info("timeoutTimer called. this=" + this);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage, connection);
}, },
20, TimeUnit.SECONDS); 20, TimeUnit.SECONDS);

View file

@ -8,6 +8,7 @@ import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest; import io.bitsquare.p2p.peers.messages.peers.GetPeersRequest;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -115,10 +116,10 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
} }
@Override @Override
public void onFault(String errorMessage) { public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" + log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" +
"connection={}", errorMessage, connection); "connection={}", errorMessage, connection);
peerManager.penalizeUnreachablePeer(connection); peerManager.handleConnectionFault(connection);
} }
}); });
peerExchangeHandshake.onGetPeersRequest((GetPeersRequest) message, connection); peerExchangeHandshake.onGetPeersRequest((GetPeersRequest) message, connection);
@ -144,12 +145,12 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
} }
@Override @Override
public void onFault(String errorMessage) { public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" + log.trace("PeerExchangeHandshake of outbound connection failed.\n\terrorMessage={}\n\t" +
"nodeAddress={}", errorMessage, nodeAddress); "nodeAddress={}", errorMessage, nodeAddress);
peerExchangeHandshakeMap.remove(nodeAddress); peerExchangeHandshakeMap.remove(nodeAddress);
peerManager.penalizeUnreachablePeer(nodeAddress); peerManager.handleConnectionFault(nodeAddress, connection);
if (!shutDownInProgress) { if (!shutDownInProgress) {
if (!remainingNodeAddresses.isEmpty()) { if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting peers. " + log.info("There are remaining nodes available for requesting peers. " +

View file

@ -11,6 +11,7 @@ import javafx.beans.value.ChangeListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -80,6 +81,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
if (checkMaxConnectionsTimer == null && newValue != null) if (checkMaxConnectionsTimer == null && newValue != null)
checkMaxConnectionsTimer = UserThread.runAfter(() -> { checkMaxConnectionsTimer = UserThread.runAfter(() -> {
removeTooOldReportedPeers(); removeTooOldReportedPeers();
removeTooOldPersistedPeers();
checkMaxConnections(MAX_CONNECTIONS); checkMaxConnections(MAX_CONNECTIONS);
}, 3); }, 3);
}; };
@ -114,17 +116,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
@Override @Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener); connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> { handleConnectionFault(connection);
penalizeUnreachablePeer(nodeAddress);
Optional<ReportedPeer> reportedPeerOptional = reportedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
if (reportedPeerOptional.isPresent()) {
ReportedPeer reportedPeer = reportedPeerOptional.get();
reportedPeers.remove(reportedPeer);
persistedPeers.add(reportedPeer);
dbStorage.queueUpForSave(persistedPeers, 5000);
}
});
} }
@Override @Override
@ -224,21 +216,6 @@ public class PeerManager implements ConnectionListener, MessageListener {
} }
} }
private void removeTooOldReportedPeers() {
Log.traceCall();
Set<ReportedPeer> reportedPeersToRemove = reportedPeers.stream()
.filter(reportedPeer -> reportedPeer.lastActivityDate != null &&
new Date().getTime() - reportedPeer.lastActivityDate.getTime() > MAX_AGE)
.collect(Collectors.toSet());
reportedPeersToRemove.forEach(this::removeReportedPeer);
Set<ReportedPeer> persistedPeersToRemove = persistedPeers.stream()
.filter(reportedPeer -> reportedPeer.lastActivityDate != null &&
new Date().getTime() - reportedPeer.lastActivityDate.getTime() > MAX_AGE)
.collect(Collectors.toSet());
persistedPeersToRemove.forEach(this::removeFromPersistedPeers);
}
private void removeSuperfluousSeedNodes() { private void removeSuperfluousSeedNodes() {
Log.traceCall(); Log.traceCall();
Set<Connection> allConnections = networkNode.getAllConnections(); Set<Connection> allConnections = networkNode.getAllConnections();
@ -262,9 +239,31 @@ public class PeerManager implements ConnectionListener, MessageListener {
// Reported peers // Reported peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void removeReportedPeer(ReportedPeer reportedPeer) { private boolean removeReportedPeer(ReportedPeer reportedPeer) {
reportedPeers.remove(reportedPeer); boolean contained = reportedPeers.remove(reportedPeer);
printReportedPeers(); printReportedPeers();
return contained;
}
private ReportedPeer removeReportedPeer(NodeAddress nodeAddress) {
Optional<ReportedPeer> reportedPeerOptional = reportedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
if (reportedPeerOptional.isPresent()) {
ReportedPeer reportedPeer = reportedPeerOptional.get();
reportedPeers.remove(reportedPeer);
return reportedPeer;
} else {
return null;
}
}
private void removeTooOldReportedPeers() {
Log.traceCall();
Set<ReportedPeer> reportedPeersToRemove = reportedPeers.stream()
.filter(reportedPeer -> reportedPeer.lastActivityDate != null &&
new Date().getTime() - reportedPeer.lastActivityDate.getTime() > MAX_AGE)
.collect(Collectors.toSet());
reportedPeersToRemove.forEach(this::removeReportedPeer);
} }
public Set<ReportedPeer> getReportedPeers() { public Set<ReportedPeer> getReportedPeers() {
@ -358,15 +357,40 @@ public class PeerManager implements ConnectionListener, MessageListener {
// Persisted peers // Persisted peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void removeFromPersistedPeers(ReportedPeer reportedPeer) { private boolean removePersistedPeer(ReportedPeer reportedPeer) {
if (persistedPeers.contains(reportedPeer)) { if (persistedPeers.contains(reportedPeer)) {
persistedPeers.remove(reportedPeer); persistedPeers.remove(reportedPeer);
if (dbStorage != null) if (dbStorage != null)
dbStorage.queueUpForSave(persistedPeers, 5000); dbStorage.queueUpForSave(persistedPeers, 5000);
return true;
} else {
return false;
} }
} }
private boolean removePersistedPeer(NodeAddress nodeAddress) {
Optional<ReportedPeer> persistedPeerOptional = persistedPeers.stream()
.filter(e -> e.nodeAddress.equals(nodeAddress)).findAny();
persistedPeerOptional.ifPresent(persistedPeer -> {
persistedPeers.remove(persistedPeer);
if (dbStorage != null)
dbStorage.queueUpForSave(persistedPeers, 5000);
});
return persistedPeerOptional.isPresent();
}
private void removeTooOldPersistedPeers() {
Log.traceCall();
Set<ReportedPeer> persistedPeersToRemove = persistedPeers.stream()
.filter(reportedPeer -> reportedPeer.lastActivityDate != null &&
new Date().getTime() - reportedPeer.lastActivityDate.getTime() > MAX_AGE)
.collect(Collectors.toSet());
persistedPeersToRemove.forEach(this::removePersistedPeer);
}
public Set<ReportedPeer> getPersistedPeers() { public Set<ReportedPeer> getPersistedPeers() {
return persistedPeers; return persistedPeers;
} }
@ -380,25 +404,25 @@ public class PeerManager implements ConnectionListener, MessageListener {
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS; return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS;
} }
public void penalizeUnreachablePeer(Connection connection) { public void handleConnectionFault(Connection connection) {
connection.getPeersNodeAddressOptional().ifPresent(this::penalizeUnreachablePeer); connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection));
} }
public void penalizeUnreachablePeer(NodeAddress nodeAddress) { public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
Log.traceCall("nodeAddress=" + nodeAddress); Log.traceCall("nodeAddress=" + nodeAddress);
reportedPeers.stream() ReportedPeer reportedPeer = removeReportedPeer(nodeAddress);
.filter(reportedPeer -> reportedPeer.nodeAddress.equals(nodeAddress)) if (connection != null && connection.getRuleViolation() != null) {
.findAny() removePersistedPeer(nodeAddress);
.ifPresent(ReportedPeer::penalizeLastActivityDate); } else {
persistedPeers.stream() if (reportedPeer != null) {
.filter(reportedPeer -> reportedPeer.nodeAddress.equals(nodeAddress)) removePersistedPeer(nodeAddress);
.findAny()
.ifPresent(reportedPeer -> {
reportedPeer.penalizeLastActivityDate(); reportedPeer.penalizeLastActivityDate();
persistedPeers.add(reportedPeer);
dbStorage.queueUpForSave(persistedPeers, 5000); dbStorage.queueUpForSave(persistedPeers, 5000);
});
removeTooOldReportedPeers(); removeTooOldPersistedPeers();
}
}
} }
public Set<ReportedPeer> getConnectedAndReportedPeers() { public Set<ReportedPeer> getConnectedAndReportedPeers() {
@ -465,7 +489,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
for (int i = 0; i < diff; i++) { for (int i = 0; i < diff; i++) {
ReportedPeer toRemove = getAndRemoveRandomReportedPeer(list); ReportedPeer toRemove = getAndRemoveRandomReportedPeer(list);
removeReportedPeer(toRemove); removeReportedPeer(toRemove);
removeFromPersistedPeers(toRemove); removePersistedPeer(toRemove);
} }
} else { } else {
log.trace("No need to purge reported peers.\n\tWe don't have more then {} reported peers yet.", MAX_REPORTED_PEERS); log.trace("No need to purge reported peers.\n\tWe don't have more then {} reported peers yet.", MAX_REPORTED_PEERS);

View file

@ -38,7 +38,7 @@ public class RequestDataHandshake implements MessageListener {
public interface Listener { public interface Listener {
void onComplete(); void onComplete();
void onFault(String errorMessage); void onFault(String errorMessage, @Nullable Connection connection);
} }
@ -105,7 +105,7 @@ public class RequestDataHandshake implements MessageListener {
log.info(errorMessage); log.info(errorMessage);
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage, null);
} }
}); });
@ -117,7 +117,7 @@ public class RequestDataHandshake implements MessageListener {
RequestDataHandshake.this); RequestDataHandshake.this);
peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT); peerManager.shutDownConnection(nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage, null);
}, },
10, TimeUnit.SECONDS); 10, TimeUnit.SECONDS);
} }
@ -146,7 +146,7 @@ public class RequestDataHandshake implements MessageListener {
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_FAILURE);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage, connection);
} }
}); });
@ -158,7 +158,7 @@ public class RequestDataHandshake implements MessageListener {
RequestDataHandshake.this); RequestDataHandshake.this);
peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT); peerManager.shutDownConnection(connection, CloseConnectionReason.SEND_MSG_TIMEOUT);
shutDown(); shutDown();
listener.onFault(errorMessage); listener.onFault(errorMessage, connection);
}, },
10, TimeUnit.SECONDS); 10, TimeUnit.SECONDS);
} }

View file

@ -9,6 +9,7 @@ import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.messages.data.GetDataRequest; import io.bitsquare.p2p.peers.messages.data.GetDataRequest;
import io.bitsquare.p2p.storage.P2PDataStorage; import io.bitsquare.p2p.storage.P2PDataStorage;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -126,10 +127,10 @@ public class RequestDataManager implements MessageListener {
} }
@Override @Override
public void onFault(String errorMessage) { public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("requestDataHandshake of inbound connection failed.\n\tConnection={}\n\t" + log.trace("requestDataHandshake of inbound connection failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage); "ErrorMessage={}", connection, errorMessage);
peerManager.penalizeUnreachablePeer(connection); peerManager.handleConnectionFault(connection);
} }
}); });
requestDataHandshake.onDataRequest(message, connection); requestDataHandshake.onDataRequest(message, connection);
@ -171,11 +172,11 @@ public class RequestDataManager implements MessageListener {
} }
@Override @Override
public void onFault(String errorMessage) { public void onFault(String errorMessage, @Nullable Connection connection) {
log.trace("requestDataHandshake of outbound connection failed.\n\tnodeAddress={}\n\t" + log.trace("requestDataHandshake of outbound connection failed.\n\tnodeAddress={}\n\t" +
"ErrorMessage={}", nodeAddress, errorMessage); "ErrorMessage={}", nodeAddress, errorMessage);
peerManager.penalizeUnreachablePeer(nodeAddress); peerManager.handleConnectionFault(nodeAddress, connection);
if (!shutDownInProgress) { if (!shutDownInProgress) {
if (!remainingNodeAddresses.isEmpty()) { if (!remainingNodeAddresses.isEmpty()) {