From 6c2f3ea154f2cfa7975c0946d35dba25cca7a339 Mon Sep 17 00:00:00 2001 From: woodser Date: Sun, 24 Dec 2023 12:09:53 -0500 Subject: [PATCH] improve trade state reliability update trade state atomically on UserThread nullify error handling on deposits confirmed message set trade state before deposit request and relay add checks before deleting wallet UserThread.await() detects if on UserThread --- .../main/java/haveno/common/UserThread.java | 28 +++++++++++------ .../haveno/core/api/XmrConnectionService.java | 15 ++++----- .../main/java/haveno/core/trade/Trade.java | 31 ++++++++++++++++--- .../java/haveno/core/trade/TradeManager.java | 2 +- .../core/trade/protocol/TradeProtocol.java | 3 +- .../ArbitratorProcessDepositRequest.java | 4 +++ .../tasks/ProcessSignContractResponse.java | 10 +++--- 7 files changed, 65 insertions(+), 28 deletions(-) diff --git a/common/src/main/java/haveno/common/UserThread.java b/common/src/main/java/haveno/common/UserThread.java index 7265b3f491..57b6e635a8 100644 --- a/common/src/main/java/haveno/common/UserThread.java +++ b/common/src/main/java/haveno/common/UserThread.java @@ -45,6 +45,7 @@ public class UserThread { @Getter @Setter private static Executor executor; + private static final String USER_THREAD_NAME = "UserThread"; public static void setTimerClass(Class timerClass) { UserThread.timerClass = timerClass; @@ -57,22 +58,31 @@ public class UserThread { } public static void execute(Runnable command) { - UserThread.executor.execute(command); + UserThread.executor.execute(() -> { + Thread.currentThread().setName(USER_THREAD_NAME); + command.run(); + }); } public static void await(Runnable command) { - CountDownLatch latch = new CountDownLatch(1); - executor.execute(() -> { + if (isUserThread(Thread.currentThread())) { command.run(); - latch.countDown(); - }); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + } else { + CountDownLatch latch = new CountDownLatch(1); + execute(command); // run task + execute(() -> latch.countDown()); // await next tick + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } + public static boolean isUserThread(Thread thread) { + return USER_THREAD_NAME.equals(thread.getName()); + } + // Prefer FxTimer if a delay is needed in a JavaFx class (gui module) public static Timer runAfterRandomDelay(Runnable runnable, long minDelayInSec, long maxDelayInSec) { return UserThread.runAfterRandomDelay(runnable, minDelayInSec, maxDelayInSec, TimeUnit.SECONDS); diff --git a/core/src/main/java/haveno/core/api/XmrConnectionService.java b/core/src/main/java/haveno/core/api/XmrConnectionService.java index fffc48e4f1..de42e08a85 100644 --- a/core/src/main/java/haveno/core/api/XmrConnectionService.java +++ b/core/src/main/java/haveno/core/api/XmrConnectionService.java @@ -647,14 +647,15 @@ public final class XmrConnectionService { if (DevEnv.isDevMode()) e.printStackTrace(); } - // check connection which notifies of changes - if (connectionManager.getAutoSwitch()) connectionManager.setConnection(connectionManager.getBestAvailableConnection()); - else connectionManager.checkConnection(); + new Thread(() -> { + if (connectionManager.getAutoSwitch()) connectionManager.setConnection(connectionManager.getBestAvailableConnection()); + else connectionManager.checkConnection(); - // set error message - if (!Boolean.TRUE.equals(connectionManager.isConnected()) && HavenoUtils.havenoSetup != null) { - HavenoUtils.havenoSetup.getWalletServiceErrorMsg().set(e.getMessage()); - } + // set error message + if (!Boolean.TRUE.equals(connectionManager.isConnected()) && HavenoUtils.havenoSetup != null) { + HavenoUtils.havenoSetup.getWalletServiceErrorMsg().set(e.getMessage()); + } + }).start(); } finally { pollInProgress = false; } diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index fac49052d0..c410e956d8 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -802,6 +802,17 @@ public abstract class Trade implements Tradable, Model { return this instanceof ArbitratorTrade && isDepositsConfirmed() && walletExists() && syncNormalStartTimeMs == null; // arbitrator idles trade after deposits confirm unless overriden } + public boolean isSyncedWithinTolerance() { + synchronized (walletLock) { + if (wallet == null) return false; + if (!xmrConnectionService.isSyncedWithinTolerance()) return false; + Long targetHeight = xmrConnectionService.getTargetHeight(); + if (targetHeight == null) return false; + if (targetHeight - wallet.getHeight() <= 3) return true; // synced if within 3 blocks of target height + return false; + } + } + public void syncAndPollWallet() { syncWallet(true); } @@ -879,6 +890,18 @@ public abstract class Trade implements Tradable, Model { synchronized (walletLock) { if (walletExists()) { try { + + // check if synced + if (!isSyncedWithinTolerance()) { + log.warn("Refusing to delete wallet for {} {} because it is not synced within tolerance", getClass().getSimpleName(), getId()); + return; + } + + // check if balance > 0 + if (wallet.getBalance().compareTo(BigInteger.ZERO) > 0) { + log.warn("Refusing to delete wallet for {} {} because it contains a balance", getClass().getSimpleName(), getId()); + return; + } // check if funds deposited but payout not unlocked if (isDepositsPublished() && !isPayoutUnlocked()) { @@ -886,7 +909,7 @@ public abstract class Trade implements Tradable, Model { } // force stop the wallet - if (wallet != null) stopWallet(); + stopWallet(); // delete wallet log.info("Deleting wallet for {} {}", getClass().getSimpleName(), getId()); @@ -1279,7 +1302,7 @@ public abstract class Trade implements Tradable, Model { } this.state = state; - UserThread.execute(() -> { + UserThread.await(() -> { stateProperty.set(state); phaseProperty.set(state.getPhase()); }); @@ -1310,9 +1333,7 @@ public abstract class Trade implements Tradable, Model { } this.payoutState = payoutState; - UserThread.execute(() -> { - payoutStateProperty.set(payoutState); - }); + UserThread.await(() -> payoutStateProperty.set(payoutState)); } public void setDisputeState(DisputeState disputeState) { diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index 50ea3d9151..ee2bd0ccbb 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -1225,7 +1225,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi } private void removeTradeOnError(Trade trade) { - log.warn("TradeManager.removeTradeOnError() " + trade.getId()); + log.warn("TradeManager.removeTradeOnError() tradeId={}, state={}", trade.getId(), trade.getState()); synchronized (tradableList) { // unreserve taker key images diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index bc9a9eeb60..e240c699ee 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -429,7 +429,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D .using(new TradeTaskRunner(trade, () -> { stopTimeout(); - this.errorMessageHandler = null; + this.errorMessageHandler = null; // TODO: set this when trade state is >= DEPOSIT_PUBLISHED handleTaskRunnerSuccess(sender, response); if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized }, @@ -446,6 +446,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)"); synchronized (trade) { latchTrade(); + this.errorMessageHandler = null; expect(new Condition(trade) .with(response) .from(sender)) diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorProcessDepositRequest.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorProcessDepositRequest.java index 4ae7aafab8..c2a6f5ccf2 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorProcessDepositRequest.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorProcessDepositRequest.java @@ -112,6 +112,10 @@ public class ArbitratorProcessDepositRequest extends TradeTask { // TODO (woodser): add small delay so tx has head start against double spend attempts? if (processModel.getMaker().getDepositTxHex() != null && processModel.getTaker().getDepositTxHex() != null) { + // update trade state + trade.setState(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST); + processModel.getTradeManager().requestPersistence(); + // relay txs MoneroSubmitTxResult makerResult = daemon.submitTxHex(processModel.getMaker().getDepositTxHex(), true); MoneroSubmitTxResult takerResult = daemon.submitTxHex(processModel.getTaker().getDepositTxHex(), true); diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessSignContractResponse.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessSignContractResponse.java index 16d48c92c8..a2d712924d 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessSignContractResponse.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ProcessSignContractResponse.java @@ -84,6 +84,11 @@ public class ProcessSignContractResponse extends TradeTask { trade.getSelf().getDepositTx().getKey(), trade.getSelf().getPaymentAccountKey()); + // update trade state + trade.setState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST); + processModel.getTradeManager().requestPersistence(); + trade.addInitProgressStep(); + // send request to arbitrator log.info("Sending {} to arbitrator {}; offerId={}; uid={}", request.getClass().getSimpleName(), trade.getArbitrator().getNodeAddress(), trade.getId(), request.getUid()); processModel.getP2PService().sendEncryptedDirectMessage(trade.getArbitrator().getNodeAddress(), trade.getArbitrator().getPubKeyRing(), request, new SendDirectMessageListener() { @@ -101,11 +106,6 @@ public class ProcessSignContractResponse extends TradeTask { failed(); } }); - - // deposit is requested - trade.setState(Trade.State.SENT_PUBLISH_DEPOSIT_TX_REQUEST); - trade.addInitProgressStep(); - processModel.getTradeManager().requestPersistence(); } else { log.info("Waiting for another contract signature to send deposit request"); complete(); // does not yet have needed signatures