resend mailbox messages until acked or nacked, add delay when stored (#2036)

* before remove ack or stored

* only use ack or nack

* use acked or nacked for arbitrator's DisputeOpenedMessage

* resend PaymentReceivedMessage TTL / 3 *+ 1.5 after stored to mailbox

* remove redundant re-sends

* separation natural conclusion

* simplify resend to 15 minutes doubling

* expand implementation to other senders

* rename RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS

* cleanup
This commit is contained in:
woodser 2025-11-21 18:53:46 -04:00 committed by GitHub
parent b525f34e53
commit 8867e9eb3b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 95 additions and 66 deletions

View file

@ -71,7 +71,7 @@ public abstract class SellerTrade extends Trade {
public boolean needsToResendPaymentReceivedMessages() {
boolean hasNoPaymentReceivedMessages = getBuyer().getPaymentReceivedMessage() == null && getArbitrator().getPaymentReceivedMessage() == null;
if (!walletExistsNoSync() && !hasNoPaymentReceivedMessages) return false; // cannot provide any updated state
return !isShutDownStarted() && getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !getProcessModel().isPaymentReceivedMessagesAckedOrStored() && resendPaymentReceivedMessagesEnabled() && resendPaymentReceivedMessagesWithinDuration();
return !isShutDownStarted() && getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !getProcessModel().isPaymentReceivedMessagesAckedOrNacked() && resendPaymentReceivedMessagesEnabled() && resendPaymentReceivedMessagesWithinDuration();
}
private boolean resendPaymentReceivedMessagesEnabled() {

View file

@ -1997,14 +1997,14 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
}
// TODO: clear other process data
if (processModel.isPaymentReceivedMessagesAckedOrStored()) setPayoutTxHex(null);
if (processModel.isPaymentReceivedMessagesAckedOrNacked()) setPayoutTxHex(null);
for (TradePeer peer : getAllPeers()) {
peer.setUpdatedMultisigHex(null);
peer.setDisputeClosedMessage(null);
peer.setPaymentSentMessage(null);
peer.setDepositTxHex(null);
peer.setDepositTxKey(null);
if (peer.isPaymentReceivedMessageAckedOrStored()) {
if (peer.isPaymentReceivedMessageAckedOrNacked()) {
peer.setUnsignedPayoutTxHex(null);
peer.setPaymentReceivedMessage(null);
}

View file

@ -82,7 +82,7 @@ public class ArbitratorProtocol extends DisputeProtocol {
if (trade.getBuyer().getDisputeOpenedMessage() == null && trade.getSeller().getDisputeOpenedMessage() == null) return false;
if (trade.getDisputeState() != Trade.DisputeState.DISPUTE_OPENED) return false;
if (!((ArbitratorTrade) trade).resendDisputeOpenedMessageWithinDuration()) return false;
return !trade.getProcessModel().isDisputeOpenedMessageAckedOrStored();
return !trade.getProcessModel().isDisputeOpenedMessageAckedOrNacked();
}
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -309,12 +309,12 @@ public class ProcessModel implements Model, PersistablePayload {
return getP2PService().getAddress();
}
public boolean isPaymentReceivedMessagesAckedOrStored() {
return getArbitrator().isPaymentReceivedMessageAckedOrStored() && getBuyer().isPaymentReceivedMessageAckedOrStored();
public boolean isPaymentReceivedMessagesAckedOrNacked() {
return getArbitrator().isPaymentReceivedMessageAckedOrNacked() && getBuyer().isPaymentReceivedMessageAckedOrNacked();
}
public boolean isDisputeOpenedMessageAckedOrStored() {
return getBuyer().isDisputeOpenedMessageAckedOrStored() || getSeller().isDisputeOpenedMessageAckedOrStored();
public boolean isDisputeOpenedMessageAckedOrNacked() {
return getBuyer().isDisputeOpenedMessageAckedOrNacked() || getSeller().isDisputeOpenedMessageAckedOrNacked();
}
void setDepositTxSentAckMessage(AckMessage ackMessage) {

View file

@ -284,28 +284,36 @@ public final class TradePeer implements PersistablePayload {
return paymentSentMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
}
public boolean isPaymentReceivedMessageReceived() {
return isPaymentReceivedMessageAckedOrStored() || isPaymentReceivedMessageNacked();
public boolean isPaymentSentMessageStored() {
return paymentSentMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
}
public boolean isPaymentReceivedMessageAckedOrStored() {
return paymentReceivedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || paymentReceivedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
public boolean isPaymentReceivedMessageAcked() {
return paymentReceivedMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
}
public boolean isPaymentReceivedMessageNacked() {
return paymentReceivedMessageStateProperty.get() == MessageState.NACKED;
}
public boolean isPaymentReceivedMessageStored() {
return paymentReceivedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
}
public boolean isPaymentReceivedMessageAckedOrNacked() {
return isPaymentReceivedMessageAcked() || isPaymentReceivedMessageNacked();
}
public boolean isPaymentReceivedMessageArrived() {
return paymentReceivedMessageStateProperty.get() == MessageState.ARRIVED;
}
public boolean isDisputeOpenedMessageReceived() {
return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX || disputeOpenedMessageStateProperty.get() == MessageState.NACKED;
public boolean isDisputeOpenedMessageAckedOrNacked() {
return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.NACKED;
}
public boolean isDisputeOpenedMessageAckedOrStored() {
return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
public boolean isDisputeOpenedMessageStored() {
return disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
}
@Override

View file

@ -30,6 +30,7 @@ import haveno.core.support.messages.ChatMessage;
import haveno.core.trade.ArbitratorTrade;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.TradeMailboxMessage;
import haveno.network.p2p.mailbox.MailboxMessage;
import javafx.beans.value.ChangeListener;
import lombok.EqualsAndHashCode;
@ -50,7 +51,7 @@ public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMess
private ChangeListener<MessageState> listener;
private Timer timer;
private static final int MAX_RESEND_ATTEMPTS = 20;
private int delayInMin = 10;
private long delayInMin = 15;
private int resendCounter = 0;
private DisputeOpenedMessage message = null;
@ -64,7 +65,7 @@ public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMess
runInterceptHook();
// reset nack state
if (getReceiver().isDisputeOpenedMessageReceived()) {
if (getReceiver().isDisputeOpenedMessageAckedOrNacked()) {
getReceiver().setDisputeOpenedMessageState(MessageState.UNDEFINED);
}
@ -135,50 +136,51 @@ public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMess
private void tryToSendAgainLater() {
// skip if already acked
// skip if stopped
if (stopSending()) return;
// stop after max attempts
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
cleanup();
log.warn("We never received an ACK message when sending the DisputeOpenedMessage to the peer. We stop trying to send the message.");
return;
}
// reset timer
if (timer != null) {
timer.stop();
}
// increase minimum delay if message is stored to mailbox
if (getReceiver().isDisputeOpenedMessageStored()) {
delayInMin = Math.max(delayInMin, SendMailboxMessageTask.RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS);
}
// send again after delay
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
if (timer != null) {
timer.stop();
}
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
// register listeners once
if (resendCounter == 0) {
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
getReceiver().getDisputeOpenedMessageStateProperty().addListener(listener);
onMessageStateChange(getReceiver().getDisputeOpenedMessageStateProperty().get());
}
// first re-send is after 2 minutes, then increase the delay exponentially
if (resendCounter == 0) {
int shortDelay = 2;
log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", shortDelay);
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
} else {
log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", delayInMin);
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
delayInMin = (int) ((double) delayInMin * 1.5);
}
// increase delay up to message TTL
delayInMin = Math.min(TradeMailboxMessage.TTL, delayInMin * 2);
resendCounter++;
}
private void onMessageStateChange(MessageState newValue) {
if (isMessageReceived()) {
if (stopSending()) {
cleanup();
}
}
protected boolean isMessageReceived() {
return getReceiver().isDisputeOpenedMessageReceived();
}
protected boolean stopSending() {
if (getReceiver().getDisputeOpenedMessage() == null) return true; // stop if no message to send
if (isMessageReceived()) return true; // stop if message received
@ -187,4 +189,8 @@ public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMess
if (message != null && !message.equals(getReceiver().getDisputeOpenedMessage())) return true; // stop if message state is outdated
return false;
}
protected boolean isMessageReceived() {
return getReceiver().isDisputeOpenedMessageAckedOrNacked();
}
}

View file

@ -64,7 +64,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
private ChangeListener<MessageState> listener;
private Timer timer;
private static final int MAX_RESEND_ATTEMPTS = 20;
private int delayInMin = 10;
private long delayInMin = 15;
private int resendCounter = 0;
public BuyerSendPaymentSentMessage(TaskRunner<Trade> taskHandler, Trade trade) {
@ -77,7 +77,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
runInterceptHook();
// skip if already acked by receiver
if (isAckedByReceiver()) {
if (stopSending()) {
if (!isCompleted()) complete();
return;
}
@ -161,47 +161,52 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
private void tryToSendAgainLater() {
// skip if already acked
if (isAckedByReceiver()) return;
// skip if stopped
if (stopSending()) return;
// stop after max attempts
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
cleanup();
log.warn("We never received an ACK message when sending the PaymentSentMessage to the peer. We stop trying to send the message.");
return;
}
// reset timer
if (timer != null) {
timer.stop();
}
// increase minimum delay if message is stored to mailbox
if (getReceiver().isPaymentSentMessageStored()) {
delayInMin = Math.max(delayInMin, SendMailboxMessageTask.RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS);
}
// send again after delay
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
if (timer != null) {
timer.stop();
}
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
// register listeners once
if (resendCounter == 0) {
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
getReceiver().getPaymentSentMessageStateProperty().addListener(listener);
onMessageStateChange(getReceiver().getPaymentSentMessageStateProperty().get());
}
// first re-send is after 2 minutes, then increase the delay exponentially
if (resendCounter == 0) {
int shortDelay = 2;
log.info("We will send the message again to the peer after a delay of {} min.", shortDelay);
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
} else {
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
delayInMin = (int) ((double) delayInMin * 1.5);
}
// increase delay up to message TTL
delayInMin = Math.min(TradeMailboxMessage.TTL, delayInMin * 2);
resendCounter++;
}
private void onMessageStateChange(MessageState newValue) {
if (isAckedByReceiver()) {
if (stopSending()) {
cleanup();
}
}
protected boolean isAckedByReceiver() {
protected boolean stopSending() {
return getReceiver().isPaymentSentMessageAcked();
}
}

View file

@ -66,7 +66,7 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
private ChangeListener<MessageState> listener;
private Timer timer;
private static final int MAX_RESEND_ATTEMPTS = 20;
private int delayInMin = 10;
private long delayInMin = 15;
private int resendCounter = 0;
private String unsignedPayoutTxHex = null;
private String signedPayoutTxHex = null;
@ -205,47 +205,48 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
// skip if stopped
if (stopSending()) return;
// stop after max attempts
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
cleanup();
log.warn("We never received an ACK message when sending the PaymentReceivedMessage to the peer. We stop trying to send the message.");
return;
}
// reset timer
if (timer != null) {
timer.stop();
}
// increase minimum delay if message is stored to mailbox
if (getReceiver().isPaymentReceivedMessageStored()) {
delayInMin = Math.max(delayInMin, SendMailboxMessageTask.RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS);
}
// send again after delay
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
if (timer != null) {
timer.stop();
}
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
// register listeners once
if (resendCounter == 0) {
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
getReceiver().getPaymentReceivedMessageStateProperty().addListener(listener);
onMessageStateChange(getReceiver().getPaymentReceivedMessageStateProperty().get());
}
// first re-send is after 2 minutes, then increase the delay exponentially
if (resendCounter == 0) {
int shortDelay = 2;
log.info("We will send the message again to the peer after a delay of {} min.", shortDelay);
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
} else {
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
delayInMin = (int) ((double) delayInMin * 1.5);
}
// increase delay up to message TTL
delayInMin = Math.min(TradeMailboxMessage.TTL, delayInMin * 2);
resendCounter++;
}
private void onMessageStateChange(MessageState newValue) {
if (isMessageReceived()) {
if (stopSending()) {
cleanup();
}
}
protected boolean isMessageReceived() {
return getReceiver().isPaymentReceivedMessageReceived();
}
protected boolean stopSending() {
if (isMessageReceived()) return true; // stop if message received
if (!trade.isPaymentReceived()) return true; // stop if trade state reset
@ -258,4 +259,8 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
if (updatedMultisigHex != null && !StringUtils.equals(updatedMultisigHex, trade.getSelf().getUpdatedMultisigHex())) return true;
return false;
}
protected boolean isMessageReceived() {
return getReceiver().isPaymentReceivedMessageAckedOrNacked();
}
}

View file

@ -17,6 +17,8 @@
package haveno.core.trade.protocol.tasks;
import java.util.concurrent.TimeUnit;
import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade;
@ -28,6 +30,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class SendMailboxMessageTask extends TradeTask {
public static final long RESEND_STORED_MESSAGE_INITIAL_DELAY_MINS = TimeUnit.HOURS.toMinutes(6);
public SendMailboxMessageTask(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}