fixes to resend payment received messages up to 2 months (#1887)

This commit is contained in:
woodser 2025-07-30 07:49:52 -04:00 committed by GitHub
parent 017d8d52ba
commit ef0f841f90
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 102 additions and 45 deletions

View file

@ -2206,6 +2206,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
if (periodicRefreshOffersTimer == null)
periodicRefreshOffersTimer = UserThread.runPeriodically(() -> {
if (!stopped) {
log.info("Refreshing my open offers");
synchronized (openOffers.getList()) {
int size = openOffers.size();
//we clone our list as openOffers might change during our delayed call

View file

@ -482,13 +482,14 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
// TODO (monero-project): creating tx will require exchanging updated multisig hex if message needs reprocessed. provide weight with describe_transfer so fee can be estimated?
MoneroTxWallet feeEstimateTx = null;
try {
log.info("Creating dispute fee estimate tx for {} {}", getClass().getSimpleName(), trade.getShortId());
feeEstimateTx = createDisputePayoutTx(trade, dispute.getContract(), disputeResult, false);
} catch (Exception e) {
log.warn("Could not recreate dispute payout tx to verify fee: {}\n", e.getMessage(), e);
}
if (feeEstimateTx != null) {
HavenoUtils.verifyMinerFee(feeEstimateTx.getFee(), arbitratorSignedPayoutTx.getFee());
log.info("Dispute payout tx fee {} is within tolerance");
log.info("Dispute payout tx fee is within tolerance for {} {}", getClass().getSimpleName(), trade.getShortId());
}
} else {
disputeTxSet.setMultisigTxHex(trade.getPayoutTxHex());

View file

@ -19,16 +19,19 @@ package haveno.core.trade;
import haveno.core.offer.Offer;
import haveno.core.trade.protocol.ProcessModel;
import haveno.core.trade.protocol.SellerProtocol;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.NodeAddress;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.math.BigInteger;
import java.util.Date;
@Slf4j
public abstract class SellerTrade extends Trade {
private static final long resendPaymentReceivedMessagesDurationMs = 2L * 30 * 24 * 60 * 60 * 1000; // ~2 months
SellerTrade(Offer offer,
BigInteger tradeAmount,
long tradePrice,
@ -62,7 +65,20 @@ public abstract class SellerTrade extends Trade {
}
public boolean isFinished() {
return super.isFinished() && ((SellerProtocol) getProtocol()).needsToResendPaymentReceivedMessages();
return super.isFinished() && !needsToResendPaymentReceivedMessages();
}
public boolean needsToResendPaymentReceivedMessages() {
return !isShutDownStarted() && getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled() && resendPaymentReceivedMessagesWithinDuration();
}
private boolean resendPaymentReceivedMessagesEnabled() {
return getOffer().getOfferPayload().getProtocolVersion() >= 2;
}
public boolean resendPaymentReceivedMessagesWithinDuration() {
Date startDate = getMaxTradePeriodDate(); // TODO: preferably use the date when the payment receipt was confirmed
return new Date().getTime() <= (startDate.getTime() + resendPaymentReceivedMessagesDurationMs);
}
}

View file

@ -720,7 +720,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
if (newValue == Trade.PayoutState.PAYOUT_UNLOCKED) {
if (!isInitialized) return;
log.info("Payout unlocked for {} {}, deleting multisig wallet", getClass().getSimpleName(), getId());
if (isCompleted()) clearAndShutDown();
if (isInitialized && isFinished()) clearAndShutDown();
else deleteWallet();
}
});
@ -838,7 +838,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
public void setCompleted(boolean completed) {
this.isCompleted = completed;
if (isPayoutUnlocked()) clearAndShutDown();
if (isInitialized && isFinished()) clearAndShutDown();
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -1456,11 +1456,11 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
// verify fee is within tolerance by recreating payout tx
// TODO (monero-project): creating tx will require exchanging updated multisig hex if message needs reprocessed. provide weight with describe_transfer so fee can be estimated?
log.info("Creating fee estimate tx for {} {}", getClass().getSimpleName(), getId());
log.info("Creating fee estimate tx for {} {}", getClass().getSimpleName(), getShortId());
saveWallet(); // save wallet before creating fee estimate tx
MoneroTxWallet feeEstimateTx = createPayoutTx();
HavenoUtils.verifyMinerFee(feeEstimateTx.getFee(), payoutTx.getFee());
log.info("Payout tx fee {} is within tolerance");
log.info("Payout tx fee is within tolerance for {} {}", getClass().getSimpleName(), getShortId());
}
// set signed payout tx hex
@ -1558,6 +1558,11 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
}
public void clearAndShutDown() {
// unregister p2p message listener immediately
removeDecryptedDirectMessageListener();
// clear process data and shut down trade
ThreadUtils.execute(() -> {
clearProcessData();
onShutDownStarted();
@ -1574,18 +1579,25 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
}
// TODO: clear other process data
setPayoutTxHex(null);
if (processModel.isPaymentReceivedMessagesReceived()) setPayoutTxHex(null);
for (TradePeer peer : getAllPeers()) {
peer.setUnsignedPayoutTxHex(null);
peer.setUpdatedMultisigHex(null);
peer.setDisputeClosedMessage(null);
peer.setPaymentSentMessage(null);
peer.setDepositTxHex(null);
peer.setDepositTxKey(null);
if (peer.isPaymentReceivedMessageReceived()) peer.setPaymentReceivedMessage(null);
if (peer.isPaymentReceivedMessageReceived()) {
peer.setUnsignedPayoutTxHex(null);
peer.setPaymentReceivedMessage(null);
}
}
}
private void removeDecryptedDirectMessageListener() {
if (getProcessModel() == null || getProcessModel().getProvider() == null || getProcessModel().getP2PService() == null) return;
getProcessModel().getP2PService().removeDecryptedDirectMessageListener(getProtocol());
}
public void maybeClearSensitiveData() {
String change = "";
if (contract != null && contract.maybeClearSensitiveData()) {
@ -1620,6 +1632,9 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
isShutDownStarted = true;
if (!isPayoutUnlocked()) log.info("Shutting down {} {}", getClass().getSimpleName(), getId());
// unregister p2p message listener
removeDecryptedDirectMessageListener();
// create task to shut down trade
Runnable shutDownTask = () -> {

View file

@ -988,29 +988,26 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
if (trade.isCompleted()) throw new RuntimeException("Trade " + trade.getId() + " was already completed");
closedTradableManager.add(trade);
trade.setCompleted(true);
removeTrade(trade, true);
removeTrade(trade);
xmrWalletService.swapPayoutAddressEntryToAvailable(trade.getId()); // TODO The address entry should have been removed already. Check and if its the case remove that.
requestPersistence();
}
public void unregisterTrade(Trade trade) {
log.warn("Unregistering {} {}", trade.getClass().getSimpleName(), trade.getId());
removeTrade(trade, true);
removeTrade(trade);
removeFailedTrade(trade);
if (!trade.isMaker()) xmrWalletService.swapPayoutAddressEntryToAvailable(trade.getId()); // TODO The address entry should have been removed already. Check and if its the case remove that.
requestPersistence();
}
public void removeTrade(Trade trade, boolean removeDirectMessageListener) {
public void removeTrade(Trade trade) {
log.info("TradeManager.removeTrade() " + trade.getId());
// remove trade
synchronized (tradableList.getList()) {
if (!tradableList.remove(trade)) return;
}
// unregister message listener and persist
if (removeDirectMessageListener) p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));
requestPersistence();
}
@ -1077,7 +1074,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// we move the trade to FailedTradesManager
public void onMoveInvalidTradeToFailedTrades(Trade trade) {
failedTradesManager.add(trade);
removeTrade(trade, false);
removeTrade(trade);
}
public void onMoveFailedTradeToPendingTrades(Trade trade) {

View file

@ -68,11 +68,11 @@ public class SellerProtocol extends DisputeProtocol {
protected void onInitialized() {
super.onInitialized();
// re-send payment received message if payout not published
// re-send payment received message if not acked
ThreadUtils.execute(() -> {
if (!needsToResendPaymentReceivedMessages()) return;
if (!((SellerTrade) trade).needsToResendPaymentReceivedMessages()) return;
synchronized (trade.getLock()) {
if (!needsToResendPaymentReceivedMessages()) return;
if (!!((SellerTrade) trade).needsToResendPaymentReceivedMessages()) return;
latchTrade();
given(anyPhase(Trade.Phase.PAYMENT_RECEIVED)
.with(SellerEvent.STARTUP))
@ -93,13 +93,6 @@ public class SellerProtocol extends DisputeProtocol {
}, trade.getId());
}
public boolean needsToResendPaymentReceivedMessages() {
return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled();
}
private boolean resendPaymentReceivedMessagesEnabled() {
return trade.getOffer().getOfferPayload().getProtocolVersion() >= 2;
}
@Override
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {

View file

@ -860,6 +860,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
return;
}
// clear and shut down trade if completely finished after ack
if (trade.isFinished()) {
log.info("Trade {} {} is finished after PaymentReceivedMessage ACK, shutting it down", trade.getClass().getSimpleName(), trade.getId());
trade.clearAndShutDown();
}
}
// generic handling

View file

@ -45,6 +45,7 @@ import haveno.core.account.sign.SignedWitness;
import haveno.core.account.witness.AccountAgeWitnessService;
import haveno.core.network.MessageState;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.SellerTrade;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.PaymentReceivedMessage;
import haveno.core.trade.messages.TradeMailboxMessage;
@ -236,6 +237,9 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
}
protected boolean stopSending() {
return isMessageReceived() || !trade.isPaymentReceived(); // stop if received or trade state reset // TODO: also stop after some number of blocks?
if (isMessageReceived()) return true; // stop if message received
if (!trade.isPaymentReceived()) return true; // stop if trade state reset
if (trade.isPayoutPublished() && !((SellerTrade) trade).resendPaymentReceivedMessagesWithinDuration()) return true; // stop if payout is published and we are not in the resend period
return false;
}
}

View file

@ -249,7 +249,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
requestDataManager.requestPreliminaryData();
keepAliveManager.start();
p2pServiceListeners.forEach(SetupListener::onTorNodeReady);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(SetupListener::onTorNodeReady);
}
}
@Override
@ -258,17 +260,23 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
hiddenServicePublished.set(true);
p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(SetupListener::onHiddenServicePublished);
}
}
@Override
public void onSetupFailed(Throwable throwable) {
p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable));
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(e -> e.onSetupFailed(throwable));
}
}
@Override
public void onRequestCustomBridges() {
p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(SetupListener::onRequestCustomBridges);
}
}
// Called from networkReadyBinding
@ -304,7 +312,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onUpdatedDataReceived() {
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(P2PServiceListener::onUpdatedDataReceived);
}
}
@Override
@ -314,7 +324,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onNoPeersAvailable() {
p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(P2PServiceListener::onNoPeersAvailable);
}
}
@Override
@ -334,7 +346,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
mailboxMessageService.onBootstrapped();
// Once we have applied the state in the P2P domain we notify our listeners
p2pServiceListeners.forEach(listenerHandler);
synchronized (p2pServiceListeners) {
p2pServiceListeners.forEach(listenerHandler);
}
mailboxMessageService.initAfterBootstrapped();
}
@ -369,12 +383,14 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
try {
DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned());
connection.maybeHandleSupportedCapabilitiesMessage(decryptedMsg.getNetworkEnvelope());
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress ->
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)),
() -> {
log.error("peersNodeAddress is expected to be available at onMessage for " +
"processing PrefixedSealedAndSignedMessage.");
});
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress -> {
synchronized (decryptedDirectMessageListeners) {
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress));
}
}, () -> {
log.error("peersNodeAddress is expected to be available at onMessage for " +
"processing PrefixedSealedAndSignedMessage.");
});
} catch (CryptoException e) {
log.warn("Decryption of a direct message failed. This is not expected as the " +
"direct message was sent to our node.");
@ -503,19 +519,27 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
public void addDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) {
decryptedDirectMessageListeners.add(listener);
synchronized (decryptedDirectMessageListeners) {
decryptedDirectMessageListeners.add(listener);
}
}
public void removeDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) {
decryptedDirectMessageListeners.remove(listener);
synchronized (decryptedDirectMessageListeners) {
decryptedDirectMessageListeners.remove(listener);
}
}
public void addP2PServiceListener(P2PServiceListener listener) {
p2pServiceListeners.add(listener);
synchronized (p2pServiceListeners) {
p2pServiceListeners.add(listener);
}
}
public void removeP2PServiceListener(P2PServiceListener listener) {
p2pServiceListeners.remove(listener);
synchronized (p2pServiceListeners) {
p2pServiceListeners.remove(listener);
}
}
public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {