refactor message resending, reprocessing, and ack handling

This commit is contained in:
woodser 2025-03-14 08:55:47 -04:00 committed by woodser
parent b7c9dea518
commit cb25a23779
17 changed files with 419 additions and 243 deletions

View file

@ -46,7 +46,6 @@ import haveno.common.taskrunner.Model;
import haveno.common.util.Utilities; import haveno.common.util.Utilities;
import haveno.core.monetary.Price; import haveno.core.monetary.Price;
import haveno.core.monetary.Volume; import haveno.core.monetary.Volume;
import haveno.core.network.MessageState;
import haveno.core.offer.Offer; import haveno.core.offer.Offer;
import haveno.core.offer.OfferDirection; import haveno.core.offer.OfferDirection;
import haveno.core.offer.OpenOffer; import haveno.core.offer.OpenOffer;
@ -195,7 +194,8 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
SELLER_SENT_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED), SELLER_SENT_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED), SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED), SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED); SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED),
BUYER_RECEIVED_PAYMENT_RECEIVED_MSG(Phase.PAYMENT_RECEIVED);
@NotNull @NotNull
public Phase getPhase() { public Phase getPhase() {
@ -618,8 +618,9 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
public void initialize(ProcessModelServiceProvider serviceProvider) { public void initialize(ProcessModelServiceProvider serviceProvider) {
if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized"); if (isInitialized) throw new IllegalStateException(getClass().getSimpleName() + " " + getId() + " is already initialized");
// done if payout unlocked and marked complete // skip initialization if trade is complete
if (isPayoutUnlocked() && isCompleted()) { // starting in v1.0.19, seller resends payment received message until acked or stored in mailbox
if (isPayoutUnlocked() && isCompleted() && !getProtocol().needsToResendPaymentReceivedMessages()) {
clearAndShutDown(); clearAndShutDown();
return; return;
} }
@ -733,15 +734,6 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
xmrWalletService.addWalletListener(idlePayoutSyncer); xmrWalletService.addWalletListener(idlePayoutSyncer);
} }
// TODO: buyer's payment sent message state property became unsynced if shut down while awaiting ack from seller. fixed in v1.0.19 so this check can be removed?
if (isBuyer()) {
MessageState expectedState = getPaymentSentMessageState();
if (expectedState != null && expectedState != processModel.getPaymentSentMessageStatePropertySeller().get()) {
log.warn("Updating unexpected payment sent message state for {} {}, expected={}, actual={}", getClass().getSimpleName(), getId(), expectedState, processModel.getPaymentSentMessageStatePropertySeller().get());
processModel.getPaymentSentMessageStatePropertySeller().set(expectedState);
}
}
// handle confirmations // handle confirmations
walletHeight.addListener((observable, oldValue, newValue) -> { walletHeight.addListener((observable, oldValue, newValue) -> {
importMultisigHexIfScheduled(); importMultisigHexIfScheduled();
@ -771,11 +763,20 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
} }
} }
// start polling if deposit requested // init syncing if deposit requested
if (isDepositRequested()) tryInitPolling(); if (isDepositRequested()) {
tryInitSyncing();
}
isFullyInitialized = true; isFullyInitialized = true;
} }
public void reprocessApplicableMessages() {
if (!isDepositRequested() || isPayoutUnlocked() || isCompleted()) return;
getProtocol().maybeReprocessPaymentSentMessage(false);
getProtocol().maybeReprocessPaymentReceivedMessage(false);
HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);
}
public void awaitInitialized() { public void awaitInitialized() {
while (!isFullyInitialized) HavenoUtils.waitFor(100); // TODO: use proper notification and refactor isInitialized, fullyInitialized, and arbitrator idling while (!isFullyInitialized) HavenoUtils.waitFor(100); // TODO: use proper notification and refactor isInitialized, fullyInitialized, and arbitrator idling
} }
@ -1535,7 +1536,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
peer.setUpdatedMultisigHex(null); peer.setUpdatedMultisigHex(null);
peer.setDisputeClosedMessage(null); peer.setDisputeClosedMessage(null);
peer.setPaymentSentMessage(null); peer.setPaymentSentMessage(null);
peer.setPaymentReceivedMessage(null); if (peer.isPaymentReceivedMessageReceived()) peer.setPaymentReceivedMessage(null);
} }
} }
@ -2049,25 +2050,6 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
throw new IllegalArgumentException("Trade is not buyer, seller, or arbitrator"); throw new IllegalArgumentException("Trade is not buyer, seller, or arbitrator");
} }
public MessageState getPaymentSentMessageState() {
if (isPaymentReceived()) return MessageState.ACKNOWLEDGED;
if (processModel.getPaymentSentMessageStatePropertySeller().get() == MessageState.ACKNOWLEDGED) return MessageState.ACKNOWLEDGED;
switch (state) {
case BUYER_SENT_PAYMENT_SENT_MSG:
return MessageState.SENT;
case BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG:
return MessageState.ARRIVED;
case BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG:
return MessageState.STORED_IN_MAILBOX;
case SELLER_RECEIVED_PAYMENT_SENT_MSG:
return MessageState.ACKNOWLEDGED;
case BUYER_SEND_FAILED_PAYMENT_SENT_MSG:
return MessageState.FAILED;
default:
return null;
}
}
public String getPeerRole(TradePeer peer) { public String getPeerRole(TradePeer peer) {
if (peer == getBuyer()) return "Buyer"; if (peer == getBuyer()) return "Buyer";
if (peer == getSeller()) return "Seller"; if (peer == getSeller()) return "Seller";
@ -2444,11 +2426,12 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
// sync and reprocess messages on new thread // sync and reprocess messages on new thread
if (isInitialized && connection != null && !Boolean.FALSE.equals(xmrConnectionService.isConnected())) { if (isInitialized && connection != null && !Boolean.FALSE.equals(xmrConnectionService.isConnected())) {
ThreadUtils.execute(() -> tryInitPolling(), getId()); ThreadUtils.execute(() -> tryInitSyncing(), getId());
} }
} }
} }
private void tryInitPolling() {
private void tryInitSyncing() {
if (isShutDownStarted) return; if (isShutDownStarted) return;
// set known deposit txs // set known deposit txs
@ -2457,24 +2440,18 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
// start polling // start polling
if (!isIdling()) { if (!isIdling()) {
tryInitPollingAux(); doTryInitSyncing();
} else { } else {
long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getPollPeriod()); // random time to start polling long startSyncingInMs = ThreadLocalRandom.current().nextLong(0, getPollPeriod()); // random time to start polling
UserThread.runAfter(() -> ThreadUtils.execute(() -> { UserThread.runAfter(() -> ThreadUtils.execute(() -> {
if (!isShutDownStarted) tryInitPollingAux(); if (!isShutDownStarted) doTryInitSyncing();
}, getId()), startSyncingInMs / 1000l); }, getId()), startSyncingInMs / 1000l);
} }
} }
private void tryInitPollingAux() { private void doTryInitSyncing() {
if (!wasWalletSynced) trySyncWallet(true); if (!wasWalletSynced) trySyncWallet(true);
updatePollPeriod(); updatePollPeriod();
// reprocess pending messages
getProtocol().maybeReprocessPaymentSentMessage(false);
getProtocol().maybeReprocessPaymentReceivedMessage(false);
HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(this, false);
startPolling(); startPolling();
} }
@ -2825,7 +2802,7 @@ public abstract class Trade extends XmrWalletBase implements Tradable, Model {
if (!isShutDownStarted) wallet = getWallet(); if (!isShutDownStarted) wallet = getWallet();
restartInProgress = false; restartInProgress = false;
pollWallet(); pollWallet();
if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitPolling(), getId()); if (!isShutDownStarted) ThreadUtils.execute(() -> tryInitSyncing(), getId());
} }
private void setStateDepositsSeen() { private void setStateDepositsSeen() {

View file

@ -44,6 +44,7 @@ import haveno.core.account.witness.AccountAgeWitnessService;
import haveno.core.filter.FilterManager; import haveno.core.filter.FilterManager;
import haveno.core.network.MessageState; import haveno.core.network.MessageState;
import haveno.core.offer.Offer; import haveno.core.offer.Offer;
import haveno.core.offer.OfferDirection;
import haveno.core.offer.OpenOfferManager; import haveno.core.offer.OpenOfferManager;
import haveno.core.payment.PaymentAccount; import haveno.core.payment.PaymentAccount;
import haveno.core.payment.payload.PaymentAccountPayload; import haveno.core.payment.payload.PaymentAccountPayload;
@ -73,6 +74,9 @@ import org.bitcoinj.core.Coin;
import org.bitcoinj.core.Transaction; import org.bitcoinj.core.Transaction;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional; import java.util.Optional;
// Fields marked as transient are only used during protocol execution which are based on directMessages so we do not // Fields marked as transient are only used during protocol execution which are based on directMessages so we do not
@ -161,15 +165,11 @@ public class ProcessModel implements Model, PersistablePayload {
@Getter @Getter
@Setter @Setter
private boolean importMultisigHexScheduled; private boolean importMultisigHexScheduled;
// We want to indicate the user the state of the message delivery of the
// PaymentSentMessage. As well we do an automatic re-send in case it was not ACKed yet.
// To enable that even after restart we persist the state.
@Setter
private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED);
@Setter
private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
private ObjectProperty<Boolean> paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false); private ObjectProperty<Boolean> paymentAccountDecryptedProperty = new SimpleObjectProperty<>(false);
@Deprecated
private ObjectProperty<MessageState> paymentSentMessageStatePropertySeller = new SimpleObjectProperty<>(MessageState.UNDEFINED);
@Deprecated
private ObjectProperty<MessageState> paymentSentMessageStatePropertyArbitrator = new SimpleObjectProperty<>(MessageState.UNDEFINED);
public ProcessModel(String offerId, String accountId, PubKeyRing pubKeyRing) { public ProcessModel(String offerId, String accountId, PubKeyRing pubKeyRing) {
this(offerId, accountId, pubKeyRing, new TradePeer(), new TradePeer(), new TradePeer()); this(offerId, accountId, pubKeyRing, new TradePeer(), new TradePeer(), new TradePeer());
@ -191,6 +191,31 @@ public class ProcessModel implements Model, PersistablePayload {
this.offer = offer; this.offer = offer;
this.provider = provider; this.provider = provider;
this.tradeManager = tradeManager; this.tradeManager = tradeManager;
for (TradePeer peer : getTradePeers()) {
peer.applyTransient(tradeManager);
}
// migrate deprecated fields to new model for v1.0.19
if (paymentSentMessageStatePropertySeller.get() != MessageState.UNDEFINED && getSeller().getPaymentSentMessageStateProperty().get() == MessageState.UNDEFINED) {
getSeller().getPaymentSentMessageStateProperty().set(paymentSentMessageStatePropertySeller.get());
tradeManager.requestPersistence();
}
if (paymentSentMessageStatePropertyArbitrator.get() != MessageState.UNDEFINED && getArbitrator().getPaymentSentMessageStateProperty().get() == MessageState.UNDEFINED) {
getArbitrator().getPaymentSentMessageStateProperty().set(paymentSentMessageStatePropertyArbitrator.get());
tradeManager.requestPersistence();
}
}
private List<TradePeer> getTradePeers() {
return Arrays.asList(maker, taker, arbitrator);
}
private TradePeer getBuyer() {
return offer.getDirection() == OfferDirection.BUY ? maker : taker;
}
private TradePeer getSeller() {
return offer.getDirection() == OfferDirection.BUY ? taker : maker;
} }
@ -245,14 +270,13 @@ public class ProcessModel implements Model, PersistablePayload {
processModel.setTradeFeeAddress(ProtoUtil.stringOrNullFromProto(proto.getTradeFeeAddress())); processModel.setTradeFeeAddress(ProtoUtil.stringOrNullFromProto(proto.getTradeFeeAddress()));
processModel.setMultisigAddress(ProtoUtil.stringOrNullFromProto(proto.getMultisigAddress())); processModel.setMultisigAddress(ProtoUtil.stringOrNullFromProto(proto.getMultisigAddress()));
// deprecated fields need to be read in order to migrate to new fields
String paymentSentMessageStateSellerString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateSeller()); String paymentSentMessageStateSellerString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateSeller());
MessageState paymentSentMessageStateSeller = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateSellerString); MessageState paymentSentMessageStateSeller = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateSellerString);
processModel.setPaymentSentMessageStateSeller(paymentSentMessageStateSeller); processModel.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateSeller);
String paymentSentMessageStateArbitratorString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateArbitrator()); String paymentSentMessageStateArbitratorString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageStateArbitrator());
MessageState paymentSentMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateArbitratorString); MessageState paymentSentMessageStateArbitrator = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateArbitratorString);
processModel.setPaymentSentMessageStateArbitrator(paymentSentMessageStateArbitrator); processModel.paymentSentMessageStatePropertyArbitrator.set(paymentSentMessageStateArbitrator);
return processModel; return processModel;
} }
@ -279,40 +303,8 @@ public class ProcessModel implements Model, PersistablePayload {
return getP2PService().getAddress(); return getP2PService().getAddress();
} }
void setPaymentSentAckMessageSeller(AckMessage ackMessage) { public boolean isPaymentReceivedMessagesReceived() {
MessageState messageState = ackMessage.isSuccess() ? return getArbitrator().isPaymentReceivedMessageReceived() && getBuyer().isPaymentReceivedMessageReceived();
MessageState.ACKNOWLEDGED :
MessageState.FAILED;
setPaymentSentMessageStateSeller(messageState);
}
void setPaymentSentAckMessageArbitrator(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :
MessageState.FAILED;
setPaymentSentMessageStateArbitrator(messageState);
}
public void setPaymentSentMessageStateSeller(MessageState paymentSentMessageStateProperty) {
this.paymentSentMessageStatePropertySeller.set(paymentSentMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}
public void setPaymentSentMessageStateArbitrator(MessageState paymentSentMessageStateProperty) {
this.paymentSentMessageStatePropertyArbitrator.set(paymentSentMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}
public boolean isPaymentSentMessageAckedBySeller() {
return paymentSentMessageStatePropertySeller.get() == MessageState.ACKNOWLEDGED;
}
public boolean isPaymentSentMessageAckedByArbitrator() {
return paymentSentMessageStatePropertyArbitrator.get() == MessageState.ACKNOWLEDGED;
} }
void setDepositTxSentAckMessage(AckMessage ackMessage) { void setDepositTxSentAckMessage(AckMessage ackMessage) {

View file

@ -53,6 +53,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class SellerProtocol extends DisputeProtocol { public class SellerProtocol extends DisputeProtocol {
private static final long RESEND_PAYMENT_RECEIVED_MSGS_AFTER = 1741629525730L; // Mar 10 2025 17:58 UTC
enum SellerEvent implements FluentProtocol.Event { enum SellerEvent implements FluentProtocol.Event {
STARTUP, STARTUP,
DEPOSIT_TXS_CONFIRMED, DEPOSIT_TXS_CONFIRMED,
@ -69,10 +72,9 @@ public class SellerProtocol extends DisputeProtocol {
// re-send payment received message if payout not published // re-send payment received message if payout not published
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
if (trade.isShutDownStarted() || trade.isPayoutPublished()) return; if (!needsToResendPaymentReceivedMessages()) return;
synchronized (trade.getLock()) { synchronized (trade.getLock()) {
if (trade.isShutDownStarted() || trade.isPayoutPublished()) return; if (!needsToResendPaymentReceivedMessages()) return;
if (trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.isPayoutPublished()) {
latchTrade(); latchTrade();
given(anyPhase(Trade.Phase.PAYMENT_RECEIVED) given(anyPhase(Trade.Phase.PAYMENT_RECEIVED)
.with(SellerEvent.STARTUP)) .with(SellerEvent.STARTUP))
@ -90,10 +92,17 @@ public class SellerProtocol extends DisputeProtocol {
.executeTasks(); .executeTasks();
awaitTradeLatch(); awaitTradeLatch();
} }
}
}, trade.getId()); }, trade.getId());
} }
public boolean needsToResendPaymentReceivedMessages() {
return !trade.isShutDownStarted() && trade.getState().ordinal() >= Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG.ordinal() && !trade.getProcessModel().isPaymentReceivedMessagesReceived() && resendPaymentReceivedMessagesEnabled();
}
private boolean resendPaymentReceivedMessagesEnabled() {
return trade.getTakeOfferDate().getTime() > RESEND_PAYMENT_RECEIVED_MSGS_AFTER;
}
@Override @Override
protected void onTradeMessage(TradeMessage message, NodeAddress peer) { protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
super.onTradeMessage(message, peer); super.onTradeMessage(message, peer);

View file

@ -24,12 +24,17 @@ import haveno.common.crypto.PubKeyRing;
import haveno.common.proto.ProtoUtil; import haveno.common.proto.ProtoUtil;
import haveno.common.proto.persistable.PersistablePayload; import haveno.common.proto.persistable.PersistablePayload;
import haveno.core.account.witness.AccountAgeWitness; import haveno.core.account.witness.AccountAgeWitness;
import haveno.core.network.MessageState;
import haveno.core.payment.payload.PaymentAccountPayload; import haveno.core.payment.payload.PaymentAccountPayload;
import haveno.core.proto.CoreProtoResolver; import haveno.core.proto.CoreProtoResolver;
import haveno.core.support.dispute.messages.DisputeClosedMessage; import haveno.core.support.dispute.messages.DisputeClosedMessage;
import haveno.core.trade.TradeManager;
import haveno.core.trade.messages.PaymentReceivedMessage; import haveno.core.trade.messages.PaymentReceivedMessage;
import haveno.core.trade.messages.PaymentSentMessage; import haveno.core.trade.messages.PaymentSentMessage;
import haveno.network.p2p.AckMessage;
import haveno.network.p2p.NodeAddress; import haveno.network.p2p.NodeAddress;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -57,6 +62,7 @@ public final class TradePeer implements PersistablePayload {
@Nullable @Nullable
transient private byte[] preparedDepositTx; transient private byte[] preparedDepositTx;
transient private MoneroTxWallet depositTx; transient private MoneroTxWallet depositTx;
transient private TradeManager tradeManager;
// Persistable mutable // Persistable mutable
@Nullable @Nullable
@ -96,7 +102,6 @@ public final class TradePeer implements PersistablePayload {
@Getter @Getter
private DisputeClosedMessage disputeClosedMessage; private DisputeClosedMessage disputeClosedMessage;
// added in v 0.6 // added in v 0.6
@Nullable @Nullable
private byte[] accountAgeWitnessNonce; private byte[] accountAgeWitnessNonce;
@ -142,13 +147,32 @@ public final class TradePeer implements PersistablePayload {
private long payoutAmount; private long payoutAmount;
@Nullable @Nullable
private String updatedMultisigHex; private String updatedMultisigHex;
@Getter @Deprecated
private boolean depositsConfirmedMessageAcked;
// We want to indicate the user the state of the message delivery of the payment
// confirmation messages. We do an automatic re-send in case it was not ACKed yet.
// To enable that even after restart we persist the state.
@Setter @Setter
boolean depositsConfirmedMessageAcked; private ObjectProperty<MessageState> depositsConfirmedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
@Setter
private ObjectProperty<MessageState> paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
@Setter
private ObjectProperty<MessageState> paymentReceivedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
public TradePeer() { public TradePeer() {
} }
public void applyTransient(TradeManager tradeManager) {
this.tradeManager = tradeManager;
// migrate deprecated fields to new model for v1.0.19
if (depositsConfirmedMessageAcked && depositsConfirmedMessageStateProperty.get() == MessageState.UNDEFINED) {
depositsConfirmedMessageStateProperty.set(MessageState.ACKNOWLEDGED);
tradeManager.requestPersistence();
}
}
public BigInteger getDepositTxFee() { public BigInteger getDepositTxFee() {
return BigInteger.valueOf(depositTxFee); return BigInteger.valueOf(depositTxFee);
} }
@ -181,6 +205,60 @@ public final class TradePeer implements PersistablePayload {
this.payoutAmount = payoutAmount.longValueExact(); this.payoutAmount = payoutAmount.longValueExact();
} }
void setDepositsConfirmedAckMessage(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :
MessageState.FAILED;
setDepositsConfirmedMessageState(messageState);
}
void setPaymentSentAckMessage(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :
MessageState.FAILED;
setPaymentSentMessageState(messageState);
}
void setPaymentReceivedAckMessage(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :
MessageState.FAILED;
setPaymentReceivedMessageState(messageState);
}
public void setDepositsConfirmedMessageState(MessageState depositsConfirmedMessageStateProperty) {
this.depositsConfirmedMessageStateProperty.set(depositsConfirmedMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}
public void setPaymentSentMessageState(MessageState paymentSentMessageStateProperty) {
this.paymentSentMessageStateProperty.set(paymentSentMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}
public void setPaymentReceivedMessageState(MessageState paymentReceivedMessageStateProperty) {
this.paymentReceivedMessageStateProperty.set(paymentReceivedMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}
public boolean isDepositsConfirmedMessageAcked() {
return depositsConfirmedMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
}
public boolean isPaymentSentMessageAcked() {
return paymentSentMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
}
public boolean isPaymentReceivedMessageReceived() {
return paymentReceivedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || paymentReceivedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
}
@Override @Override
public Message toProtoMessage() { public Message toProtoMessage() {
final protobuf.TradePeer.Builder builder = protobuf.TradePeer.newBuilder(); final protobuf.TradePeer.Builder builder = protobuf.TradePeer.newBuilder();
@ -221,6 +299,9 @@ public final class TradePeer implements PersistablePayload {
Optional.ofNullable(payoutTxFee).ifPresent(e -> builder.setPayoutTxFee(payoutTxFee)); Optional.ofNullable(payoutTxFee).ifPresent(e -> builder.setPayoutTxFee(payoutTxFee));
Optional.ofNullable(payoutAmount).ifPresent(e -> builder.setPayoutAmount(payoutAmount)); Optional.ofNullable(payoutAmount).ifPresent(e -> builder.setPayoutAmount(payoutAmount));
builder.setDepositsConfirmedMessageAcked(depositsConfirmedMessageAcked); builder.setDepositsConfirmedMessageAcked(depositsConfirmedMessageAcked);
builder.setDepositsConfirmedMessageState(depositsConfirmedMessageStateProperty.get().name());
builder.setPaymentSentMessageState(paymentSentMessageStateProperty.get().name());
builder.setPaymentReceivedMessageState(paymentReceivedMessageStateProperty.get().name());
builder.setCurrentDate(currentDate); builder.setCurrentDate(currentDate);
return builder.build(); return builder.build();
@ -270,6 +351,19 @@ public final class TradePeer implements PersistablePayload {
tradePeer.setUnsignedPayoutTxHex(ProtoUtil.stringOrNullFromProto(proto.getUnsignedPayoutTxHex())); tradePeer.setUnsignedPayoutTxHex(ProtoUtil.stringOrNullFromProto(proto.getUnsignedPayoutTxHex()));
tradePeer.setPayoutTxFee(BigInteger.valueOf(proto.getPayoutTxFee())); tradePeer.setPayoutTxFee(BigInteger.valueOf(proto.getPayoutTxFee()));
tradePeer.setPayoutAmount(BigInteger.valueOf(proto.getPayoutAmount())); tradePeer.setPayoutAmount(BigInteger.valueOf(proto.getPayoutAmount()));
String depositsConfirmedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getDepositsConfirmedMessageState());
MessageState depositsConfirmedMessageState = ProtoUtil.enumFromProto(MessageState.class, depositsConfirmedMessageStateString);
tradePeer.setDepositsConfirmedMessageState(depositsConfirmedMessageState);
String paymentSentMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentSentMessageState());
MessageState paymentSentMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentSentMessageStateString);
tradePeer.setPaymentSentMessageState(paymentSentMessageState);
String paymentReceivedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getPaymentReceivedMessageState());
MessageState paymentReceivedMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentReceivedMessageStateString);
tradePeer.setPaymentReceivedMessageState(paymentReceivedMessageState);
return tradePeer; return tradePeer;
} }
} }

View file

@ -252,6 +252,9 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService(); MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this); if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages()); handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
// reprocess applicable messages
trade.reprocessApplicableMessages();
} }
// send deposits confirmed message if applicable // send deposits confirmed message if applicable
@ -281,6 +284,10 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}, trade.getId()); }, trade.getId());
} }
public boolean needsToResendPaymentReceivedMessages() {
return false; // seller protocol overrides
}
public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) { public void maybeReprocessPaymentSentMessage(boolean reprocessOnError) {
if (trade.isShutDownStarted()) return; if (trade.isShutDownStarted()) return;
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
@ -291,7 +298,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
return; return;
} }
log.warn("Reprocessing payment sent message for {} {}", trade.getClass().getSimpleName(), trade.getId()); log.warn("Reprocessing PaymentSentMessage for {} {}", trade.getClass().getSimpleName(), trade.getId());
handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError); handle(trade.getBuyer().getPaymentSentMessage(), trade.getBuyer().getPaymentSentMessage().getSenderNodeAddress(), reprocessOnError);
} }
}, trade.getId()); }, trade.getId());
@ -307,7 +314,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
return; return;
} }
log.warn("Reprocessing payment received message for {} {}", trade.getClass().getSimpleName(), trade.getId()); log.warn("Reprocessing PaymentReceivedMessage for {} {}", trade.getClass().getSimpleName(), trade.getId());
handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError); handle(trade.getSeller().getPaymentReceivedMessage(), trade.getSeller().getPaymentReceivedMessage().getSenderNodeAddress(), reprocessOnError);
} }
}, trade.getId()); }, trade.getId());
@ -710,47 +717,76 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
private void onAckMessage(AckMessage ackMessage, NodeAddress sender) { private void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
// handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time // get trade peer
if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) {
if (trade.getTradePeer(sender) == trade.getSeller()) {
processModel.setPaymentSentAckMessageSeller(ackMessage);
trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
processModel.getTradeManager().requestPersistence();
} else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
processModel.setPaymentSentAckMessageArbitrator(ackMessage);
processModel.getTradeManager().requestPersistence();
} else if (!ackMessage.isSuccess()) {
String err = "Received AckMessage with error state for " + ackMessage.getSourceMsgClassName() + " from "+ sender + " with tradeId " + trade.getId() + " and errorMessage=" + ackMessage.getErrorMessage();
log.warn(err);
return; // log error and ignore nack if not seller
}
}
if (ackMessage.isSuccess()) {
log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
// handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time
if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) {
TradePeer peer = trade.getTradePeer(sender); TradePeer peer = trade.getTradePeer(sender);
if (peer == null) { if (peer == null) {
// get the applicable peer based on the sourceUid
if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getArbitrator().getNodeAddress()))) peer = trade.getArbitrator(); if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getArbitrator().getNodeAddress()))) peer = trade.getArbitrator();
else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getMaker().getNodeAddress()))) peer = trade.getMaker(); else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getMaker().getNodeAddress()))) peer = trade.getMaker();
else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getTaker().getNodeAddress()))) peer = trade.getTaker(); else if (ackMessage.getSourceUid().equals(HavenoUtils.getDeterministicId(trade, DepositsConfirmedMessage.class, trade.getTaker().getNodeAddress()))) peer = trade.getTaker();
} }
if (peer == null) log.warn("Received AckMesage for DepositsConfirmedMessage for unknown peer: " + sender); if (peer == null) {
else peer.setDepositsConfirmedMessageAcked(true); if (ackMessage.isSuccess()) log.warn("Received AckMessage from unknown peer for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
else log.warn("Received AckMessage with error state from unknown peer for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
return;
}
// update sender's node address
if (!peer.getNodeAddress().equals(sender)) {
log.info("Updating peer's node address from {} to {} using ACK message to {}", peer.getNodeAddress(), sender, ackMessage.getSourceMsgClassName());
peer.setNodeAddress(sender);
} }
} else {
log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
// set trade state on deposit request nack // set trade state on deposit request nack
if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) { if (ackMessage.getSourceMsgClassName().equals(DepositRequest.class.getSimpleName())) {
if (!ackMessage.isSuccess()) {
trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
}
// handle ack for DepositsConfirmedMessage, which automatically re-sends if not ACKed in a certain time
if (ackMessage.getSourceMsgClassName().equals(DepositsConfirmedMessage.class.getSimpleName())) {
peer.setDepositsConfirmedAckMessage(ackMessage);
processModel.getTradeManager().requestPersistence();
}
// handle ack for PaymentSentMessage, which automatically re-sends if not ACKed in a certain time
if (ackMessage.getSourceMsgClassName().equals(PaymentSentMessage.class.getSimpleName())) {
if (trade.getTradePeer(sender) == trade.getSeller()) {
trade.getSeller().setPaymentSentAckMessage(ackMessage);
if (ackMessage.isSuccess()) trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
else trade.setState(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
processModel.getTradeManager().requestPersistence();
} else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
trade.getArbitrator().setPaymentSentAckMessage(ackMessage);
processModel.getTradeManager().requestPersistence();
} else {
log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
return;
}
}
// handle ack for PaymentReceivedMessage, which automatically re-sends if not ACKed in a certain time
if (ackMessage.getSourceMsgClassName().equals(PaymentReceivedMessage.class.getSimpleName())) {
if (trade.getTradePeer(sender) == trade.getBuyer()) {
trade.getBuyer().setPaymentReceivedAckMessage(ackMessage);
if (ackMessage.isSuccess()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_RECEIVED_PAYMENT_RECEIVED_MSG);
else trade.setState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
processModel.getTradeManager().requestPersistence();
} else if (trade.getTradePeer(sender) == trade.getArbitrator()) {
trade.getArbitrator().setPaymentReceivedAckMessage(ackMessage);
processModel.getTradeManager().requestPersistence();
} else {
log.warn("Received AckMessage from unexpected peer for {}, sender={}, trade={} {}, messageUid={}, success={}, errorMsg={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.isSuccess(), ackMessage.getErrorMessage());
return;
}
}
// generic handling
if (ackMessage.isSuccess()) {
log.info("Received AckMessage for {}, sender={}, trade={} {}, messageUid={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid());
} else {
log.warn("Received AckMessage with error state for {}, sender={}, trade={} {}, messageUid={}, errorMessage={}", ackMessage.getSourceMsgClassName(), sender, trade.getClass().getSimpleName(), trade.getId(), ackMessage.getSourceUid(), ackMessage.getErrorMessage());
handleError(ackMessage.getErrorMessage()); handleError(ackMessage.getErrorMessage());
} }
} }

View file

@ -142,26 +142,26 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
@Override @Override
protected void setStateSent() { protected void setStateSent() {
if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG); getReceiver().setPaymentSentMessageState(MessageState.SENT);
tryToSendAgainLater(); tryToSendAgainLater();
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
@Override @Override
protected void setStateArrived() { protected void setStateArrived() {
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG); getReceiver().setPaymentSentMessageState(MessageState.ARRIVED);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
@Override @Override
protected void setStateStoredInMailbox() { protected void setStateStoredInMailbox() {
trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG); getReceiver().setPaymentSentMessageState(MessageState.STORED_IN_MAILBOX);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
@Override @Override
protected void setStateFault() { protected void setStateFault() {
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG); getReceiver().setPaymentSentMessageState(MessageState.FAILED);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
@ -170,7 +170,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
timer.stop(); timer.stop();
} }
if (listener != null) { if (listener != null) {
processModel.getPaymentSentMessageStatePropertySeller().removeListener(listener); trade.getSeller().getPaymentReceivedMessageStateProperty().removeListener(listener);
} }
} }
@ -185,7 +185,6 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
return; return;
} }
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
if (timer != null) { if (timer != null) {
timer.stop(); timer.stop();
} }
@ -194,8 +193,8 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
if (resendCounter == 0) { if (resendCounter == 0) {
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
processModel.getPaymentSentMessageStatePropertySeller().addListener(listener); getReceiver().getPaymentSentMessageStateProperty().addListener(listener);
onMessageStateChange(processModel.getPaymentSentMessageStatePropertySeller().get()); onMessageStateChange(getReceiver().getPaymentSentMessageStateProperty().get());
} }
// first re-send is after 2 minutes, then increase the delay exponentially // first re-send is after 2 minutes, then increase the delay exponentially
@ -212,12 +211,12 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
} }
private void onMessageStateChange(MessageState newValue) { private void onMessageStateChange(MessageState newValue) {
if (newValue == MessageState.ACKNOWLEDGED) { if (isAckedByReceiver()) {
trade.setStateIfValidTransitionTo(Trade.State.SELLER_RECEIVED_PAYMENT_SENT_MSG);
processModel.getTradeManager().requestPersistence();
cleanup(); cleanup();
} }
} }
protected abstract boolean isAckedByReceiver(); protected boolean isAckedByReceiver() {
return getReceiver().isPaymentSentMessageAcked();
}
} }

View file

@ -38,26 +38,7 @@ public class BuyerSendPaymentSentMessageToArbitrator extends BuyerSendPaymentSen
@Override @Override
protected void setStateSent() { protected void setStateSent() {
super.setStateSent();
complete(); // don't wait for message to arbitrator complete(); // don't wait for message to arbitrator
} }
@Override
protected void setStateFault() {
// state only updated on seller message
}
@Override
protected void setStateStoredInMailbox() {
// state only updated on seller message
}
@Override
protected void setStateArrived() {
// state only updated on seller message
}
@Override
protected boolean isAckedByReceiver() {
return trade.getProcessModel().isPaymentSentMessageAckedByArbitrator();
}
} }

View file

@ -18,7 +18,6 @@
package haveno.core.trade.protocol.tasks; package haveno.core.trade.protocol.tasks;
import haveno.common.taskrunner.TaskRunner; import haveno.common.taskrunner.TaskRunner;
import haveno.core.network.MessageState;
import haveno.core.trade.Trade; import haveno.core.trade.Trade;
import haveno.core.trade.messages.TradeMessage; import haveno.core.trade.messages.TradeMessage;
import haveno.core.trade.protocol.TradePeer; import haveno.core.trade.protocol.TradePeer;
@ -40,25 +39,25 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes
@Override @Override
protected void setStateSent() { protected void setStateSent() {
trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.SENT); if (trade.getState().ordinal() < Trade.State.BUYER_SENT_PAYMENT_SENT_MSG.ordinal()) trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_PAYMENT_SENT_MSG);
super.setStateSent(); super.setStateSent();
} }
@Override @Override
protected void setStateArrived() { protected void setStateArrived() {
trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.ARRIVED); trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_PAYMENT_SENT_MSG);
super.setStateArrived(); super.setStateArrived();
} }
@Override @Override
protected void setStateStoredInMailbox() { protected void setStateStoredInMailbox() {
trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.STORED_IN_MAILBOX); trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_PAYMENT_SENT_MSG);
super.setStateStoredInMailbox(); super.setStateStoredInMailbox();
} }
@Override @Override
protected void setStateFault() { protected void setStateFault() {
trade.getProcessModel().setPaymentSentMessageStateSeller(MessageState.FAILED); trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_PAYMENT_SENT_MSG);
super.setStateFault(); super.setStateFault();
} }
@ -69,9 +68,4 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes
appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage); appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage);
complete(); complete();
} }
@Override
protected boolean isAckedByReceiver() {
return trade.getProcessModel().isPaymentSentMessageAckedBySeller();
}
} }

View file

@ -35,11 +35,15 @@
package haveno.core.trade.protocol.tasks; package haveno.core.trade.protocol.tasks;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.crypto.PubKeyRing; import haveno.common.crypto.PubKeyRing;
import haveno.common.crypto.Sig; import haveno.common.crypto.Sig;
import haveno.common.taskrunner.TaskRunner; import haveno.common.taskrunner.TaskRunner;
import haveno.core.account.sign.SignedWitness; import haveno.core.account.sign.SignedWitness;
import haveno.core.account.witness.AccountAgeWitnessService; import haveno.core.account.witness.AccountAgeWitnessService;
import haveno.core.network.MessageState;
import haveno.core.trade.HavenoUtils; import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade; import haveno.core.trade.Trade;
import haveno.core.trade.messages.PaymentReceivedMessage; import haveno.core.trade.messages.PaymentReceivedMessage;
@ -47,15 +51,23 @@ import haveno.core.trade.messages.TradeMailboxMessage;
import haveno.core.trade.protocol.TradePeer; import haveno.core.trade.protocol.TradePeer;
import haveno.core.util.JsonUtil; import haveno.core.util.JsonUtil;
import haveno.network.p2p.NodeAddress; import haveno.network.p2p.NodeAddress;
import javafx.beans.value.ChangeListener;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessageTask { public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessageTask {
SignedWitness signedWitness = null; private SignedWitness signedWitness = null;
private ChangeListener<MessageState> listener;
private Timer timer;
private static final int MAX_RESEND_ATTEMPTS = 20;
private int delayInMin = 10;
private int resendCounter = 0;
public SellerSendPaymentReceivedMessage(TaskRunner<Trade> taskHandler, Trade trade) { public SellerSendPaymentReceivedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade); super(taskHandler, trade);
@ -77,6 +89,13 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
protected void run() { protected void run() {
try { try {
runInterceptHook(); runInterceptHook();
// skip if already received
if (isReceived()) {
if (!isCompleted()) complete();
return;
}
super.run(); super.run();
} catch (Throwable t) { } catch (Throwable t) {
failed(t); failed(t);
@ -134,29 +153,85 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
@Override @Override
protected void setStateSent() { protected void setStateSent() {
trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG);
log.info("{} sent: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness); log.info("{} sent: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
getReceiver().setPaymentReceivedMessageState(MessageState.SENT);
tryToSendAgainLater();
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
@Override @Override
protected void setStateFault() { protected void setStateFault() {
trade.advanceState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
log.error("{} failed: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness); log.error("{} failed: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
getReceiver().setPaymentReceivedMessageState(MessageState.FAILED);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
@Override @Override
protected void setStateStoredInMailbox() { protected void setStateStoredInMailbox() {
trade.advanceState(Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG);
log.info("{} stored in mailbox: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness); log.info("{} stored in mailbox: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
getReceiver().setPaymentReceivedMessageState(MessageState.STORED_IN_MAILBOX);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
@Override @Override
protected void setStateArrived() { protected void setStateArrived() {
trade.advanceState(Trade.State.SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG);
log.info("{} arrived: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness); log.info("{} arrived: tradeId={} at peer {} SignedWitness {}", getClass().getSimpleName(), trade.getId(), getReceiverNodeAddress(), signedWitness);
getReceiver().setPaymentReceivedMessageState(MessageState.ARRIVED);
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
private void cleanup() {
if (timer != null) {
timer.stop();
}
if (listener != null) {
trade.getBuyer().getPaymentReceivedMessageStateProperty().removeListener(listener);
}
}
private void tryToSendAgainLater() {
// skip if already received
if (isReceived()) return;
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
cleanup();
log.warn("We never received an ACK message when sending the PaymentReceivedMessage to the peer. We stop trying to send the message.");
return;
}
if (timer != null) {
timer.stop();
}
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
if (resendCounter == 0) {
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
getReceiver().getPaymentReceivedMessageStateProperty().addListener(listener);
onMessageStateChange(getReceiver().getPaymentReceivedMessageStateProperty().get());
}
// first re-send is after 2 minutes, then increase the delay exponentially
if (resendCounter == 0) {
int shortDelay = 2;
log.info("We will send the message again to the peer after a delay of {} min.", shortDelay);
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
} else {
log.info("We will send the message again to the peer after a delay of {} min.", delayInMin);
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
delayInMin = (int) ((double) delayInMin * 1.5);
}
resendCounter++;
}
private void onMessageStateChange(MessageState newValue) {
if (isReceived()) {
cleanup();
}
}
protected boolean isReceived() {
return getReceiver().isPaymentReceivedMessageReceived();
}
} }

View file

@ -37,6 +37,30 @@ public class SellerSendPaymentReceivedMessageToBuyer extends SellerSendPaymentRe
return trade.getBuyer(); return trade.getBuyer();
} }
@Override
protected void setStateSent() {
trade.advanceState(Trade.State.SELLER_SENT_PAYMENT_RECEIVED_MSG);
super.setStateSent();
}
@Override
protected void setStateFault() {
trade.advanceState(Trade.State.SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG);
super.setStateFault();
}
@Override
protected void setStateStoredInMailbox() {
trade.advanceState(Trade.State.SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG);
super.setStateStoredInMailbox();
}
@Override
protected void setStateArrived() {
trade.advanceState(Trade.State.SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG);
super.setStateArrived();
}
// continue execution on fault so payment received message is sent to arbitrator // continue execution on fault so payment received message is sent to arbitrator
@Override @Override
protected void onFault(String errorMessage, TradeMessage message) { protected void onFault(String errorMessage, TradeMessage message) {

View file

@ -23,6 +23,7 @@ import haveno.common.Timer;
import haveno.common.UserThread; import haveno.common.UserThread;
import haveno.common.crypto.PubKeyRing; import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner; import haveno.common.taskrunner.TaskRunner;
import haveno.core.network.MessageState;
import haveno.core.trade.HavenoUtils; import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade; import haveno.core.trade.Trade;
import haveno.core.trade.messages.DepositsConfirmedMessage; import haveno.core.trade.messages.DepositsConfirmedMessage;
@ -52,8 +53,8 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
try { try {
runInterceptHook(); runInterceptHook();
// skip if already acked by receiver // skip if already acked or payout published
if (isAckedByReceiver()) { if (isAckedByReceiver() || trade.isPayoutPublished()) {
if (!isCompleted()) complete(); if (!isCompleted()) complete();
return; return;
} }
@ -64,11 +65,17 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
} }
} }
@Override protected abstract TradePeer getReceiver();
protected abstract NodeAddress getReceiverNodeAddress();
@Override @Override
protected abstract PubKeyRing getReceiverPubKeyRing(); protected NodeAddress getReceiverNodeAddress() {
return getReceiver().getNodeAddress();
}
@Override
protected PubKeyRing getReceiverPubKeyRing() {
return getReceiver().getPubKeyRing();
}
@Override @Override
protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) { protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) {
@ -97,23 +104,24 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
@Override @Override
protected void setStateSent() { protected void setStateSent() {
getReceiver().setDepositsConfirmedMessageState(MessageState.SENT);
tryToSendAgainLater(); tryToSendAgainLater();
processModel.getTradeManager().requestPersistence(); processModel.getTradeManager().requestPersistence();
} }
@Override @Override
protected void setStateArrived() { protected void setStateArrived() {
// no additional handling getReceiver().setDepositsConfirmedMessageState(MessageState.ARRIVED);
} }
@Override @Override
protected void setStateStoredInMailbox() { protected void setStateStoredInMailbox() {
// no additional handling getReceiver().setDepositsConfirmedMessageState(MessageState.STORED_IN_MAILBOX);
} }
@Override @Override
protected void setStateFault() { protected void setStateFault() {
// no additional handling getReceiver().setDepositsConfirmedMessageState(MessageState.FAILED);
} }
private void cleanup() { private void cleanup() {
@ -151,7 +159,6 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
} }
private boolean isAckedByReceiver() { private boolean isAckedByReceiver() {
TradePeer peer = trade.getTradePeer(getReceiverNodeAddress()); return getReceiver().isDepositsConfirmedMessageAcked();
return peer.isDepositsConfirmedMessageAcked();
} }
} }

View file

@ -17,10 +17,9 @@
package haveno.core.trade.protocol.tasks; package haveno.core.trade.protocol.tasks;
import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner; import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade; import haveno.core.trade.Trade;
import haveno.network.p2p.NodeAddress; import haveno.core.trade.protocol.TradePeer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToArbitrator extends SendDepositsConfir
} }
@Override @Override
public NodeAddress getReceiverNodeAddress() { protected TradePeer getReceiver() {
return trade.getArbitrator().getNodeAddress(); return trade.getArbitrator();
}
@Override
public PubKeyRing getReceiverPubKeyRing() {
return trade.getArbitrator().getPubKeyRing();
} }
} }

View file

@ -17,10 +17,9 @@
package haveno.core.trade.protocol.tasks; package haveno.core.trade.protocol.tasks;
import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner; import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade; import haveno.core.trade.Trade;
import haveno.network.p2p.NodeAddress; import haveno.core.trade.protocol.TradePeer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToBuyer extends SendDepositsConfirmedMe
} }
@Override @Override
public NodeAddress getReceiverNodeAddress() { protected TradePeer getReceiver() {
return trade.getBuyer().getNodeAddress(); return trade.getBuyer();
}
@Override
public PubKeyRing getReceiverPubKeyRing() {
return trade.getBuyer().getPubKeyRing();
} }
} }

View file

@ -17,10 +17,9 @@
package haveno.core.trade.protocol.tasks; package haveno.core.trade.protocol.tasks;
import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner; import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade; import haveno.core.trade.Trade;
import haveno.network.p2p.NodeAddress; import haveno.core.trade.protocol.TradePeer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -34,12 +33,7 @@ public class SendDepositsConfirmedMessageToSeller extends SendDepositsConfirmedM
} }
@Override @Override
public NodeAddress getReceiverNodeAddress() { protected TradePeer getReceiver() {
return trade.getSeller().getNodeAddress(); return trade.getSeller();
}
@Override
public PubKeyRing getReceiverPubKeyRing() {
return trade.getSeller().getPubKeyRing();
} }
} }

View file

@ -200,7 +200,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
payoutStateSubscription = EasyBind.subscribe(trade.payoutStateProperty(), state -> { payoutStateSubscription = EasyBind.subscribe(trade.payoutStateProperty(), state -> {
onPayoutStateChanged(state); onPayoutStateChanged(state);
}); });
messageStateSubscription = EasyBind.subscribe(trade.getProcessModel().getPaymentSentMessageStatePropertySeller(), this::onPaymentSentMessageStateChanged); messageStateSubscription = EasyBind.subscribe(trade.getSeller().getPaymentSentMessageStateProperty(), this::onPaymentSentMessageStateChanged);
} }
} }
} }
@ -425,6 +425,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
case SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG: case SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG:
case SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG: case SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG:
case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG: case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG:
case BUYER_RECEIVED_PAYMENT_RECEIVED_MSG:
sellerState.set(trade.isPayoutPublished() ? SellerState.STEP4 : SellerState.STEP3); sellerState.set(trade.isPayoutPublished() ? SellerState.STEP4 : SellerState.STEP3);
break; break;

View file

@ -135,6 +135,7 @@ public class SellerStep3View extends TradeStepView {
statusLabel.setText(Res.get("shared.messageStoredInMailbox")); statusLabel.setText(Res.get("shared.messageStoredInMailbox"));
break; break;
case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG: case SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG:
case BUYER_RECEIVED_PAYMENT_RECEIVED_MSG:
busyAnimation.stop(); busyAnimation.stop();
statusLabel.setText(Res.get("shared.messageArrived")); statusLabel.setText(Res.get("shared.messageArrived"));
break; break;

View file

@ -1465,6 +1465,7 @@ message Trade {
SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG = 24; SELLER_SEND_FAILED_PAYMENT_RECEIVED_MSG = 24;
SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG = 25; SELLER_STORED_IN_MAILBOX_PAYMENT_RECEIVED_MSG = 25;
SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG = 26; SELLER_SAW_ARRIVED_PAYMENT_RECEIVED_MSG = 26;
BUYER_RECEIVED_PAYMENT_RECEIVED_MSG = 27;
} }
enum Phase { enum Phase {
@ -1568,8 +1569,8 @@ message ProcessModel {
bytes payout_tx_signature = 4; bytes payout_tx_signature = 4;
bool use_savings_wallet = 5; bool use_savings_wallet = 5;
int64 funds_needed_for_trade = 6; int64 funds_needed_for_trade = 6;
string payment_sent_message_state_seller = 7; string payment_sent_message_state_seller = 7 [deprecated = true];
string payment_sent_message_state_arbitrator = 8; string payment_sent_message_state_arbitrator = 8 [deprecated = true];
bytes maker_signature = 9; bytes maker_signature = 9;
TradePeer maker = 10; TradePeer maker = 10;
TradePeer taker = 11; TradePeer taker = 11;
@ -1613,7 +1614,7 @@ message TradePeer {
string made_multisig_hex = 31; string made_multisig_hex = 31;
string exchanged_multisig_hex = 32; string exchanged_multisig_hex = 32;
string updated_multisig_hex = 33; string updated_multisig_hex = 33;
bool deposits_confirmed_message_acked = 34; bool deposits_confirmed_message_acked = 34 [deprecated = true];
string deposit_tx_hash = 35; string deposit_tx_hash = 35;
string deposit_tx_hex = 36; string deposit_tx_hex = 36;
string deposit_tx_key = 37; string deposit_tx_key = 37;
@ -1622,6 +1623,9 @@ message TradePeer {
string unsigned_payout_tx_hex = 40; string unsigned_payout_tx_hex = 40;
int64 payout_tx_fee = 41; int64 payout_tx_fee = 41;
int64 payout_amount = 42; int64 payout_amount = 42;
string deposits_confirmed_message_state = 43;
string payment_sent_message_state = 44;
string payment_received_message_state = 45;
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////