Refactor checks for incoming data

This commit is contained in:
Manfred Karrer 2016-04-11 13:28:32 +02:00
parent f6d1a5b506
commit 6e976cc351

View File

@ -620,83 +620,105 @@ public class Connection implements MessageListener {
try {
Thread.currentThread().setName("InputHandler-" + portInfo);
while (!stopped && !Thread.currentThread().isInterrupted()) {
Connection connection = sharedModel.connection;
log.trace("InputHandler waiting for incoming messages.\n\tConnection=" + connection);
try {
log.trace("InputHandler waiting for incoming messages.\n\tConnection=" + sharedModel.connection);
Object rawInputObject = objectInputStream.readObject();
// Throttle inbound messages
if (System.currentTimeMillis() - lastReadTimeStamp < 10) {
long now = System.currentTimeMillis();
long elapsed = now - lastReadTimeStamp;
if (elapsed < 10) {
log.info("We got 2 messages received in less then 10 ms. We set the thread to sleep " +
"for 20 ms to avoid that we get flooded by our peer. lastReadTimeStamp={}, now={}, elapsed={}",
lastReadTimeStamp, System.currentTimeMillis(), (System.currentTimeMillis() - lastReadTimeStamp));
"for 20 ms to avoid that we get flooded from our peer. lastReadTimeStamp={}, now={}, elapsed={}",
lastReadTimeStamp, now, elapsed);
Thread.sleep(20);
}
lastReadTimeStamp = System.currentTimeMillis();
lastReadTimeStamp = now;
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (rawInputObject instanceof Message) {
Message message = (Message) rawInputObject;
Connection connection = sharedModel.connection;
connection.statistic.addReceivedBytes(size);
connection.statistic.addReceivedMessage(message);
}
if (rawInputObject instanceof Pong || rawInputObject instanceof RefreshTTLMessage) {
// pongs and offer refresh msg we dont want to log in production
// We only log Pong and RefreshTTLMessage when in dev environment (trace)
log.trace("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
sharedModel.connection,
connection,
StringUtils.abbreviate(rawInputObject.toString(), 100),
size);
} else {
} else if (rawInputObject instanceof Message) {
// We want to log all incoming messages (except Pong and RefreshTTLMessage)
// so we log before the data type checks
log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler of connection {}.\n" +
"Received object (truncated)={} / size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
sharedModel.connection,
connection,
StringUtils.abbreviate(rawInputObject.toString(), 100),
size);
} else {
log.error("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"Invalid data arrived at inputHandler of connection {}.\n" +
"Size={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
connection,
size);
}
if (size > getMaxMsgSize()) {
if (reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
return;
// We want to track the size of each object even if it is invalid data
connection.statistic.addReceivedBytes(size);
// We want to track the messages also before the checks, so do it early...
Message message = null;
if (rawInputObject instanceof Message) {
message = (Message) rawInputObject;
connection.statistic.addReceivedMessage((Message) rawInputObject);
}
// First we check the size
if (size > getMaxMsgSize() && reportInvalidRequest(RuleViolation.MAX_MSG_SIZE_EXCEEDED))
return;
// Then we check if data is of type Serializable (objectInputStream supports
// Externalizable objects as well)
Serializable serializable;
if (rawInputObject instanceof Serializable) {
serializable = (Serializable) rawInputObject;
} else {
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
// We return anyway here independent of the return value of reportInvalidRequest
return;
}
if (sharedModel.connection.violatesThrottleLimit(serializable)) {
if (reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
return;
}
// Then check data throttle limit. Do that for non-message type objects as well,
// so that's why we use serializable here.
if (connection.violatesThrottleLimit(serializable) && reportInvalidRequest(RuleViolation.THROTTLE_LIMIT_EXCEEDED))
return;
if (!(serializable instanceof Message)) {
// We do the message type check after the size/throttle checks.
// The type check was done already earlier so we only check if message is not null.
if (message == null) {
reportInvalidRequest(RuleViolation.INVALID_DATA_TYPE);
// We return anyway here independent of the return value of reportInvalidRequest
return;
}
Message message = (Message) serializable;
Connection connection = sharedModel.connection;
connection.statistic.addReceivedBytes(size);
connection.statistic.addReceivedMessage(message);
if (message.getMessageVersion() != Version.getP2PMessageVersion()) {
log.warn("message.getMessageVersion()=" + message.getMessageVersion());
// Check P2P network ID
int messageVersion = message.getMessageVersion();
int p2PMessageVersion = Version.getP2PMessageVersion();
if (messageVersion != p2PMessageVersion) {
log.warn("message.getMessageVersion()=" + messageVersion);
log.warn("Version.getP2PMessageVersion()=" + p2PMessageVersion);
log.warn("message=" + message);
log.warn("Version.getP2PMessageVersion()=" + Version.getP2PMessageVersion());
reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID);
// We return anyway here independent of the return value of reportInvalidRequest
return;
}
if (message instanceof CloseConnectionMessage) {
// If we get a CloseConnectionMessage we shut down
log.info("CloseConnectionMessage received. Reason={}\n\t" +
"connection={}", ((CloseConnectionMessage) message).reason, connection);
stop();
@ -723,6 +745,7 @@ public class Connection implements MessageListener {
NodeAddress senderNodeAddress = ((SendersNodeAddressMessage) message).getSenderNodeAddress();
Optional<NodeAddress> peersNodeAddressOptional = connection.getPeersNodeAddressOptional();
if (peersNodeAddressOptional.isPresent()) {
// If we have already the peers address we check again if it matches our stored one
checkArgument(peersNodeAddressOptional.get().equals(senderNodeAddress),
"senderNodeAddress not matching connections peer address.\n\t" +
"message=" + message);