From 6e976cc351c8bf1040b985457aaad067be0c0efa Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Mon, 11 Apr 2016 13:28:32 +0200 Subject: [PATCH] Refactor checks for incoming data --- .../io/bitsquare/p2p/network/Connection.java | 87 ++++++++++++------- 1 file changed, 55 insertions(+), 32 deletions(-) diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 4f946b20ff..56609431ad 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -620,83 +620,105 @@ public class Connection implements MessageListener { try { Thread.currentThread().setName("InputHandler-" + portInfo); while (!stopped && !Thread.currentThread().isInterrupted()) { + Connection connection = sharedModel.connection; + log.trace("InputHandler waiting for incoming messages.\n\tConnection=" + connection); try { - log.trace("InputHandler waiting for incoming messages.\n\tConnection=" + sharedModel.connection); Object rawInputObject = objectInputStream.readObject(); // Throttle inbound messages - if (System.currentTimeMillis() - lastReadTimeStamp < 10) { + long now = System.currentTimeMillis(); + long elapsed = now - lastReadTimeStamp; + if (elapsed < 10) { log.info("We got 2 messages received in less then 10 ms. We set the thread to sleep " + - "for 20 ms to avoid that we get flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}", - lastReadTimeStamp, System.currentTimeMillis(), (System.currentTimeMillis() - lastReadTimeStamp)); + "for 20 ms to avoid that we get flooded from our peer. lastReadTimeStamp={}, now={}, elapsed={}", + lastReadTimeStamp, now, elapsed); Thread.sleep(20); } - lastReadTimeStamp = System.currentTimeMillis(); - + lastReadTimeStamp = now; int size = ByteArrayUtils.objectToByteArray(rawInputObject).length; - if (rawInputObject instanceof Message) { - Message message = (Message) rawInputObject; - Connection connection = sharedModel.connection; - connection.statistic.addReceivedBytes(size); - connection.statistic.addReceivedMessage(message); - } + if (rawInputObject instanceof Pong || rawInputObject instanceof RefreshTTLMessage) { - // pongs and offer refresh msg we dont want to log in production + // We only log Pong and RefreshTTLMessage when in dev environment (trace) log.trace("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" + "New data arrived at inputHandler of connection {}.\n" + "Received object (truncated)={} / size={}" + "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", - sharedModel.connection, + connection, StringUtils.abbreviate(rawInputObject.toString(), 100), size); - } else { + } else if (rawInputObject instanceof Message) { + // We want to log all incoming messages (except Pong and RefreshTTLMessage) + // so we log before the data type checks log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" + "New data arrived at inputHandler of connection {}.\n" + "Received object (truncated)={} / size={}" + "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", - sharedModel.connection, + connection, StringUtils.abbreviate(rawInputObject.toString(), 100), size); + } else { + log.error("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" + + "Invalid data arrived at inputHandler of connection {}.\n" + + "Size={}" + + "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", + connection, + size); } - if (size > getMaxMsgSize()) { - if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED)) - return; + // We want to track the size of each object even if it is invalid data + connection.statistic.addReceivedBytes(size); + + // We want to track the messages also before the checks, so do it early... + Message message = null; + if (rawInputObject instanceof Message) { + message = (Message) rawInputObject; + connection.statistic.addReceivedMessage((Message) rawInputObject); } + + // First we check the size + if (size > getMaxMsgSize() && reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED)) + return; + + // Then we check if data is of type Serializable (objectInputStream supports + // Externalizable objects as well) Serializable serializable; if (rawInputObject instanceof Serializable) { serializable = (Serializable) rawInputObject; } else { reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE); + // We return anyway here independent of the return value of reportInvalidRequest return; } - if (sharedModel.connection.violatesThrottleLimit(serializable)) { - if (reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED)) - return; - } + // Then check data throttle limit. Do that for non-message type objects as well, + // so that's why we use serializable here. + if (connection.violatesThrottleLimit(serializable) && reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED)) + return; - if (!(serializable instanceof Message)) { + // We do the message type check after the size/throttle checks. + // The type check was done already earlier so we only check if message is not null. + if (message == null) { reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE); + // We return anyway here independent of the return value of reportInvalidRequest return; } - Message message = (Message) serializable; - Connection connection = sharedModel.connection; - connection.statistic.addReceivedBytes(size); - connection.statistic.addReceivedMessage(message); - - if (message.getMessageVersion() != Version.getP2PMessageVersion()) { - log.warn("message.getMessageVersion()=" + message.getMessageVersion()); + // Check P2P network ID + int messageVersion = message.getMessageVersion(); + int p2PMessageVersion = Version.getP2PMessageVersion(); + if (messageVersion != p2PMessageVersion) { + log.warn("message.getMessageVersion()=" + messageVersion); + log.warn("Version.getP2PMessageVersion()=" + p2PMessageVersion); log.warn("message=" + message); - log.warn("Version.getP2PMessageVersion()=" + Version.getP2PMessageVersion()); reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID); + // We return anyway here independent of the return value of reportInvalidRequest return; } if (message instanceof CloseConnectionMessage) { + // If we get a CloseConnectionMessage we shut down log.info("CloseConnectionMessage received. Reason={}\n\t" + "connection={}", ((CloseConnectionMessage) message).reason, connection); stop(); @@ -723,6 +745,7 @@ public class Connection implements MessageListener { NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) message).getSenderNodeAddress(); Optional peersNodeAddressOptional = connection.getPeersNodeAddressOptional(); if (peersNodeAddressOptional.isPresent()) { + // If we have already the peers address we check again if it matches our stored one checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress), "senderNodeAddress not matching connections peer address.\n\t" + "message=" + message);