synchronize broadcaster requests and handlers (#1925)

This commit is contained in:
woodser 2025-09-09 09:21:05 -04:00 committed by GitHub
parent 5f3e366920
commit 171acd5221
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -51,6 +51,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private boolean shutDownRequested;
private Runnable shutDownResultHandler;
private final ListeningExecutorService executor;
private final Object lock = new Object();
///////////////////////////////////////////////////////////////////////////////////////////
@ -76,13 +77,15 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
log.info("Broadcaster shutdown started");
shutDownRequested = true;
shutDownResultHandler = resultHandler;
if (broadcastRequests.isEmpty()) {
doShutDown();
} else {
// We set delay of broadcasts and timeout to very low values,
// so we can expect that we get onCompleted called very fast and trigger the
// doShutDown from there.
maybeBroadcastBundle();
synchronized (lock) {
if (broadcastRequests.isEmpty()) {
doShutDown();
} else {
// We set delay of broadcasts and timeout to very low values,
// so we can expect that we get onCompleted called very fast and trigger the
// doShutDown from there.
maybeBroadcastBundle();
}
}
executor.shutdown();
}
@ -93,9 +96,11 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private void doShutDown() {
log.info("Broadcaster doShutDown started");
broadcastHandlers.forEach(BroadcastHandler::cancel);
if (timer != null) {
timer.stop();
synchronized (lock) {
broadcastHandlers.forEach(BroadcastHandler::cancel);
if (timer != null) {
timer.stop();
}
}
shutDownResultHandler.run();
}
@ -112,23 +117,27 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
public void broadcast(BroadcastMessage message,
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener) {
broadcastRequests.add(new BroadcastRequest(message, sender, listener));
if (timer == null) {
timer = UserThread.runAfter(this::maybeBroadcastBundle, BROADCAST_INTERVAL_MS, TimeUnit.MILLISECONDS);
synchronized (lock) {
broadcastRequests.add(new BroadcastRequest(message, sender, listener));
if (timer == null) {
timer = UserThread.runAfter(this::maybeBroadcastBundle, BROADCAST_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
}
}
private void maybeBroadcastBundle() {
if (!broadcastRequests.isEmpty()) {
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this);
broadcastHandlers.add(broadcastHandler);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor);
broadcastRequests.clear();
synchronized (lock) {
if (!broadcastRequests.isEmpty()) {
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this);
broadcastHandlers.add(broadcastHandler);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor);
broadcastRequests.clear();
if (timer != null) {
timer.stop();
if (timer != null) {
timer.stop();
}
timer = null;
}
timer = null;
}
}
@ -138,9 +147,11 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
@Override
public void onCompleted(BroadcastHandler broadcastHandler) {
broadcastHandlers.remove(broadcastHandler);
if (shutDownRequested) {
doShutDown();
synchronized (lock) {
broadcastHandlers.remove(broadcastHandler);
if (shutDownRequested) {
doShutDown();
}
}
}