add msg/sec throttle

This commit is contained in:
Manfred Karrer 2016-01-30 23:23:21 +01:00
parent 067bf07a83
commit 26270b209a

View File

@ -24,6 +24,7 @@ import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;
@ -37,7 +38,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class Connection implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(Connection.class);
private static final int MAX_MSG_SIZE = 5 * 1024 * 1024; // 5 MB of compressed data
private static final int MAX_MSG_SIZE = 100 * 1024; // 100 kb of compressed data
private static final int MAX_MSG_PER_SEC = 10; // With MAX_MSG_SIZE of 100kb results in bandwidth of 10 mbit/sec
private static final int MAX_MSG_PER_10SEC = 50; // With MAX_MSG_SIZE of 100kb results in bandwidth of 5 mbit/sec for 10 sec
//timeout on blocking Socket operations like ServerSocket.accept() or SocketInputStream.read()
private static final int SOCKET_TIMEOUT = 30 * 60 * 1000; // 30 min.
@ -157,14 +160,14 @@ public class Connection implements MessageListener {
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\nmessage={}" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, message);
peersNodeAddress, uid, message.toString().substring(0, Math.min(60, message.toString().length())));
} else {
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Write object to outputStream to peer: {} (uid={})\nmessage={}" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={}" +
"\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n",
peersNodeAddress, uid, message);
peersNodeAddress, uid, message.toString().substring(0, Math.min(60, message.toString().length())));
}
Object objectToWrite;
@ -425,6 +428,7 @@ public class Connection implements MessageListener {
private Date lastActivityDate;
private volatile boolean stopped;
private ConnectionListener.Reason shutDownReason;
private List<Long> messageTimeStamps = new CopyOnWriteArrayList<>();
public SharedModel(Connection connection, Socket socket) {
Log.traceCall();
@ -514,6 +518,27 @@ public class Connection implements MessageListener {
this.stopped = true;
}
private boolean tooManyMessages() {
long now = System.currentTimeMillis();
boolean exceeds = false;
if (messageTimeStamps.size() >= MAX_MSG_PER_SEC) {
// check if we got more than 10 msg per sec.
long compareTo = messageTimeStamps.get(messageTimeStamps.size() - MAX_MSG_PER_SEC);
exceeds = now - compareTo < 1000;
}
if (messageTimeStamps.size() >= MAX_MSG_PER_10SEC) {
// check if we got more than 50 msg per 10 sec.
long compareTo = messageTimeStamps.get(messageTimeStamps.size() - MAX_MSG_PER_10SEC);
exceeds = exceeds || now - compareTo < 10000;
// we limit to max 50 entries
messageTimeStamps.remove(0);
}
messageTimeStamps.add(now);
return exceeds;
}
public synchronized ConnectionListener.Reason getShutDownReason() {
return shutDownReason;
}
@ -529,9 +554,9 @@ public class Connection implements MessageListener {
}
///////////////////////////////////////////////////////////////////////////////////////////
// InputHandler
///////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////
// InputHandler
///////////////////////////////////////////////////////////////////////////////////////////
// Runs in same thread as Connection
private static class InputHandler implements Runnable {
@ -576,8 +601,9 @@ public class Connection implements MessageListener {
log.trace("New data arrived at inputHandler.Connection=" + sharedModel.getConnectionInfo());
log.info("\n\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n" +
"New data arrived at inputHandler.\nReceived object={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n", rawInputObject);
"New data arrived at inputHandler.\nTruncated received object={}"
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n",
rawInputObject.toString().substring(0, Math.min(60, rawInputObject.toString().length())));
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
if (size > getMaxMsgSize()) {
@ -620,8 +646,13 @@ public class Connection implements MessageListener {
return;
}
sharedModel.updateLastActivityDate();
if (sharedModel.tooManyMessages()) {
sharedModel.reportIllegalRequest(IllegalRequest.TooManyMessages);
return;
}
Connection connection = sharedModel.connection;
sharedModel.updateLastActivityDate();
if (message instanceof CloseConnectionMessage) {
log.info("CloseConnectionMessage received on connection {}", connection);
stopped = true;