From 892eaa440adb6c481974857f91c52c6c29f3b3de Mon Sep 17 00:00:00 2001 From: woodser <woodser@protonmail.com> Date: Sun, 21 Jan 2024 05:28:19 -0500 Subject: [PATCH] fix trade initialization error handling and run off trade thread --- .../main/java/haveno/core/trade/Trade.java | 239 +++++++++--------- 1 file changed, 119 insertions(+), 120 deletions(-) diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 7b172957e7..93dc0dbd47 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -590,141 +590,140 @@ public abstract class Trade implements Tradable, Model { /////////////////////////////////////////////////////////////////////////////////////////// public void initialize(ProcessModelServiceProvider serviceProvider) { - ThreadUtils.await(() -> { - if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); + if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); - // check if done - if (isPayoutUnlocked()) { - clearAndShutDown(); - return; - } + // done if payout unlocked + if (isPayoutUnlocked()) { + clearAndShutDown(); + return; + } - // set arbitrator pub key ring once known - serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> { - getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing()); + // set arbitrator pub key ring once known + serviceProvider.getArbitratorManager().getDisputeAgentByNodeAddress(getArbitratorNodeAddress()).ifPresent(arbitrator -> { + getArbitrator().setPubKeyRing(arbitrator.getPubKeyRing()); + }); + + // handle connection change on dedicated thread + xmrConnectionService.addConnectionListener(connection -> { + ThreadUtils.submitToPool(() -> { // TODO: remove this? + ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); }); + }); - // handle connection change on dedicated thread - xmrConnectionService.addConnectionListener(connection -> { - ThreadUtils.submitToPool(() -> { // TODO: remove this? - ThreadUtils.execute(() -> onConnectionChanged(connection), getConnectionChangedThreadId()); - }); - }); + // reset buyer's payment sent state if no ack receive + if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) { + log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); + setState(Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); + } - // reset buyer's payment sent state if no ack receive - if (this instanceof BuyerTrade && getState().ordinal() >= Trade.State.BUYER_CONFIRMED_PAYMENT_SENT.ordinal() && getState().ordinal() < Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG.ordinal()) { - log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); - setState(Trade.State.DEPOSIT_TXS_UNLOCKED_IN_BLOCKCHAIN); - } + // reset seller's payment received state if no ack receive + if (this instanceof SellerTrade && getState().ordinal() >= Trade.State.SELLER_CONFIRMED_PAYMENT_RECEIPT.ordinal() && getState().ordinal() < Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG.ordinal()) { + log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); + setState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); + } - // reset seller's payment received state if no ack receive - if (this instanceof SellerTrade && getState().ordinal() >= Trade.State.SELLER_CONFIRMED_PAYMENT_RECEIPT.ordinal() && getState().ordinal() < Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG.ordinal()) { - log.warn("Resetting state of {} {} from {} to {} because no ack was received", getClass().getSimpleName(), getId(), getState(), Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); - setState(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); - } + // handle trade state events + tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> { + if (!isInitialized || isShutDownStarted) return; + ThreadUtils.execute(() -> { + if (newValue == Trade.State.MULTISIG_COMPLETED) { + updateWalletRefreshPeriod(); + startPolling(); + } + }, getId()); + }); - // handle trade state events - tradeStateSubscription = EasyBind.subscribe(stateProperty, newValue -> { - if (isShutDownStarted) return; - ThreadUtils.execute(() -> { - if (newValue == Trade.State.MULTISIG_COMPLETED) { - updateWalletRefreshPeriod(); - startPolling(); - } - }, getId()); - }); - - // handle trade phase events - tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> { - if (isShutDownStarted) return; - ThreadUtils.execute(() -> { - if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod(); - if (isPaymentReceived()) { - UserThread.execute(() -> { - if (tradePhaseSubscription != null) { - tradePhaseSubscription.unsubscribe(); - tradePhaseSubscription = null; - } - }); - } - }, getId()); - }); - - // handle payout events - payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> { - if (isShutDownStarted) return; - ThreadUtils.execute(() -> { - if (isPayoutPublished()) updateWalletRefreshPeriod(); - - // handle when payout published - if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) { - log.info("Payout published for {} {}", getClass().getSimpleName(), getId()); - - // sync main wallet to update pending balance - new Thread(() -> { - GenUtils.waitFor(1000); - if (isShutDownStarted) return; - if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet()); - }).start(); - - // complete disputed trade - if (getDisputeState().isArbitrated() && !getDisputeState().isClosed()) { - processModel.getTradeManager().closeDisputedTrade(getId(), Trade.DisputeState.DISPUTE_CLOSED); - if (!isArbitrator()) for (Dispute dispute : getDisputes()) dispute.setIsClosed(); // auto close trader tickets + // handle trade phase events + tradePhaseSubscription = EasyBind.subscribe(phaseProperty, newValue -> { + if (!isInitialized || isShutDownStarted) return; + ThreadUtils.execute(() -> { + if (isDepositsPublished() && !isPayoutUnlocked()) updateWalletRefreshPeriod(); + if (isPaymentReceived()) { + UserThread.execute(() -> { + if (tradePhaseSubscription != null) { + tradePhaseSubscription.unsubscribe(); + tradePhaseSubscription = null; } + }); + } + }, getId()); + }); - // auto complete arbitrator trade - if (isArbitrator() && !isCompleted()) processModel.getTradeManager().onTradeCompleted(this); + // handle payout events + payoutStateSubscription = EasyBind.subscribe(payoutStateProperty, newValue -> { + if (!isInitialized || isShutDownStarted) return; + ThreadUtils.execute(() -> { + if (isPayoutPublished()) updateWalletRefreshPeriod(); - // reset address entries - processModel.getXmrWalletService().resetAddressEntriesForTrade(getId()); + // handle when payout published + if (newValue == Trade.PayoutState.PAYOUT_PUBLISHED) { + log.info("Payout published for {} {}", getClass().getSimpleName(), getId()); + + // sync main wallet to update pending balance + new Thread(() -> { + GenUtils.waitFor(1000); + if (isShutDownStarted) return; + if (Boolean.TRUE.equals(xmrConnectionService.isConnected())) xmrWalletService.syncWallet(xmrWalletService.getWallet()); + }).start(); + + // complete disputed trade + if (getDisputeState().isArbitrated() && !getDisputeState().isClosed()) { + processModel.getTradeManager().closeDisputedTrade(getId(), Trade.DisputeState.DISPUTE_CLOSED); + if (!isArbitrator()) for (Dispute dispute : getDisputes()) dispute.setIsClosed(); // auto close trader tickets } - // handle when payout unlocks - if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { - if (!isInitialized) return; - log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId()); - clearAndShutDown(); - } - }, getId()); - }); + // auto complete arbitrator trade + if (isArbitrator() && !isCompleted()) processModel.getTradeManager().onTradeCompleted(this); - // arbitrator syncs idle wallet when payout unlock expected - if (this instanceof ArbitratorTrade) { - idlePayoutSyncer = new IdlePayoutSyncer(); - xmrWalletService.addWalletListener(idlePayoutSyncer); - } - - // TODO: buyer's payment sent message state property can become unsynced (after improper shut down?) - if (isBuyer()) { - MessageState expectedState = getPaymentSentMessageState(); - if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) { - log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get()); - processModel.getPaymentSentMessageStateProperty().set(expectedState); + // reset address entries + processModel.getXmrWalletService().resetAddressEntriesForTrade(getId()); } - } - // trade is initialized - isInitialized = true; - - // done if payout unlocked or deposit not requested - if (!isDepositRequested() || isPayoutUnlocked()) return; - - // done if wallet does not exist - if (!walletExists()) { - MoneroTx payoutTx = getPayoutTx(); - if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) { - log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState()); - setPayoutStateUnlocked(); - return; - } else { - throw new IllegalStateException("Missing trade wallet for " + getClass().getSimpleName() + " " + getId()); + // handle when payout unlocks + if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) { + if (!isInitialized) return; + log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId()); + clearAndShutDown(); } - } + }, getId()); + }); - // initialize syncing and polling - initSyncing(); - }, getId()); + // arbitrator syncs idle wallet when payout unlock expected + if (this instanceof ArbitratorTrade) { + idlePayoutSyncer = new IdlePayoutSyncer(); + xmrWalletService.addWalletListener(idlePayoutSyncer); + } + + // TODO: buyer's payment sent message state property can become unsynced (after improper shut down?) + if (isBuyer()) { + MessageState expectedState = getPaymentSentMessageState(); + if (expectedState != null && expectedState != processModel.getPaymentSentMessageStateProperty().get()) { + log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStateProperty().get()); + processModel.getPaymentSentMessageStateProperty().set(expectedState); + } + } + + // trade is initialized + isInitialized = true; + + // done if deposit not requested or payout unlocked + if (!isDepositRequested() || isPayoutUnlocked()) return; + + // open wallet or done if wallet does not exist + if (walletExists()) getWallet(); + else { + MoneroTx payoutTx = getPayoutTx(); + if (payoutTx != null && payoutTx.getNumConfirmations() >= 10) { + log.warn("Payout state for {} {} is {} but payout is unlocked, updating state", getClass().getSimpleName(), getId(), getPayoutState()); + setPayoutStateUnlocked(); + return; + } else { + throw new IllegalStateException("Missing trade wallet for " + getClass().getSimpleName() + " " + getId()); + } + } + + // initialize syncing and polling + tryInitSyncing(); } public void requestPersistence() { @@ -1940,12 +1939,12 @@ public abstract class Trade implements Tradable, Model { // sync and reprocess messages on new thread if (isInitialized && connection != null && !Boolean.FALSE.equals(connection.isConnected())) { - ThreadUtils.execute(() -> initSyncing(), getId()); + ThreadUtils.execute(() -> tryInitSyncing(), getId()); } } } - private void initSyncing() { + private void tryInitSyncing() { if (isShutDownStarted) return; if (!isIdling()) { initSyncingAux();