Merge pull request #433 from ivilata/NewTimerRaceConditions

New timer race conditions
This commit is contained in:
Manfred Karrer 2016-05-05 13:37:07 +02:00
commit 8bf4e88c1f
5 changed files with 85 additions and 72 deletions

View File

@ -116,16 +116,9 @@ public class BroadcastHandler implements PeerManager.Listener {
numOfPeers = Math.min(5, connectedPeersList.size());
factor = 2;
}
log.info("Broadcast message to {} peers out of {} total connected peers.", numOfPeers, connectedPeersSet.size());
for (int i = 0; i < numOfPeers; i++) {
final long minDelay = i * 30 * factor + 1;
final long maxDelay = minDelay * 2 + 30 * factor;
final Connection connection = connectedPeersList.get(i);
UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
long timeoutDelay = TIMEOUT_PER_PEER_SEC * numOfPeers;
timeoutTimer = UserThread.runAfter(() -> {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "Timeout: Broadcast did not complete after " + timeoutDelay + " sec.";
log.warn(errorMessage + "\n\t" +
@ -137,6 +130,16 @@ public class BroadcastHandler implements PeerManager.Listener {
"broadcastQueue=" + broadcastQueue);
onFault(errorMessage);
}, timeoutDelay);
log.info("Broadcast message to {} peers out of {} total connected peers.", numOfPeers, connectedPeersSet.size());
for (int i = 0; i < numOfPeers; i++) {
if (stopped)
break; // do not continue sending after a timeout or a cancellation
final long minDelay = i * 30 * factor + 1;
final long maxDelay = minDelay * 2 + 30 * factor;
final Connection connection = connectedPeersList.get(i);
UserThread.runAfterRandomDelay(() -> sendToPeer(connection, message), minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
} else {
onFault("Message not broadcasted because we have no available peers yet.\n\t" +
"message = " + StringUtils.abbreviate(message.toString(), 100), false);

View File

@ -66,33 +66,42 @@ public class GetDataRequestHandler {
Log.traceCall(getDataRequest + "\n\tconnection=" + connection);
GetDataResponse getDataResponse = new GetDataResponse(new HashSet<>(dataStorage.getMap().values()),
getDataRequest.getNonce());
SettableFuture<Connection> future = networkNode.sendMessage(connection, getDataResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Send DataResponse to {} succeeded. getDataResponse={}",
connection.getPeersNodeAddressOptional(), getDataResponse);
cleanup();
listener.onComplete();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorMessage = "Sending getDataRequest to " + connection +
" failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." +
"Exception: " + throwable.getMessage();
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, connection);
}
});
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getDataResponse:" + getDataResponse +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIME_OUT_SEC, TimeUnit.SECONDS);
}
SettableFuture<Connection> future = networkNode.sendMessage(connection, getDataResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
log.trace("Send DataResponse to {} succeeded. getDataResponse={}",
connection.getPeersNodeAddressOptional(), getDataResponse);
cleanup();
listener.onComplete();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call.");
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending getDataRequest to " + connection +
" failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." +
"Exception: " + throwable.getMessage();
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, connection);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call.");
}
}
});
}
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -88,8 +88,22 @@ public class RequestDataHandler implements MessageListener {
else
getDataRequest = new GetUpdatedDataRequest(networkNode.getNodeAddress(), nonce);
log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress);
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIME_OUT_SEC);
}
log.info("We send a {} to peer {}. ", getDataRequest.getClass().getSimpleName(), nodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getDataRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -119,21 +133,6 @@ public class RequestDataHandler implements MessageListener {
}
}
});
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> {
if (!stopped) {
String errorMessage = "A timeout occurred at sending getDataRequest:" + getDataRequest +
" on nodeAddress:" + nodeAddress;
log.info(errorMessage + " / RequestDataHandler=" + RequestDataHandler.this);
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIME_OUT_SEC);
}
} else {
log.warn("We have stopped already. We ignore that requestData call.");
}

View File

@ -70,6 +70,21 @@ class GetPeersRequestHandler {
"The peers address must have been already set at the moment");
GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.nonce,
peerManager.getConnectedNonSeedNodeReportedPeers(connection.getPeersNodeAddressOptional().get()));
checkArgument(timeoutTimer == null, "onGetPeersRequest must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred at sending getPeersResponse:" + getPeersResponse + " on connection:" + connection;
log.info(errorMessage + " / PeerExchangeHandshake=" +
GetPeersRequestHandler.this);
log.info("timeoutTimer called. this=" + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call.");
}
},
TIME_OUT_SEC, TimeUnit.SECONDS);
SettableFuture<Connection> future = networkNode.sendMessage(connection,
getPeersResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
@ -98,20 +113,6 @@ class GetPeersRequestHandler {
}
});
checkArgument(timeoutTimer == null, "onGetPeersRequest must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> {
if (!stopped) {
String errorMessage = "A timeout occurred at sending getPeersResponse:" + getPeersResponse + " on connection:" + connection;
log.info(errorMessage + " / PeerExchangeHandshake=" +
GetPeersRequestHandler.this);
log.info("timeoutTimer called. this=" + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call.");
}
},
TIME_OUT_SEC, TimeUnit.SECONDS);
peerManager.addToReportedPeers(getPeersRequest.reportedPeers, connection);
}

View File

@ -84,6 +84,22 @@ class PeerExchangeHandler implements MessageListener {
if (!stopped) {
if (networkNode.getNodeAddress() != null) {
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getConnectedNonSeedNodeReportedPeers(nodeAddress));
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
log.info(errorMessage + " / PeerExchangeHandler=" +
PeerExchangeHandler.this);
log.info("timeoutTimer called on " + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress);
} else {
log.trace("We have stopped that handler already. We ignore that timeoutTimer.run call.");
}
},
TIME_OUT_SEC, TimeUnit.SECONDS);
}
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, getPeersRequest);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
@ -116,21 +132,6 @@ class PeerExchangeHandler implements MessageListener {
}
}
});
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> {
if (!stopped) {
String errorMessage = "A timeout occurred at sending getPeersRequest:" + getPeersRequest + " for nodeAddress:" + nodeAddress;
log.info(errorMessage + " / PeerExchangeHandler=" +
PeerExchangeHandler.this);
log.info("timeoutTimer called on " + this);
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, nodeAddress);
} else {
log.trace("We have stopped that handler already. We ignore that timeoutTimer.run call.");
}
},
TIME_OUT_SEC, TimeUnit.SECONDS);
}
} else {
log.debug("My node address is still null at sendGetPeersRequest. We ignore that call.");
}