From 171acd52213c0b63a2aef5a3184902eb56c718a7 Mon Sep 17 00:00:00 2001 From: woodser Date: Tue, 9 Sep 2025 09:21:05 -0400 Subject: [PATCH] synchronize broadcaster requests and handlers (#1925) --- .../haveno/network/p2p/peers/Broadcaster.java | 59 +++++++++++-------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/p2p/src/main/java/haveno/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/haveno/network/p2p/peers/Broadcaster.java index 647182feeb..d70d44311e 100644 --- a/p2p/src/main/java/haveno/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/haveno/network/p2p/peers/Broadcaster.java @@ -51,6 +51,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private boolean shutDownRequested; private Runnable shutDownResultHandler; private final ListeningExecutorService executor; + private final Object lock = new Object(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -76,13 +77,15 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { log.info("Broadcaster shutdown started"); shutDownRequested = true; shutDownResultHandler = resultHandler; - if (broadcastRequests.isEmpty()) { - doShutDown(); - } else { - // We set delay of broadcasts and timeout to very low values, - // so we can expect that we get onCompleted called very fast and trigger the - // doShutDown from there. - maybeBroadcastBundle(); + synchronized (lock) { + if (broadcastRequests.isEmpty()) { + doShutDown(); + } else { + // We set delay of broadcasts and timeout to very low values, + // so we can expect that we get onCompleted called very fast and trigger the + // doShutDown from there. + maybeBroadcastBundle(); + } } executor.shutdown(); } @@ -93,9 +96,11 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private void doShutDown() { log.info("Broadcaster doShutDown started"); - broadcastHandlers.forEach(BroadcastHandler::cancel); - if (timer != null) { - timer.stop(); + synchronized (lock) { + broadcastHandlers.forEach(BroadcastHandler::cancel); + if (timer != null) { + timer.stop(); + } } shutDownResultHandler.run(); } @@ -112,23 +117,27 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { public void broadcast(BroadcastMessage message, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener) { - broadcastRequests.add(new BroadcastRequest(message, sender, listener)); - if (timer == null) { - timer = UserThread.runAfter(this::maybeBroadcastBundle, BROADCAST_INTERVAL_MS, TimeUnit.MILLISECONDS); + synchronized (lock) { + broadcastRequests.add(new BroadcastRequest(message, sender, listener)); + if (timer == null) { + timer = UserThread.runAfter(this::maybeBroadcastBundle, BROADCAST_INTERVAL_MS, TimeUnit.MILLISECONDS); + } } } private void maybeBroadcastBundle() { - if (!broadcastRequests.isEmpty()) { - BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); - broadcastHandlers.add(broadcastHandler); - broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor); - broadcastRequests.clear(); + synchronized (lock) { + if (!broadcastRequests.isEmpty()) { + BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); + broadcastHandlers.add(broadcastHandler); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor); + broadcastRequests.clear(); - if (timer != null) { - timer.stop(); + if (timer != null) { + timer.stop(); + } + timer = null; } - timer = null; } } @@ -138,9 +147,11 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { @Override public void onCompleted(BroadcastHandler broadcastHandler) { - broadcastHandlers.remove(broadcastHandler); - if (shutDownRequested) { - doShutDown(); + synchronized (lock) { + broadcastHandlers.remove(broadcastHandler); + if (shutDownRequested) { + doShutDown(); + } } }