From 8867e9eb3b63f8f9597965bb0a6c017a148ced0b Mon Sep 17 00:00:00 2001 From: woodser Date: Fri, 21 Nov 2025 18:53:46 -0400 Subject: [PATCH] resend mailbox messages until acked or nacked, add delay when stored (#2036) * before remove ack or stored * only use ack or nack * use acked or nacked for arbitrator's DisputeOpenedMessage * resend PaymentReceivedMessage TTL / 3 *+ 1.5 after stored to mailbox * remove redundant re-sends * separation natural conclusion * simplify resend to 15 minutes doubling * expand implementation to other senders * rename RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS * cleanup --- .../java/haveno/core/trade/SellerTrade.java | 2 +- .../main/java/haveno/core/trade/Trade.java | 4 +- .../trade/protocol/ArbitratorProtocol.java | 2 +- .../core/trade/protocol/ProcessModel.java | 8 ++-- .../haveno/core/trade/protocol/TradePeer.java | 24 +++++++---- .../ArbitratorSendDisputeOpenedMessage.java | 42 +++++++++++-------- .../tasks/BuyerSendPaymentSentMessage.java | 37 +++++++++------- .../SellerSendPaymentReceivedMessage.java | 37 +++++++++------- .../tasks/SendMailboxMessageTask.java | 5 +++ 9 files changed, 95 insertions(+), 66 deletions(-) diff --git a/core/src/main/java/haveno/core/trade/SellerTrade.java b/core/src/main/java/haveno/core/trade/SellerTrade.java index 44f5149a6e..84c47f93dc 100644 --- a/core/src/main/java/haveno/core/trade/SellerTrade.java +++ b/core/src/main/java/haveno/core/trade/SellerTrade.java @@ -71,7 +71,7 @@ public abstract class SellerTrade extends Trade { public boolean needsToResendPaymentReceivedMessages() { boolean hasNoPaymentReceivedMessages = getBuyer().getPaymentReceivedMessage() == null && getArbitrator().getPaymentReceivedMessage() == null; if (!walletExistsNoSync() && !hasNoPaymentReceivedMessages) return false; // cannot provide any updated state - return !isShutDownStarted() && getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !getProcessModel().isPaymentReceivedMessagesAckedOrStored() && resendPaymentReceivedMessagesEnabled() && resendPaymentReceivedMessagesWithinDuration(); + return !isShutDownStarted() && getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !getProcessModel().isPaymentReceivedMessagesAckedOrNacked() && resendPaymentReceivedMessagesEnabled() && resendPaymentReceivedMessagesWithinDuration(); } private boolean resendPaymentReceivedMessagesEnabled() { diff --git a/core/src/main/java/haveno/core/trade/Trade.java b/core/src/main/java/haveno/core/trade/Trade.java index 40d2c49e8f..87df5ed4bf 100644 --- a/core/src/main/java/haveno/core/trade/Trade.java +++ b/core/src/main/java/haveno/core/trade/Trade.java @@ -1997,14 +1997,14 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model { } // TODO: clear other process data - if (processModel.isPaymentReceivedMessagesAckedOrStored()) setPayoutTxHex(null); + if (processModel.isPaymentReceivedMessagesAckedOrNacked()) setPayoutTxHex(null); for (TradePeer peer : getAllPeers()) { peer.setUpdatedMultisigHex(null); peer.setDisputeClosedMessage(null); peer.setPaymentSentMessage(null); peer.setDepositTxHex(null); peer.setDepositTxKey(null); - if (peer.isPaymentReceivedMessageAckedOrStored()) { + if (peer.isPaymentReceivedMessageAckedOrNacked()) { peer.setUnsignedPayoutTxHex(null); peer.setPaymentReceivedMessage(null); } diff --git a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java index 8818fcf49c..125b46e9c4 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java @@ -82,7 +82,7 @@ public class ArbitratorProtocol extends DisputeProtocol { if (trade.getBuyer().getDisputeOpenedMessage() == null && trade.getSeller().getDisputeOpenedMessage() == null) return false; if (trade.getDisputeState() != Trade.DisputeState.DISPUTE_OPENED) return false; if (!((ArbitratorTrade) trade).resendDisputeOpenedMessageWithinDuration()) return false; - return !trade.getProcessModel().isDisputeOpenedMessageAckedOrStored(); + return !trade.getProcessModel().isDisputeOpenedMessageAckedOrNacked(); } /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java index 1ca9b3b54e..fee4e12a4d 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java +++ b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java @@ -309,12 +309,12 @@ public class ProcessModel implements Model, PersistablePayload { return getP2PService().getAddress(); } - public boolean isPaymentReceivedMessagesAckedOrStored() { - return getArbitrator().isPaymentReceivedMessageAckedOrStored() && getBuyer().isPaymentReceivedMessageAckedOrStored(); + public boolean isPaymentReceivedMessagesAckedOrNacked() { + return getArbitrator().isPaymentReceivedMessageAckedOrNacked() && getBuyer().isPaymentReceivedMessageAckedOrNacked(); } - public boolean isDisputeOpenedMessageAckedOrStored() { - return getBuyer().isDisputeOpenedMessageAckedOrStored() || getSeller().isDisputeOpenedMessageAckedOrStored(); + public boolean isDisputeOpenedMessageAckedOrNacked() { + return getBuyer().isDisputeOpenedMessageAckedOrNacked() || getSeller().isDisputeOpenedMessageAckedOrNacked(); } void setDepositTxSentAckMessage(AckMessage ackMessage) { diff --git a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java index 3a97b3a0e4..d299163927 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java @@ -284,28 +284,36 @@ public final class TradePeer implements PersistablePayload { return paymentSentMessageStateProperty.get() == MessageState.ACKNOWLEDGED; } - public boolean isPaymentReceivedMessageReceived() { - return isPaymentReceivedMessageAckedOrStored() || isPaymentReceivedMessageNacked(); + public boolean isPaymentSentMessageStored() { + return paymentSentMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX; } - public boolean isPaymentReceivedMessageAckedOrStored() { - return paymentReceivedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || paymentReceivedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX; + public boolean isPaymentReceivedMessageAcked() { + return paymentReceivedMessageStateProperty.get() == MessageState.ACKNOWLEDGED; } public boolean isPaymentReceivedMessageNacked() { return paymentReceivedMessageStateProperty.get() == MessageState.NACKED; } + public boolean isPaymentReceivedMessageStored() { + return paymentReceivedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX; + } + + public boolean isPaymentReceivedMessageAckedOrNacked() { + return isPaymentReceivedMessageAcked() || isPaymentReceivedMessageNacked(); + } + public boolean isPaymentReceivedMessageArrived() { return paymentReceivedMessageStateProperty.get() == MessageState.ARRIVED; } - public boolean isDisputeOpenedMessageReceived() { - return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX || disputeOpenedMessageStateProperty.get() == MessageState.NACKED; + public boolean isDisputeOpenedMessageAckedOrNacked() { + return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.NACKED; } - public boolean isDisputeOpenedMessageAckedOrStored() { - return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX; + public boolean isDisputeOpenedMessageStored() { + return disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX; } @Override diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessage.java index 81bf59710b..1451acfb37 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessage.java @@ -30,6 +30,7 @@ import haveno.core.support.messages.ChatMessage; import haveno.core.trade.ArbitratorTrade; import haveno.core.trade.HavenoUtils; import haveno.core.trade.Trade; +import haveno.core.trade.messages.TradeMailboxMessage; import haveno.network.p2p.mailbox.MailboxMessage; import javafx.beans.value.ChangeListener; import lombok.EqualsAndHashCode; @@ -50,7 +51,7 @@ public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMess private ChangeListener listener; private Timer timer; private static final int MAX_RESEND_ATTEMPTS = 20; - private int delayInMin = 10; + private long delayInMin = 15; private int resendCounter = 0; private DisputeOpenedMessage message = null; @@ -64,7 +65,7 @@ public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMess runInterceptHook(); // reset nack state - if (getReceiver().isDisputeOpenedMessageReceived()) { + if (getReceiver().isDisputeOpenedMessageAckedOrNacked()) { getReceiver().setDisputeOpenedMessageState(MessageState.UNDEFINED); } @@ -135,50 +136,51 @@ public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMess private void tryToSendAgainLater() { - // skip if already acked + // skip if stopped if (stopSending()) return; + // stop after max attempts if (resendCounter >= MAX_RESEND_ATTEMPTS) { cleanup(); log.warn("We never received an ACK message when sending the DisputeOpenedMessage to the peer. We stop trying to send the message."); return; } + // reset timer if (timer != null) { timer.stop(); } + // increase minimum delay if message is stored to mailbox + if (getReceiver().isDisputeOpenedMessageStored()) { + delayInMin = Math.max(delayInMin, SendMailboxMessageTask.RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS); + } + + // send again after delay + log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); + if (timer != null) { + timer.stop(); + } timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); + // register listeners once if (resendCounter == 0) { listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); getReceiver().getDisputeOpenedMessageStateProperty().addListener(listener); onMessageStateChange(getReceiver().getDisputeOpenedMessageStateProperty().get()); } - // first re-send is after 2 minutes, then increase the delay exponentially - if (resendCounter == 0) { - int shortDelay = 2; - log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", shortDelay); - timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES); - } else { - log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", delayInMin); - timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); - delayInMin = (int) ((double) delayInMin * 1.5); - } + // increase delay up to message TTL + delayInMin = Math.min(TradeMailboxMessage.TTL, delayInMin * 2); resendCounter++; } private void onMessageStateChange(MessageState newValue) { - if (isMessageReceived()) { + if (stopSending()) { cleanup(); } } - protected boolean isMessageReceived() { - return getReceiver().isDisputeOpenedMessageReceived(); - } - protected boolean stopSending() { if (getReceiver().getDisputeOpenedMessage() == null) return true; // stop if no message to send if (isMessageReceived()) return true; // stop if message received @@ -187,4 +189,8 @@ public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMess if (message != null && !message.equals(getReceiver().getDisputeOpenedMessage())) return true; // stop if message state is outdated return false; } + + protected boolean isMessageReceived() { + return getReceiver().isDisputeOpenedMessageAckedOrNacked(); + } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java index b98f2f03cb..ad3736f44c 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java @@ -64,7 +64,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask private ChangeListener listener; private Timer timer; private static final int MAX_RESEND_ATTEMPTS = 20; - private int delayInMin = 10; + private long delayInMin = 15; private int resendCounter = 0; public BuyerSendPaymentSentMessage(TaskRunner taskHandler, Trade trade) { @@ -77,7 +77,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask runInterceptHook(); // skip if already acked by receiver - if (isAckedByReceiver()) { + if (stopSending()) { if (!isCompleted()) complete(); return; } @@ -161,47 +161,52 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask private void tryToSendAgainLater() { - // skip if already acked - if (isAckedByReceiver()) return; + // skip if stopped + if (stopSending()) return; + // stop after max attempts if (resendCounter >= MAX_RESEND_ATTEMPTS) { cleanup(); log.warn("We never received an ACK message when sending the PaymentSentMessage to the peer. We stop trying to send the message."); return; } + // reset timer if (timer != null) { timer.stop(); } + // increase minimum delay if message is stored to mailbox + if (getReceiver().isPaymentSentMessageStored()) { + delayInMin = Math.max(delayInMin, SendMailboxMessageTask.RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS); + } + + // send again after delay + log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); + if (timer != null) { + timer.stop(); + } timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); + // register listeners once if (resendCounter == 0) { listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); getReceiver().getPaymentSentMessageStateProperty().addListener(listener); onMessageStateChange(getReceiver().getPaymentSentMessageStateProperty().get()); } - // first re-send is after 2 minutes, then increase the delay exponentially - if (resendCounter == 0) { - int shortDelay = 2; - log.info("We will send the message again to the peer after a delay of {} min.", shortDelay); - timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES); - } else { - log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); - timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); - delayInMin = (int) ((double) delayInMin * 1.5); - } + // increase delay up to message TTL + delayInMin = Math.min(TradeMailboxMessage.TTL, delayInMin * 2); resendCounter++; } private void onMessageStateChange(MessageState newValue) { - if (isAckedByReceiver()) { + if (stopSending()) { cleanup(); } } - protected boolean isAckedByReceiver() { + protected boolean stopSending() { return getReceiver().isPaymentSentMessageAcked(); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java index 5a53eb6515..4029643d62 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java @@ -66,7 +66,7 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag private ChangeListener listener; private Timer timer; private static final int MAX_RESEND_ATTEMPTS = 20; - private int delayInMin = 10; + private long delayInMin = 15; private int resendCounter = 0; private String unsignedPayoutTxHex = null; private String signedPayoutTxHex = null; @@ -205,47 +205,48 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag // skip if stopped if (stopSending()) return; + // stop after max attempts if (resendCounter >= MAX_RESEND_ATTEMPTS) { cleanup(); log.warn("We never received an ACK message when sending the PaymentReceivedMessage to the peer. We stop trying to send the message."); return; } + // reset timer if (timer != null) { timer.stop(); } + // increase minimum delay if message is stored to mailbox + if (getReceiver().isPaymentReceivedMessageStored()) { + delayInMin = Math.max(delayInMin, SendMailboxMessageTask.RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS); + } + + // send again after delay + log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); + if (timer != null) { + timer.stop(); + } timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); + // register listeners once if (resendCounter == 0) { listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); getReceiver().getPaymentReceivedMessageStateProperty().addListener(listener); onMessageStateChange(getReceiver().getPaymentReceivedMessageStateProperty().get()); } - // first re-send is after 2 minutes, then increase the delay exponentially - if (resendCounter == 0) { - int shortDelay = 2; - log.info("We will send the message again to the peer after a delay of {} min.", shortDelay); - timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES); - } else { - log.info("We will send the message again to the peer after a delay of {} min.", delayInMin); - timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); - delayInMin = (int) ((double) delayInMin * 1.5); - } + // increase delay up to message TTL + delayInMin = Math.min(TradeMailboxMessage.TTL, delayInMin * 2); resendCounter++; } private void onMessageStateChange(MessageState newValue) { - if (isMessageReceived()) { + if (stopSending()) { cleanup(); } } - protected boolean isMessageReceived() { - return getReceiver().isPaymentReceivedMessageReceived(); - } - protected boolean stopSending() { if (isMessageReceived()) return true; // stop if message received if (!trade.isPaymentReceived()) return true; // stop if trade state reset @@ -258,4 +259,8 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag if (updatedMultisigHex != null && !StringUtils.equals(updatedMultisigHex, trade.getSelf().getUpdatedMultisigHex())) return true; return false; } + + protected boolean isMessageReceived() { + return getReceiver().isPaymentReceivedMessageAckedOrNacked(); + } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendMailboxMessageTask.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendMailboxMessageTask.java index 51d3344014..7371e4ea54 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendMailboxMessageTask.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendMailboxMessageTask.java @@ -17,6 +17,8 @@ package haveno.core.trade.protocol.tasks; +import java.util.concurrent.TimeUnit; + import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.Trade; @@ -28,6 +30,9 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public abstract class SendMailboxMessageTask extends TradeTask { + + public static final long RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS = TimeUnit.HOURS.toMinutes(6); + public SendMailboxMessageTask(TaskRunner taskHandler, Trade trade) { super(taskHandler, trade); }