resend dispute opened message until received

This commit is contained in:
woodser 2025-10-08 13:37:55 -04:00
parent 677c581a31
commit ee15988c59
No known key found for this signature in database
GPG key ID: 55A10DD48ADEE5EF
7 changed files with 342 additions and 69 deletions

View file

@ -29,6 +29,7 @@ import haveno.core.support.messages.ChatMessage;
import haveno.core.support.messages.SupportMessage;
import haveno.core.trade.Trade;
import haveno.core.trade.TradeManager;
import haveno.core.trade.protocol.TradePeer;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.AckMessage;
import haveno.network.p2p.AckMessageSourceType;
@ -197,6 +198,16 @@ public abstract class SupportManager {
synchronized (dispute.getChatMessages()) {
for (ChatMessage chatMessage : dispute.getChatMessages()) {
if (chatMessage.getUid().equals(ackMessage.getSourceUid())) {
// set ack state
TradePeer sender = trade.getTradePeer(ackMessage.getSenderNodeAddress());
if (sender == null) {
log.warn("Received AckMessage from unknown peer {} for {} with tradeId={}, uid={}", ackMessage.getSenderNodeAddress(), ackMessage.getSourceMsgClassName(), ackMessage.getSourceId(), ackMessage.getSourceUid());
} else {
sender.setDisputeOpenedAckMessage(ackMessage);
}
// advance trade state
if (trade.getDisputeState() == Trade.DisputeState.DISPUTE_PREPARING || trade.getDisputeState() == Trade.DisputeState.DISPUTE_REQUESTED) { // ack can arrive before saw arrived
if (dispute.isClosed()) dispute.reOpen();
trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED);
@ -220,6 +231,16 @@ public abstract class SupportManager {
for (ChatMessage chatMessage : dispute.getChatMessages()) {
if (chatMessage.getUid().equals(ackMessage.getSourceUid())) {
if (!trade.isArbitrator() && (trade.getDisputeState().isRequested() || trade.getDisputeState().isCloseRequested())) {
// set ack state
TradePeer sender = trade.getTradePeer(ackMessage.getSenderNodeAddress());
if (sender == null) {
log.warn("Received AckMessage from unknown peer {} for {} with tradeId={}, uid={}", ackMessage.getSenderNodeAddress(), ackMessage.getSourceMsgClassName(), ackMessage.getSourceId(), ackMessage.getSourceUid());
} else {
sender.setDisputeOpenedAckMessage(ackMessage);
}
// advance trade state
log.warn("DisputeOpenedMessage was nacked. We close the dispute now. tradeId={}, nack sender={}", trade.getId(), ackMessage.getSenderNodeAddress());
dispute.setIsClosed();
trade.advanceDisputeState(Trade.DisputeState.DISPUTE_CLOSED);

View file

@ -68,6 +68,7 @@ import haveno.core.trade.SellerTrade;
import haveno.core.trade.Trade;
import haveno.core.trade.Trade.DisputeState;
import haveno.core.trade.TradeManager;
import haveno.core.trade.protocol.ArbitratorProtocol;
import haveno.core.trade.protocol.TradePeer;
import haveno.core.xmr.wallet.Restrictions;
import haveno.core.xmr.wallet.TradeWalletService;
@ -590,11 +591,29 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
// use existing dispute or create new
Dispute dispute = reOpen ? storedDisputeOptional.get() : msgDispute;
// get contract
Contract contract = dispute.getContract();
// get sender
TradePeer sender;
PubKeyRing senderPubKeyRing;
if (reOpen) { // re-open can come from either peer
sender = trade.isArbitrator() ? trade.getTradePeer(message.getSenderNodeAddress()) : trade.getArbitrator();
senderPubKeyRing = sender.getPubKeyRing();
} else {
senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing();
sender = trade.getTradePeer(senderPubKeyRing);
}
if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller");
// // save message for reprocessing
// // TODO: handle reprocessing if needed
// sender.setDisputeOpenedMessage(message);
// process on trade thread
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
String errorMessage = null;
PubKeyRing senderPubKeyRing = null;
try {
// initialize
@ -605,7 +624,6 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
}
dispute.setSupportType(message.getSupportType());
dispute.setState(Dispute.State.NEW);
Contract contract = dispute.getContract();
// validate dispute
try {
@ -634,17 +652,6 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
if (trade.getSeller().getPaymentAccountPayload() == null) trade.getSeller().setPaymentAccountPayload(dispute.getSellerPaymentAccountPayload());
}
// get sender
TradePeer sender;
if (reOpen) { // re-open can come from either peer
sender = trade.isArbitrator() ? trade.getTradePeer(message.getSenderNodeAddress()) : trade.getArbitrator();
senderPubKeyRing = sender.getPubKeyRing();
} else {
senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing();
sender = trade.getTradePeer(senderPubKeyRing);
}
if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller");
// update sender node address
sender.setNodeAddress(message.getSenderNodeAddress());
@ -833,8 +840,6 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
// create dispute opened message with peer dispute
TradePeer peer = trade.getTradePeer(pubKeyRing);
PubKeyRing peersPubKeyRing = peer.getPubKeyRing();
NodeAddress peersNodeAddress = peer.getNodeAddress();
DisputeOpenedMessage peerOpenedDisputeMessage = new DisputeOpenedMessage(dispute,
p2PService.getAddress(),
UUID.randomUUID().toString(),
@ -842,61 +847,11 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
updatedMultisigHex,
trade.getArbitrator().getPaymentSentMessage());
log.info("Send {} to peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, chatMessage.uid={}",
peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress,
peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(),
chatMessage.getUid());
recordPendingMessage(peerOpenedDisputeMessage.getClass().getSimpleName());
mailboxMessageService.sendEncryptedMailboxMessage(peersNodeAddress,
peersPubKeyRing,
peerOpenedDisputeMessage,
new SendMailboxMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived at peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " +
"chatMessage.uid={}",
peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress,
peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(),
chatMessage.getUid());
// save message for resending if needed
peer.setDisputeOpenedMessage(peerOpenedDisputeMessage);
clearPendingMessage();
// We use the chatMessage wrapped inside the peerOpenedDisputeMessage for
// the state, as that is displayed to the user and we only persist that msg
chatMessage.setArrived(true);
persistNow(null);
}
@Override
public void onStoredInMailbox() {
log.info("{} stored in mailbox for peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " +
"chatMessage.uid={}",
peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress,
peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(),
chatMessage.getUid());
clearPendingMessage();
// We use the chatMessage wrapped inside the peerOpenedDisputeMessage for
// the state, as that is displayed to the user and we only persist that msg
chatMessage.setStoredInMailbox(true);
persistNow(null);
}
@Override
public void onFault(String errorMessage) {
log.error("{} failed: Peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " +
"chatMessage.uid={}, errorMessage={}",
peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress,
peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(),
chatMessage.getUid(), errorMessage);
clearPendingMessage();
// We use the chatMessage wrapped inside the peerOpenedDisputeMessage for
// the state, as that is displayed to the user and we only persist that msg
chatMessage.setSendMessageError(errorMessage);
persistNow(null);
}
}
);
// send dispute opened message
((ArbitratorProtocol) trade.getProtocol()).sendDisputeOpenedMessageIfApplicable();
persistNow(null);
}
@ -1142,6 +1097,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
.findAny();
}
// TODO: throw if more than one dispute found? should not be called then
public Optional<Dispute> findDispute(String tradeId) {
T disputeList = getDisputeList();
if (disputeList == null) {

View file

@ -9,9 +9,12 @@ import haveno.core.trade.messages.DepositResponse;
import haveno.core.trade.messages.InitTradeRequest;
import haveno.core.trade.messages.SignContractResponse;
import haveno.core.trade.messages.TradeMessage;
import haveno.core.trade.protocol.FluentProtocol.Condition;
import haveno.core.trade.protocol.tasks.ApplyFilter;
import haveno.core.trade.protocol.tasks.ArbitratorProcessDepositRequest;
import haveno.core.trade.protocol.tasks.ArbitratorProcessReserveTx;
import haveno.core.trade.protocol.tasks.ArbitratorSendDisputeOpenedMessageToBuyer;
import haveno.core.trade.protocol.tasks.ArbitratorSendDisputeOpenedMessageToSeller;
import haveno.core.trade.protocol.tasks.ArbitratorSendInitTradeOrMultisigRequests;
import haveno.core.trade.protocol.tasks.ProcessInitTradeRequest;
import haveno.core.trade.protocol.tasks.SendDepositsConfirmedMessageToBuyer;
@ -38,6 +41,42 @@ public class ArbitratorProtocol extends DisputeProtocol {
super.onMailboxMessage(message, peer);
}
@Override
protected void onInitialized() {
super.onInitialized();
// re-send dispute opened message if applicable
sendDisputeOpenedMessageIfApplicable();
// TODO: resend dispute closed message if not acked
}
public void sendDisputeOpenedMessageIfApplicable() {
ThreadUtils.execute(() -> {
if (trade.isShutDownStarted() || trade.isPayoutPublished()) return;
synchronized (trade.getLock()) {
if (trade.isShutDownStarted() || trade.isPayoutPublished()) return;
if (trade.getDisputeState() == Trade.DisputeState.DISPUTE_OPENED) {
latchTrade();
given(new Condition(trade))
.setup(tasks(
ArbitratorSendDisputeOpenedMessageToBuyer.class,
ArbitratorSendDisputeOpenedMessageToSeller.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
},
(errorMessage) -> {
log.warn("Error sending DisputeOpenedMessage: " + errorMessage);
unlatchTrade();
})))
.executeTasks();
awaitTradeLatch();
}
}
}, trade.getId());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming messages
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -255,6 +255,13 @@ public final class TradePeer implements PersistablePayload {
}
}
public void setDisputeOpenedAckMessage(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :
MessageState.NACKED;
setDisputeOpenedMessageState(messageState);
}
public void setDisputeOpenedMessageState(MessageState disputeOpenedMessageStateProperty) {
this.disputeOpenedMessageStateProperty.set(disputeOpenedMessageStateProperty);
if (tradeManager != null) {
@ -293,6 +300,10 @@ public final class TradePeer implements PersistablePayload {
return paymentReceivedMessageStateProperty.get() == MessageState.ARRIVED;
}
public boolean isDisputeOpenedMessageReceived() {
return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX || disputeOpenedMessageStateProperty.get() == MessageState.NACKED;
}
@Override
public Message toProtoMessage() {
final protobuf.TradePeer.Builder builder = protobuf.TradePeer.newBuilder();

View file

@ -0,0 +1,170 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.trade.protocol.tasks;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.network.MessageState;
import haveno.core.support.dispute.Dispute;
import haveno.core.support.messages.ChatMessage;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade;
import haveno.network.p2p.mailbox.MailboxMessage;
import javafx.beans.value.ChangeListener;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
/**
* Arbitrator sends the DisputeOpenedMessage.
* We wait to receive a ACK message back and resend the message
* in case that does not happen in 10 minutes or if the message was stored in mailbox or failed. We keep repeating that
* with doubling the interval each time and until the MAX_RESEND_ATTEMPTS is reached.
* If never successful we give up and complete. It might be a valid case that the peer was not online for an extended
* time but we can be very sure that our message was stored as mailbox message in the network and one the peer goes
* online he will process it.
*/
@Slf4j
@EqualsAndHashCode(callSuper = true)
public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMessageTask {
private ChangeListener<MessageState> listener;
private Timer timer;
private static final int MAX_RESEND_ATTEMPTS = 20;
private int delayInMin = 10;
private int resendCounter = 0;
public ArbitratorSendDisputeOpenedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
@Override
protected void run() {
try {
runInterceptHook();
// skip if not applicable or already acked
if (getReceiver().getDisputeOpenedMessage() == null || isAckedByReceiver()) {
if (!isCompleted()) complete();
return;
}
super.run();
} catch (Throwable t) {
failed(t);
}
}
protected Optional<Dispute> getDispute() {
return HavenoUtils.arbitrationManager.findDispute(getReceiver().getDisputeOpenedMessage().getDispute());
}
protected ChatMessage getSystemChatMessage() {
return getDispute().get().getChatMessages().get(0);
}
@Override
protected MailboxMessage getMailboxMessage(String tradeId) {
return getReceiver().getDisputeOpenedMessage();
}
@Override
protected void setStateSent() {
getReceiver().setDisputeOpenedMessageState(MessageState.SENT);
tryToSendAgainLater();
processModel.getTradeManager().requestPersistence();
}
@Override
protected void setStateArrived() {
getReceiver().setDisputeOpenedMessageState(MessageState.ARRIVED);
getSystemChatMessage().setArrived(true);
processModel.getTradeManager().requestPersistence();
}
@Override
protected void setStateStoredInMailbox() {
getReceiver().setDisputeOpenedMessageState(MessageState.STORED_IN_MAILBOX);
getSystemChatMessage().setStoredInMailbox(true);
processModel.getTradeManager().requestPersistence();
}
@Override
protected void setStateFault() {
getReceiver().setDisputeOpenedMessageState(MessageState.FAILED);
getSystemChatMessage().setSendMessageError(errorMessage);
processModel.getTradeManager().requestPersistence();
}
private void cleanup() {
if (timer != null) {
timer.stop();
}
if (listener != null) {
getReceiver().getDisputeOpenedMessageStateProperty().removeListener(listener);
}
}
private void tryToSendAgainLater() {
// skip if already acked
if (isAckedByReceiver()) return;
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
cleanup();
log.warn("We never received an ACK message when sending the DisputeOpenedMessage to the peer. We stop trying to send the message.");
return;
}
if (timer != null) {
timer.stop();
}
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
if (resendCounter == 0) {
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
getReceiver().getDisputeOpenedMessageStateProperty().addListener(listener);
onMessageStateChange(getReceiver().getDisputeOpenedMessageStateProperty().get());
}
// first re-send is after 2 minutes, then increase the delay exponentially
if (resendCounter == 0) {
int shortDelay = 2;
log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", shortDelay);
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
} else {
log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", delayInMin);
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
delayInMin = (int) ((double) delayInMin * 1.5);
}
resendCounter++;
}
private void onMessageStateChange(MessageState newValue) {
if (isAckedByReceiver()) {
cleanup();
}
}
protected boolean isAckedByReceiver() {
return getReceiver().isDisputeOpenedMessageReceived();
}
}

View file

@ -0,0 +1,38 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.trade.protocol.tasks;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade;
import haveno.core.trade.protocol.TradePeer;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@EqualsAndHashCode(callSuper = true)
@Slf4j
public class ArbitratorSendDisputeOpenedMessageToBuyer extends ArbitratorSendDisputeOpenedMessage {
public ArbitratorSendDisputeOpenedMessageToBuyer(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
@Override
protected TradePeer getReceiver() {
return trade.getBuyer();
}
}

View file

@ -0,0 +1,38 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.trade.protocol.tasks;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade;
import haveno.core.trade.protocol.TradePeer;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@EqualsAndHashCode(callSuper = true)
@Slf4j
public class ArbitratorSendDisputeOpenedMessageToSeller extends ArbitratorSendDisputeOpenedMessage {
public ArbitratorSendDisputeOpenedMessageToSeller(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
@Override
protected TradePeer getReceiver() {
return trade.getSeller();
}
}