persist trade with payment confirmation msgs before processing

This commit is contained in:
woodser 2025-04-21 18:14:22 -04:00 committed by woodser
parent 923b3ad73b
commit de5250e89a
3 changed files with 114 additions and 104 deletions

View file

@ -811,6 +811,10 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
if (processModel.getTradeManager() != null) processModel.getTradeManager().requestPersistence(); if (processModel.getTradeManager() != null) processModel.getTradeManager().requestPersistence();
} }
public void persistNow(@Nullable Runnable completeHandler) {
processModel.getTradeManager().persistNow(completeHandler);
}
public TradeProtocol getProtocol() { public TradeProtocol getProtocol() {
return processModel.getTradeManager().getTradeProtocol(this); return processModel.getTradeManager().getTradeProtocol(this);
} }

View file

@ -546,6 +546,10 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
persistenceManager.requestPersistence(); persistenceManager.requestPersistence();
} }
public void persistNow(@Nullable Runnable completeHandler) {
persistenceManager.persistNow(completeHandler);
}
private void handleInitTradeRequest(InitTradeRequest request, NodeAddress sender) { private void handleInitTradeRequest(InitTradeRequest request, NodeAddress sender) {
log.info("TradeManager handling InitTradeRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid()); log.info("TradeManager handling InitTradeRequest for tradeId={}, sender={}, uid={}", request.getOfferId(), sender, request.getUid());

View file

@ -537,62 +537,63 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// save message for reprocessing // save message for reprocessing
trade.getBuyer().setPaymentSentMessage(message); trade.getBuyer().setPaymentSentMessage(message);
trade.requestPersistence(); trade.persistNow(() -> {
// process message on trade thread // process message on trade thread
if (!trade.isInitialized() || trade.isShutDownStarted()) return; if (!trade.isInitialized() || trade.isShutDownStarted()) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
// We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case
// that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received
// a mailbox message with PaymentSentMessage. // a mailbox message with PaymentSentMessage.
// TODO A better fix would be to add a listener for the wallet sync state and process // TODO A better fix would be to add a listener for the wallet sync state and process
// the mailbox msg once wallet is ready and trade state set. // the mailbox msg once wallet is ready and trade state set.
synchronized (trade.getLock()) { synchronized (trade.getLock()) {
if (!trade.isInitialized() || trade.isShutDownStarted()) return; if (!trade.isInitialized() || trade.isShutDownStarted()) return;
if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) {
log.warn("Received another PaymentSentMessage which was already processed for {} {}, ACKing", trade.getClass().getSimpleName(), trade.getId()); log.warn("Received another PaymentSentMessage which was already processed for {} {}, ACKing", trade.getClass().getSimpleName(), trade.getId());
handleTaskRunnerSuccess(peer, message); handleTaskRunnerSuccess(peer, message);
return; return;
} }
if (trade.getPayoutTx() != null) { if (trade.getPayoutTx() != null) {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " + log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " + "so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg."); "arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null); sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message); removeMailboxMessageAfterProcessing(message);
return; return;
} }
latchTrade(); latchTrade();
expect(anyPhase() expect(anyPhase()
.with(message) .with(message)
.from(peer)) .from(peer))
.setup(tasks( .setup(tasks(
ApplyFilter.class, ApplyFilter.class,
ProcessPaymentSentMessage.class, ProcessPaymentSentMessage.class,
VerifyPeersAccountAgeWitness.class) VerifyPeersAccountAgeWitness.class)
.using(new TradeTaskRunner(trade, .using(new TradeTaskRunner(trade,
() -> { () -> {
handleTaskRunnerSuccess(peer, message); handleTaskRunnerSuccess(peer, message);
}, },
(errorMessage) -> { (errorMessage) -> {
log.warn("Error processing payment sent message: " + errorMessage); log.warn("Error processing payment sent message: " + errorMessage);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
// schedule to reprocess message unless deleted // schedule to reprocess message unless deleted
if (trade.getBuyer().getPaymentSentMessage() != null) { if (trade.getBuyer().getPaymentSentMessage() != null) {
UserThread.runAfter(() -> { UserThread.runAfter(() -> {
reprocessPaymentSentMessageCount++; reprocessPaymentSentMessageCount++;
maybeReprocessPaymentSentMessage(reprocessOnError); maybeReprocessPaymentSentMessage(reprocessOnError);
}, trade.getReprocessDelayInSeconds(reprocessPaymentSentMessageCount)); }, trade.getReprocessDelayInSeconds(reprocessPaymentSentMessageCount));
} else { } else {
handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack
} }
unlatchTrade(); unlatchTrade();
}))) })))
.executeTasks(true); .executeTasks(true);
awaitTradeLatch(); awaitTradeLatch();
} }
}, trade.getId()); }, trade.getId());
});
} }
// received by buyer and arbitrator // received by buyer and arbitrator
@ -619,59 +620,60 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// save message for reprocessing // save message for reprocessing
trade.getSeller().setPaymentReceivedMessage(message); trade.getSeller().setPaymentReceivedMessage(message);
trade.requestPersistence(); trade.persistNow(() -> {
// process message on trade thread // process message on trade thread
if (!trade.isInitialized() || trade.isShutDownStarted()) return; if (!trade.isInitialized() || trade.isShutDownStarted()) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
synchronized (trade.getLock()) { synchronized (trade.getLock()) {
if (!trade.isInitialized() || trade.isShutDownStarted()) return; if (!trade.isInitialized() || trade.isShutDownStarted()) return;
latchTrade(); latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message); Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); processModel.setTradeMessage(message);
// check minimum trade phase // check minimum trade phase
if (trade.isBuyer() && trade.getPhase().ordinal() < Trade.Phase.PAYMENT_SENT.ordinal()) { if (trade.isBuyer() && trade.getPhase().ordinal() < Trade.Phase.PAYMENT_SENT.ordinal()) {
log.warn("Received PaymentReceivedMessage before payment sent for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); log.warn("Received PaymentReceivedMessage before payment sent for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
return; return;
}
if (trade.isArbitrator() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_CONFIRMED.ordinal()) {
log.warn("Received PaymentReceivedMessage before deposits confirmed for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
return;
}
if (trade.isSeller() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_UNLOCKED.ordinal()) {
log.warn("Received PaymentReceivedMessage before deposits unlocked for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
return;
}
expect(anyPhase()
.with(message)
.from(peer))
.setup(tasks(
ProcessPaymentReceivedMessage.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
log.warn("Error processing payment received message: " + errorMessage);
processModel.getTradeManager().requestPersistence();
// schedule to reprocess message unless deleted
if (trade.getSeller().getPaymentReceivedMessage() != null) {
UserThread.runAfter(() -> {
reprocessPaymentReceivedMessageCount++;
maybeReprocessPaymentReceivedMessage(reprocessOnError);
}, trade.getReprocessDelayInSeconds(reprocessPaymentReceivedMessageCount));
} else {
handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack
}
unlatchTrade();
})))
.executeTasks(true);
awaitTradeLatch();
} }
if (trade.isArbitrator() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_CONFIRMED.ordinal()) { }, trade.getId());
log.warn("Received PaymentReceivedMessage before deposits confirmed for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId()); });
return;
}
if (trade.isSeller() && trade.getPhase().ordinal() < Trade.Phase.DEPOSITS_UNLOCKED.ordinal()) {
log.warn("Received PaymentReceivedMessage before deposits unlocked for {} {}, ignoring", trade.getClass().getSimpleName(), trade.getId());
return;
}
expect(anyPhase()
.with(message)
.from(peer))
.setup(tasks(
ProcessPaymentReceivedMessage.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
log.warn("Error processing payment received message: " + errorMessage);
processModel.getTradeManager().requestPersistence();
// schedule to reprocess message unless deleted
if (trade.getSeller().getPaymentReceivedMessage() != null) {
UserThread.runAfter(() -> {
reprocessPaymentReceivedMessageCount++;
maybeReprocessPaymentReceivedMessage(reprocessOnError);
}, trade.getReprocessDelayInSeconds(reprocessPaymentReceivedMessageCount));
} else {
handleTaskRunnerFault(peer, message, errorMessage); // otherwise send nack
}
unlatchTrade();
})))
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
} }
public void onWithdrawCompleted() { public void onWithdrawCompleted() {