diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index e218bf970f..769d7e95af 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -811,6 +811,10 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { if (processModel.getTradeManager() != null) processModel.getTradeManager().requestPersistence(); } + public void persistNow(@Nullable Runnable completeHandler) { + processModel.getTradeManager().persistNow(completeHandler); + } + public TradeProtocol getProtocol() { return processModel.getTradeManager().getTradeProtocol(this); } diff --git a/core/src/main/java/haveno/core/trade/TradeManager.java b/core/src/main/java/haveno/core/trade/TradeManager.java index 9b092decac..e34cc9b1a7 100644 --- a/core/src/main/java/haveno/core/trade/TradeManager.java +++ b/core/src/main/java/haveno/core/trade/TradeManager.java @@ -546,6 +546,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi persistenceManager.requestPersistence(); } + public void persistNow(@Nullable Runnable completeHandler) { + persistenceManager.persistNow(completeHandler); + } + private void handleInitTradeRequest(InitTradeRequest request, NodeAddress sender) { log.info("TradeManager handling InitTradeRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid()); diff --git a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java index 65fcee23c6..09c26d8a1d 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java @@ -537,62 +537,63 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D // save message for reprocessing trade.getBuyer().setPaymentSentMessage(message); - trade.requestPersistence(); + trade.persistNow(() -> { - // process message on trade thread - if (!trade.isInitialized() || trade.isShutDownStarted()) return; - ThreadUtils.execute(() -> { - // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case - // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received - // a mailbox message with PaymentSentMessage. - // TODO A better fix would be to add a listener for the wallet sync state and process - // the mailbox msg once wallet is ready and trade state set. - synchronized (trade.getLock()) { - if (!trade.isInitialized() || trade.isShutDownStarted()) return; - if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { - log.warn("Received another PaymentSentMessage which was already processed for {} {}, ACKing", trade.getClass().getSimpleName(), trade.getId()); - handleTaskRunnerSuccess(peer, message); - return; + // process message on trade thread + if (!trade.isInitialized() || trade.isShutDownStarted()) return; + ThreadUtils.execute(() -> { + // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case + // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received + // a mailbox message with PaymentSentMessage. + // TODO A better fix would be to add a listener for the wallet sync state and process + // the mailbox msg once wallet is ready and trade state set. + synchronized (trade.getLock()) { + if (!trade.isInitialized() || trade.isShutDownStarted()) return; + if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { + log.warn("Received another PaymentSentMessage which was already processed for {} {}, ACKing", trade.getClass().getSimpleName(), trade.getId()); + handleTaskRunnerSuccess(peer, message); + return; + } + if (trade.getPayoutTx() != null) { + log.warn("We received a PaymentSentMessage but we have already created the payout tx " + + "so we ignore the message. This can happen if the ACK message to the peer did not " + + "arrive and the peer repeats sending us the message. We send another ACK msg."); + sendAckMessage(peer, message, true, null); + removeMailboxMessageAfterProcessing(message); + return; + } + latchTrade(); + expect(anyPhase() + .with(message) + .from(peer)) + .setup(tasks( + ApplyFilter.class, + ProcessPaymentSentMessage.class, + VerifyPeersAccountAgeWitness.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(peer, message); + }, + (errorMessage) -> { + log.warn("Error processing payment sent message: " + errorMessage); + processModel.getTradeManager().requestPersistence(); + + // schedule to reprocess message unless deleted + if (trade.getBuyer().getPaymentSentMessage() != null) { + UserThread.runAfter(() -> { + reprocessPaymentSentMessageCount++; + maybeReprocessPaymentSentMessage(reprocessOnError); + }, trade.getReprocessDelayInSeconds(reprocessPaymentSentMessageCount)); + } else { + handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack + } + unlatchTrade(); + }))) + .executeTasks(true); + awaitTradeLatch(); } - if (trade.getPayoutTx() != null) { - log.warn("We received a PaymentSentMessage but we have already created the payout tx " + - "so we ignore the message. This can happen if the ACK message to the peer did not " + - "arrive and the peer repeats sending us the message. We send another ACK msg."); - sendAckMessage(peer, message, true, null); - removeMailboxMessageAfterProcessing(message); - return; - } - latchTrade(); - expect(anyPhase() - .with(message) - .from(peer)) - .setup(tasks( - ApplyFilter.class, - ProcessPaymentSentMessage.class, - VerifyPeersAccountAgeWitness.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(peer, message); - }, - (errorMessage) -> { - log.warn("Error processing payment sent message: " + errorMessage); - processModel.getTradeManager().requestPersistence(); - - // schedule to reprocess message unless deleted - if (trade.getBuyer().getPaymentSentMessage() != null) { - UserThread.runAfter(() -> { - reprocessPaymentSentMessageCount++; - maybeReprocessPaymentSentMessage(reprocessOnError); - }, trade.getReprocessDelayInSeconds(reprocessPaymentSentMessageCount)); - } else { - handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack - } - unlatchTrade(); - }))) - .executeTasks(true); - awaitTradeLatch(); - } - }, trade.getId()); + }, trade.getId()); + }); } // received by buyer and arbitrator @@ -619,59 +620,60 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D // save message for reprocessing trade.getSeller().setPaymentReceivedMessage(message); - trade.requestPersistence(); + trade.persistNow(() -> { - // process message on trade thread - if (!trade.isInitialized() || trade.isShutDownStarted()) return; - ThreadUtils.execute(() -> { - synchronized (trade.getLock()) { - if (!trade.isInitialized() || trade.isShutDownStarted()) return; - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); + // process message on trade thread + if (!trade.isInitialized() || trade.isShutDownStarted()) return; + ThreadUtils.execute(() -> { + synchronized (trade.getLock()) { + if (!trade.isInitialized() || trade.isShutDownStarted()) return; + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), message); + processModel.setTradeMessage(message); - // check minimum trade phase - if (trade.isBuyer() && trade.getPhase().ordinal() < Trade.Phase.PAYMENT_SENT.ordinal()) { - log.warn("Received PaymentReceivedMessage before payment sent for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); - return; + // check minimum trade phase + if (trade.isBuyer() && trade.getPhase().ordinal() < Trade.Phase.PAYMENT_SENT.ordinal()) { + log.warn("Received PaymentReceivedMessage before payment sent for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); + return; + } + if (trade.isArbitrator() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_CONFIRMED.ordinal()) { + log.warn("Received PaymentReceivedMessage before deposits confirmed for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); + return; + } + if (trade.isSeller() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_UNLOCKED.ordinal()) { + log.warn("Received PaymentReceivedMessage before deposits unlocked for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); + return; + } + + expect(anyPhase() + .with(message) + .from(peer)) + .setup(tasks( + ProcessPaymentReceivedMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + handleTaskRunnerSuccess(peer, message); + }, + errorMessage -> { + log.warn("Error processing payment received message: " + errorMessage); + processModel.getTradeManager().requestPersistence(); + + // schedule to reprocess message unless deleted + if (trade.getSeller().getPaymentReceivedMessage() != null) { + UserThread.runAfter(() -> { + reprocessPaymentReceivedMessageCount++; + maybeReprocessPaymentReceivedMessage(reprocessOnError); + }, trade.getReprocessDelayInSeconds(reprocessPaymentReceivedMessageCount)); + } else { + handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack + } + unlatchTrade(); + }))) + .executeTasks(true); + awaitTradeLatch(); } - if (trade.isArbitrator() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_CONFIRMED.ordinal()) { - log.warn("Received PaymentReceivedMessage before deposits confirmed for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); - return; - } - if (trade.isSeller() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_UNLOCKED.ordinal()) { - log.warn("Received PaymentReceivedMessage before deposits unlocked for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); - return; - } - - expect(anyPhase() - .with(message) - .from(peer)) - .setup(tasks( - ProcessPaymentReceivedMessage.class) - .using(new TradeTaskRunner(trade, - () -> { - handleTaskRunnerSuccess(peer, message); - }, - errorMessage -> { - log.warn("Error processing payment received message: " + errorMessage); - processModel.getTradeManager().requestPersistence(); - - // schedule to reprocess message unless deleted - if (trade.getSeller().getPaymentReceivedMessage() != null) { - UserThread.runAfter(() -> { - reprocessPaymentReceivedMessageCount++; - maybeReprocessPaymentReceivedMessage(reprocessOnError); - }, trade.getReprocessDelayInSeconds(reprocessPaymentReceivedMessageCount)); - } else { - handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack - } - unlatchTrade(); - }))) - .executeTasks(true); - awaitTradeLatch(); - } - }, trade.getId()); + }, trade.getId()); + }); } public void onWithdrawCompleted() {