From bf6901c7aa28dab9505cd21cdec2fe552bb4a0c3 Mon Sep 17 00:00:00 2001 From: woodser <13068859+woodser@users.noreply.github.com> Date: Wed, 7 Jan 2026 19:20:36 -0500 Subject: [PATCH] modify ui-bound observable lists on user thread --- .../haveno/core/offer/OpenOfferManager.java | 2 +- .../core/support/dispute/DisputeManager.java | 40 +++-- .../daemon/grpc/GrpcDisputesService.java | 141 ++++++++++-------- .../pendingtrades/PendingTradesDataModel.java | 52 +++---- 4 files changed, 137 insertions(+), 98 deletions(-) diff --git a/core/src/main/java/haveno/core/offer/OpenOfferManager.java b/core/src/main/java/haveno/core/offer/OpenOfferManager.java index 64949a3369..7f5d729802 100644 --- a/core/src/main/java/haveno/core/offer/OpenOfferManager.java +++ b/core/src/main/java/haveno/core/offer/OpenOfferManager.java @@ -1761,7 +1761,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe request.getReserveTxKeyImages(), verifiedTx.getFee().longValueExact(), signature); // TODO (woodser): no need for signature to be part of SignedOffer? - addSignedOffer(signedOffer); + UserThread.execute(() -> addSignedOffer(signedOffer)); requestPersistence(); // send response with signature diff --git a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java index 9dfe58d9fd..672b1d729c 100644 --- a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java +++ b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java @@ -420,7 +420,12 @@ public abstract class DisputeManager> extends Sup if (reOpen) { dispute = storedDisputeOptional.get(); } else { - disputeList.add(dispute); + final Dispute finalDispute = dispute; + UserThread.execute(() -> { + synchronized (disputeList.getObservableList()) { + disputeList.add(finalDispute); + } + }); } } @@ -556,15 +561,17 @@ public abstract class DisputeManager> extends Sup } public void removeDisputes(Trade trade) { - T disputeList = getDisputeList(); - synchronized (disputeList.getObservableList()) { - for (Dispute dispute : trade.getDisputes()) { - disputeList.remove(dispute); + UserThread.execute(() -> { + T disputeList = getDisputeList(); + synchronized (disputeList.getObservableList()) { + for (Dispute dispute : trade.getDisputes()) { + disputeList.remove(dispute); + } } - } - trade.setDisputeState(Trade.DisputeState.NO_DISPUTE); - clearPendingMessage(); - requestPersistence(); + trade.setDisputeState(Trade.DisputeState.NO_DISPUTE); + clearPendingMessage(); + requestPersistence(); + }); } // arbitrator receives dispute opened message from opener, opener's peer receives from arbitrator @@ -694,7 +701,11 @@ public abstract class DisputeManager> extends Sup if (reOpen) { trade.setDisputeState(Trade.DisputeState.DISPUTE_OPENED); } else { - disputeList.add(dispute); + UserThread.execute(() -> { + synchronized (disputeList) { + disputeList.add(dispute); + } + }); trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED); } @@ -822,9 +833,12 @@ public abstract class DisputeManager> extends Sup dispute = storedDisputeOptional.get(); dispute.reOpen(); } else { - synchronized (disputeList) { - disputeList.add(dispute); - } + final Dispute finalDispute = dispute; + UserThread.execute(() -> { + synchronized (disputeList) { + disputeList.add(finalDispute); + } + }); } // get trade diff --git a/daemon/src/main/java/haveno/daemon/grpc/GrpcDisputesService.java b/daemon/src/main/java/haveno/daemon/grpc/GrpcDisputesService.java index bc7a9a0f74..9d62ea29a1 100644 --- a/daemon/src/main/java/haveno/daemon/grpc/GrpcDisputesService.java +++ b/daemon/src/main/java/haveno/daemon/grpc/GrpcDisputesService.java @@ -1,6 +1,9 @@ package haveno.daemon.grpc; import com.google.inject.Inject; + +import haveno.common.ThreadUtils; +import haveno.common.UserThread; import haveno.common.config.Config; import haveno.common.proto.ProtoUtil; import haveno.core.api.CoreApi; @@ -48,82 +51,102 @@ public class GrpcDisputesService extends DisputesImplBase { @Override public void openDispute(OpenDisputeRequest req, StreamObserver responseObserver) { - try { - coreApi.openDispute(req.getTradeId(), - () -> { - var reply = OpenDisputeReply.newBuilder().build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - }, - (errorMessage, throwable) -> { - log.info("Error in openDispute" + errorMessage); - exceptionHandler.handleErrorMessage(log, errorMessage, responseObserver); - }); - } catch (Throwable cause) { - exceptionHandler.handleException(log, cause, responseObserver); - } + UserThread.execute(() -> { + ThreadUtils.submitToPool(() -> { + try { + coreApi.openDispute(req.getTradeId(), + () -> { + var reply = OpenDisputeReply.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + }, + (errorMessage, throwable) -> { + log.info("Error in openDispute" + errorMessage); + exceptionHandler.handleErrorMessage(log, errorMessage, responseObserver); + }); + } catch (Throwable cause) { + exceptionHandler.handleException(log, cause, responseObserver); + } + }); + }); } @Override public void getDispute(GetDisputeRequest req, StreamObserver responseObserver) { - try { - var dispute = coreApi.getDispute(req.getTradeId()); - var reply = GetDisputeReply.newBuilder() - .setDispute(dispute.toProtoMessage()) - .build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Throwable cause) { - exceptionHandler.handleExceptionAsWarning(log, getClass().getName() + ".getDispute", cause, responseObserver); - } + UserThread.execute(() -> { // offers are updated on user thread + ThreadUtils.submitToPool(() -> { + try { + var dispute = coreApi.getDispute(req.getTradeId()); + var reply = GetDisputeReply.newBuilder() + .setDispute(dispute.toProtoMessage()) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Throwable cause) { + exceptionHandler.handleExceptionAsWarning(log, getClass().getName() + ".getDispute", cause, responseObserver); + } + }); + }); } @Override public void getDisputes(GetDisputesRequest req, StreamObserver responseObserver) { - try { - var disputes = coreApi.getDisputes(); - var disputesProtobuf = disputes.stream() - .map(d -> d.toProtoMessage()) - .collect(Collectors.toList()); - var reply = GetDisputesReply.newBuilder() - .addAllDisputes(disputesProtobuf) - .build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Throwable cause) { - exceptionHandler.handleException(log, cause, responseObserver); - } + UserThread.execute(() -> { + ThreadUtils.submitToPool(() -> { + try { + var disputes = coreApi.getDisputes(); + var disputesProtobuf = disputes.stream() + .map(d -> d.toProtoMessage()) + .collect(Collectors.toList()); + var reply = GetDisputesReply.newBuilder() + .addAllDisputes(disputesProtobuf) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Throwable cause) { + exceptionHandler.handleException(log, cause, responseObserver); + } + }); + }); } @Override public void resolveDispute(ResolveDisputeRequest req, StreamObserver responseObserver) { - try { - var winner = ProtoUtil.enumFromProto(DisputeResult.Winner.class, req.getWinner().name()); - var reason = ProtoUtil.enumFromProto(DisputeResult.Reason.class, req.getReason().name()); - coreApi.resolveDispute(req.getTradeId(), winner, reason, req.getSummaryNotes(), req.getCustomPayoutAmount()); - var reply = ResolveDisputeReply.newBuilder().build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Throwable cause) { - cause.printStackTrace(); - exceptionHandler.handleExceptionAsWarning(log, getClass().getName() + ".resolveDispute", cause, responseObserver); - } + UserThread.execute(() -> { + ThreadUtils.submitToPool(() -> { + try { + var winner = ProtoUtil.enumFromProto(DisputeResult.Winner.class, req.getWinner().name()); + var reason = ProtoUtil.enumFromProto(DisputeResult.Reason.class, req.getReason().name()); + coreApi.resolveDispute(req.getTradeId(), winner, reason, req.getSummaryNotes(), req.getCustomPayoutAmount()); + var reply = ResolveDisputeReply.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Throwable cause) { + cause.printStackTrace(); + exceptionHandler.handleExceptionAsWarning(log, getClass().getName() + ".resolveDispute", cause, responseObserver); + } + }); + }); } @Override public void sendDisputeChatMessage(SendDisputeChatMessageRequest req, StreamObserver responseObserver) { - try { - var attachmentsProto = req.getAttachmentsList(); - var attachments = attachmentsProto.stream().map(a -> Attachment.fromProto(a)) - .collect(Collectors.toList()); - coreApi.sendDisputeChatMessage(req.getDisputeId(), req.getMessage(), new ArrayList(attachments)); - var reply = SendDisputeChatMessageReply.newBuilder().build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Throwable cause) { - exceptionHandler.handleException(log, cause, responseObserver); - } + UserThread.execute(() -> { + ThreadUtils.submitToPool(() -> { + try { + var attachmentsProto = req.getAttachmentsList(); + var attachments = attachmentsProto.stream().map(a -> Attachment.fromProto(a)) + .collect(Collectors.toList()); + coreApi.sendDisputeChatMessage(req.getDisputeId(), req.getMessage(), new ArrayList(attachments)); + var reply = SendDisputeChatMessageReply.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Throwable cause) { + exceptionHandler.handleException(log, cause, responseObserver); + } + }); + }); } final ServerInterceptor[] interceptors() { diff --git a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesDataModel.java b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesDataModel.java index 8df9fdb33d..52698a7440 100644 --- a/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesDataModel.java +++ b/desktop/src/main/java/haveno/desktop/main/portfolio/pendingtrades/PendingTradesDataModel.java @@ -314,37 +314,39 @@ public class PendingTradesDataModel extends ActivatableDataModel { /////////////////////////////////////////////////////////////////////////////////////////// private void onListChanged() { - synchronized (tradeManager.getObservableList()) { + UserThread.execute(() -> { + synchronized (tradeManager.getObservableList()) { - // add or remove listener for hidden trades - for (Trade trade : tradeManager.getObservableList()) { - if (isTradeShown(trade)) { - if (hiddenTrades.contains(trade)) { - UserThread.execute(() -> trade.stateProperty().removeListener(hiddenStateChangeListener)); - hiddenTrades.remove(trade); - } - } else { - if (!hiddenTrades.contains(trade)) { - UserThread.execute(() -> trade.stateProperty().addListener(hiddenStateChangeListener)); - hiddenTrades.add(trade); + // add or remove listener for hidden trades + for (Trade trade : tradeManager.getObservableList()) { + if (isTradeShown(trade)) { + if (hiddenTrades.contains(trade)) { + UserThread.execute(() -> trade.stateProperty().removeListener(hiddenStateChangeListener)); + hiddenTrades.remove(trade); + } + } else { + if (!hiddenTrades.contains(trade)) { + UserThread.execute(() -> trade.stateProperty().addListener(hiddenStateChangeListener)); + hiddenTrades.add(trade); + } } } - } - - // add shown trades to list - synchronized (list) { - list.clear(); - list.addAll(tradeManager.getObservableList().stream() - .filter(trade -> isTradeShown(trade)) - .map(trade -> new PendingTradesListItem(trade, btcFormatter)) - .collect(Collectors.toList())); + + // add shown trades to list + synchronized (list) { + list.clear(); + list.addAll(tradeManager.getObservableList().stream() + .filter(trade -> isTradeShown(trade)) + .map(trade -> new PendingTradesListItem(trade, btcFormatter)) + .collect(Collectors.toList())); - // we sort by date, earliest first - list.sort((o1, o2) -> o2.getTrade().getDate().compareTo(o1.getTrade().getDate())); + // we sort by date, earliest first + list.sort((o1, o2) -> o2.getTrade().getDate().compareTo(o1.getTrade().getDate())); + } } - } - selectBestItem(); + selectBestItem(); + }); } private boolean isTradeShown(Trade trade) {