This commit is contained in:
woodser 2025-10-09 12:14:34 +00:00 committed by GitHub
commit 6be4b99b5b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 603 additions and 287 deletions

View file

@ -29,6 +29,7 @@ import haveno.core.support.messages.ChatMessage;
import haveno.core.support.messages.SupportMessage;
import haveno.core.trade.Trade;
import haveno.core.trade.TradeManager;
import haveno.core.trade.protocol.TradePeer;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.AckMessage;
import haveno.network.p2p.AckMessageSourceType;
@ -197,6 +198,16 @@ public abstract class SupportManager {
synchronized (dispute.getChatMessages()) {
for (ChatMessage chatMessage : dispute.getChatMessages()) {
if (chatMessage.getUid().equals(ackMessage.getSourceUid())) {
// set ack state
TradePeer sender = trade.getTradePeer(ackMessage.getSenderNodeAddress());
if (sender == null) {
log.warn("Received AckMessage from unknown peer {} for {} with tradeId={}, uid={}", ackMessage.getSenderNodeAddress(), ackMessage.getSourceMsgClassName(), ackMessage.getSourceId(), ackMessage.getSourceUid());
} else {
sender.setDisputeOpenedAckMessage(ackMessage);
}
// advance trade state
if (trade.getDisputeState() == Trade.DisputeState.DISPUTE_PREPARING || trade.getDisputeState() == Trade.DisputeState.DISPUTE_REQUESTED) { // ack can arrive before saw arrived
if (dispute.isClosed()) dispute.reOpen();
trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED);
@ -220,6 +231,16 @@ public abstract class SupportManager {
for (ChatMessage chatMessage : dispute.getChatMessages()) {
if (chatMessage.getUid().equals(ackMessage.getSourceUid())) {
if (!trade.isArbitrator() && (trade.getDisputeState().isRequested() || trade.getDisputeState().isCloseRequested())) {
// set ack state
TradePeer sender = trade.getTradePeer(ackMessage.getSenderNodeAddress());
if (sender == null) {
log.warn("Received AckMessage from unknown peer {} for {} with tradeId={}, uid={}", ackMessage.getSenderNodeAddress(), ackMessage.getSourceMsgClassName(), ackMessage.getSourceId(), ackMessage.getSourceUid());
} else {
sender.setDisputeOpenedAckMessage(ackMessage);
}
// advance trade state
log.warn("DisputeOpenedMessage was nacked. We close the dispute now. tradeId={}, nack sender={}", trade.getId(), ackMessage.getSenderNodeAddress());
dispute.setIsClosed();
trade.advanceDisputeState(Trade.DisputeState.DISPUTE_CLOSED);

View file

@ -68,6 +68,7 @@ import haveno.core.trade.SellerTrade;
import haveno.core.trade.Trade;
import haveno.core.trade.Trade.DisputeState;
import haveno.core.trade.TradeManager;
import haveno.core.trade.protocol.ArbitratorProtocol;
import haveno.core.trade.protocol.TradePeer;
import haveno.core.xmr.wallet.Restrictions;
import haveno.core.xmr.wallet.TradeWalletService;
@ -590,11 +591,28 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
// use existing dispute or create new
Dispute dispute = reOpen ? storedDisputeOptional.get() : msgDispute;
// get contract
Contract contract = dispute.getContract();
// get sender
TradePeer sender;
PubKeyRing senderPubKeyRing;
if (reOpen) { // re-open can come from either peer
sender = trade.isArbitrator() ? trade.getTradePeer(message.getSenderNodeAddress()) : trade.getArbitrator();
senderPubKeyRing = sender.getPubKeyRing();
} else {
senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing();
sender = trade.getTradePeer(senderPubKeyRing);
}
if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller");
// TODO: save message for reprocessing (arbitrator must remove this when processed or it'll attempt to be sent to peer)
// sender.setDisputeOpenedMessage(message);
// process on trade thread
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
String errorMessage = null;
PubKeyRing senderPubKeyRing = null;
try {
// initialize
@ -605,7 +623,6 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
}
dispute.setSupportType(message.getSupportType());
dispute.setState(Dispute.State.NEW);
Contract contract = dispute.getContract();
// validate dispute
try {
@ -634,17 +651,6 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
if (trade.getSeller().getPaymentAccountPayload() == null) trade.getSeller().setPaymentAccountPayload(dispute.getSellerPaymentAccountPayload());
}
// get sender
TradePeer sender;
if (reOpen) { // re-open can come from either peer
sender = trade.isArbitrator() ? trade.getTradePeer(message.getSenderNodeAddress()) : trade.getArbitrator();
senderPubKeyRing = sender.getPubKeyRing();
} else {
senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing();
sender = trade.getTradePeer(senderPubKeyRing);
}
if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller");
// update sender node address
sender.setNodeAddress(message.getSenderNodeAddress());
@ -833,8 +839,6 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
// create dispute opened message with peer dispute
TradePeer peer = trade.getTradePeer(pubKeyRing);
PubKeyRing peersPubKeyRing = peer.getPubKeyRing();
NodeAddress peersNodeAddress = peer.getNodeAddress();
DisputeOpenedMessage peerOpenedDisputeMessage = new DisputeOpenedMessage(dispute,
p2PService.getAddress(),
UUID.randomUUID().toString(),
@ -842,61 +846,11 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
updatedMultisigHex,
trade.getArbitrator().getPaymentSentMessage());
log.info("Send {} to peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, chatMessage.uid={}",
peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress,
peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(),
chatMessage.getUid());
recordPendingMessage(peerOpenedDisputeMessage.getClass().getSimpleName());
mailboxMessageService.sendEncryptedMailboxMessage(peersNodeAddress,
peersPubKeyRing,
peerOpenedDisputeMessage,
new SendMailboxMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived at peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " +
"chatMessage.uid={}",
peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress,
peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(),
chatMessage.getUid());
// save message for resending if needed
peer.setDisputeOpenedMessage(peerOpenedDisputeMessage);
clearPendingMessage();
// We use the chatMessage wrapped inside the peerOpenedDisputeMessage for
// the state, as that is displayed to the user and we only persist that msg
chatMessage.setArrived(true);
persistNow(null);
}
@Override
public void onStoredInMailbox() {
log.info("{} stored in mailbox for peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " +
"chatMessage.uid={}",
peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress,
peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(),
chatMessage.getUid());
clearPendingMessage();
// We use the chatMessage wrapped inside the peerOpenedDisputeMessage for
// the state, as that is displayed to the user and we only persist that msg
chatMessage.setStoredInMailbox(true);
persistNow(null);
}
@Override
public void onFault(String errorMessage) {
log.error("{} failed: Peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " +
"chatMessage.uid={}, errorMessage={}",
peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress,
peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(),
chatMessage.getUid(), errorMessage);
clearPendingMessage();
// We use the chatMessage wrapped inside the peerOpenedDisputeMessage for
// the state, as that is displayed to the user and we only persist that msg
chatMessage.setSendMessageError(errorMessage);
persistNow(null);
}
}
);
// send dispute opened message
((ArbitratorProtocol) trade.getProtocol()).sendDisputeOpenedMessageIfApplicable();
persistNow(null);
}
@ -1142,6 +1096,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
.findAny();
}
// TODO: throw if more than one dispute found? should not be called then
public Optional<Dispute> findDispute(String tradeId) {
T disputeList = getDisputeList();
if (disputeList == null) {

View file

@ -26,6 +26,7 @@ import haveno.network.p2p.NodeAddress;
import lombok.extern.slf4j.Slf4j;
import java.math.BigInteger;
import java.util.Date;
import java.util.UUID;
import javax.annotation.Nullable;
@ -36,62 +37,69 @@ import javax.annotation.Nullable;
@Slf4j
public class ArbitratorTrade extends Trade {
public ArbitratorTrade(Offer offer,
BigInteger tradeAmount,
long tradePrice,
XmrWalletService xmrWalletService,
ProcessModel processModel,
String uid,
NodeAddress makerNodeAddress,
NodeAddress takerNodeAddress,
NodeAddress arbitratorNodeAddress,
@Nullable String challenge) {
super(offer, tradeAmount, tradePrice, xmrWalletService, processModel, uid, makerNodeAddress, takerNodeAddress, arbitratorNodeAddress, challenge);
}
private static final long resendDisputeOpenedMessageDurationMs = 1L * 30 * 24 * 60 * 60 * 1000; // 30 days
@Override
public BigInteger getPayoutAmountBeforeCost() {
throw new RuntimeException("Arbitrator does not have a payout amount");
}
public ArbitratorTrade(Offer offer,
BigInteger tradeAmount,
long tradePrice,
XmrWalletService xmrWalletService,
ProcessModel processModel,
String uid,
NodeAddress makerNodeAddress,
NodeAddress takerNodeAddress,
NodeAddress arbitratorNodeAddress,
@Nullable String challenge) {
super(offer, tradeAmount, tradePrice, xmrWalletService, processModel, uid, makerNodeAddress, takerNodeAddress, arbitratorNodeAddress, challenge);
}
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public BigInteger getPayoutAmountBeforeCost() {
throw new RuntimeException("Arbitrator does not have a payout amount");
}
@Override
public protobuf.Tradable toProtoMessage() {
return protobuf.Tradable.newBuilder()
.setArbitratorTrade(protobuf.ArbitratorTrade.newBuilder()
.setTrade((protobuf.Trade) super.toProtoMessage()))
.build();
}
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
public static Tradable fromProto(protobuf.ArbitratorTrade arbitratorTradeProto,
XmrWalletService xmrWalletService,
CoreProtoResolver coreProtoResolver) {
protobuf.Trade proto = arbitratorTradeProto.getTrade();
ProcessModel processModel = ProcessModel.fromProto(proto.getProcessModel(), coreProtoResolver);
String uid = ProtoUtil.stringOrNullFromProto(proto.getUid());
if (uid == null) {
uid = UUID.randomUUID().toString();
}
return fromProto(new ArbitratorTrade(
Offer.fromProto(proto.getOffer()),
BigInteger.valueOf(proto.getAmount()),
proto.getPrice(),
xmrWalletService,
processModel,
uid,
proto.getProcessModel().getMaker().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getMaker().getNodeAddress()) : null,
proto.getProcessModel().getTaker().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getTaker().getNodeAddress()) : null,
proto.getProcessModel().getArbitrator().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getArbitrator().getNodeAddress()) : null,
ProtoUtil.stringOrNullFromProto(proto.getChallenge())),
proto,
coreProtoResolver);
}
@Override
public protobuf.Tradable toProtoMessage() {
return protobuf.Tradable.newBuilder()
.setArbitratorTrade(protobuf.ArbitratorTrade.newBuilder()
.setTrade((protobuf.Trade) super.toProtoMessage()))
.build();
}
@Override
public boolean confirmPermitted() {
throw new RuntimeException("ArbitratorTrade.confirmPermitted() not implemented"); // TODO (woodser): implement
}
public static Tradable fromProto(protobuf.ArbitratorTrade arbitratorTradeProto,
XmrWalletService xmrWalletService,
CoreProtoResolver coreProtoResolver) {
protobuf.Trade proto = arbitratorTradeProto.getTrade();
ProcessModel processModel = ProcessModel.fromProto(proto.getProcessModel(), coreProtoResolver);
String uid = ProtoUtil.stringOrNullFromProto(proto.getUid());
if (uid == null) {
uid = UUID.randomUUID().toString();
}
return fromProto(new ArbitratorTrade(
Offer.fromProto(proto.getOffer()),
BigInteger.valueOf(proto.getAmount()),
proto.getPrice(),
xmrWalletService,
processModel,
uid,
proto.getProcessModel().getMaker().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getMaker().getNodeAddress()) : null,
proto.getProcessModel().getTaker().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getTaker().getNodeAddress()) : null,
proto.getProcessModel().getArbitrator().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getArbitrator().getNodeAddress()) : null,
ProtoUtil.stringOrNullFromProto(proto.getChallenge())),
proto,
coreProtoResolver);
}
@Override
public boolean confirmPermitted() {
throw new RuntimeException("ArbitratorTrade.confirmPermitted() not implemented"); // TODO (woodser): implement
}
public boolean resendDisputeOpenedMessageWithinDuration() {
Date startDate = getMaxTradePeriodDate();
return new Date().getTime() <= (startDate.getTime() + resendDisputeOpenedMessageDurationMs);
}
}

View file

@ -30,7 +30,7 @@ import java.util.Date;
@Slf4j
public abstract class SellerTrade extends Trade {
private static final long resendPaymentReceivedMessagesDurationMs = 1L * 30 * 24 * 60 * 60 * 1000; // ~1 month
private static final long resendPaymentReceivedMessagesDurationMs = 1L * 30 * 24 * 60 * 60 * 1000; // 30 days
SellerTrade(Offer offer,
BigInteger tradeAmount,

View file

@ -9,9 +9,12 @@ import haveno.core.trade.messages.DepositResponse;
import haveno.core.trade.messages.InitTradeRequest;
import haveno.core.trade.messages.SignContractResponse;
import haveno.core.trade.messages.TradeMessage;
import haveno.core.trade.protocol.FluentProtocol.Condition;
import haveno.core.trade.protocol.tasks.ApplyFilter;
import haveno.core.trade.protocol.tasks.ArbitratorProcessDepositRequest;
import haveno.core.trade.protocol.tasks.ArbitratorProcessReserveTx;
import haveno.core.trade.protocol.tasks.ArbitratorSendDisputeOpenedMessageToBuyer;
import haveno.core.trade.protocol.tasks.ArbitratorSendDisputeOpenedMessageToSeller;
import haveno.core.trade.protocol.tasks.ArbitratorSendInitTradeOrMultisigRequests;
import haveno.core.trade.protocol.tasks.ProcessInitTradeRequest;
import haveno.core.trade.protocol.tasks.SendDepositsConfirmedMessageToBuyer;
@ -24,105 +27,149 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ArbitratorProtocol extends DisputeProtocol {
public ArbitratorProtocol(ArbitratorTrade trade) {
super(trade);
}
@Override
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
super.onTradeMessage(message, peer);
}
@Override
public void onMailboxMessage(TradeMessage message, NodeAddress peer) {
super.onMailboxMessage(message, peer);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming messages
///////////////////////////////////////////////////////////////////////////////////////////
public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
log.info(TradeProtocol.LOG_HIGHLIGHT + "handleInitTradeRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId());
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
ApplyFilter.class,
ProcessInitTradeRequest.class,
ArbitratorProcessReserveTx.class,
ArbitratorSendInitTradeOrMultisigRequests.class)
.using(new TradeTaskRunner(trade,
() -> {
startTimeout();
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(TRADE_STEP_TIMEOUT_SECONDS))
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
log.warn("Arbitrator ignoring SignContractResponse");
}
public void handleDepositRequest(DepositRequest request, NodeAddress sender) {
log.info(TradeProtocol.LOG_HIGHLIGHT + "handleDepositRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId());
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_REQUESTED)
.with(request)
.from(sender))
.setup(tasks(
ArbitratorProcessDepositRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
if (trade.getState().ordinal() >= Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS.ordinal()) {
stopTimeout();
this.errorMessageHandler = null;
}
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleTaskRunnerFault(sender, request, errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
log.warn("Arbitrator ignoring DepositResponse for trade " + response.getOfferId());
}
@SuppressWarnings("unchecked")
@Override
public Class<? extends TradeTask>[] getDepositsConfirmedTasks() {
return new Class[] { SendDepositsConfirmedMessageToBuyer.class, SendDepositsConfirmedMessageToSeller.class };
}
@Override
public void handleError(String errorMessage) {
// set trade state to send deposit responses with nack
if (trade instanceof ArbitratorTrade && trade.getState() == Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) {
trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
public ArbitratorProtocol(ArbitratorTrade trade) {
super(trade);
}
@Override
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
super.onTradeMessage(message, peer);
}
@Override
public void onMailboxMessage(TradeMessage message, NodeAddress peer) {
super.onMailboxMessage(message, peer);
}
@Override
protected void onInitialized() {
super.onInitialized();
// re-send dispute opened message if applicable
sendDisputeOpenedMessageIfApplicable();
// TODO: resend dispute closed message if not acked
}
public void sendDisputeOpenedMessageIfApplicable() {
ThreadUtils.execute(() -> {
if (!needsToResendDisputeOpenedMessage()) return;
if (trade.isShutDownStarted() || trade.isPayoutPublished()) return;
synchronized (trade.getLock()) {
if (!needsToResendDisputeOpenedMessage()) return;
latchTrade();
given(new Condition(trade))
.setup(tasks(
ArbitratorSendDisputeOpenedMessageToBuyer.class,
ArbitratorSendDisputeOpenedMessageToSeller.class)
.using(new TradeTaskRunner(trade,
() -> {
unlatchTrade();
},
(errorMessage) -> {
log.warn("Error sending DisputeOpenedMessage: " + errorMessage);
unlatchTrade();
})))
.executeTasks();
awaitTradeLatch();
}
}, trade.getId());
}
private boolean needsToResendDisputeOpenedMessage() {
if (trade.isShutDownStarted()) return false;
if (trade.isPayoutPublished()) return false;
if (trade.getBuyer().getDisputeOpenedMessage() == null && trade.getSeller().getDisputeOpenedMessage() == null) return false;
if (trade.getDisputeState() != Trade.DisputeState.DISPUTE_OPENED) return false;
if (!((ArbitratorTrade) trade).resendDisputeOpenedMessageWithinDuration()) return false;
return !trade.getProcessModel().isDisputeOpenedMessageAckedOrStored();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming messages
///////////////////////////////////////////////////////////////////////////////////////////
public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
log.info(TradeProtocol.LOG_HIGHLIGHT + "handleInitTradeRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId());
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
latchTrade();
this.errorMessageHandler = errorMessageHandler;
processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
ApplyFilter.class,
ProcessInitTradeRequest.class,
ArbitratorProcessReserveTx.class,
ArbitratorSendInitTradeOrMultisigRequests.class)
.using(new TradeTaskRunner(trade,
() -> {
startTimeout();
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(TRADE_STEP_TIMEOUT_SECONDS))
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
log.warn("Arbitrator ignoring SignContractResponse");
}
public void handleDepositRequest(DepositRequest request, NodeAddress sender) {
log.info(TradeProtocol.LOG_HIGHLIGHT + "handleDepositRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId());
ThreadUtils.execute(() -> {
synchronized (trade.getLock()) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_REQUESTED)
.with(request)
.from(sender))
.setup(tasks(
ArbitratorProcessDepositRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
if (trade.getState().ordinal() >= Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS.ordinal()) {
stopTimeout();
this.errorMessageHandler = null;
}
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleTaskRunnerFault(sender, request, errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();
}
}, trade.getId());
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
log.warn("Arbitrator ignoring DepositResponse for trade " + response.getOfferId());
}
@SuppressWarnings("unchecked")
@Override
public Class<? extends TradeTask>[] getDepositsConfirmedTasks() {
return new Class[] { SendDepositsConfirmedMessageToBuyer.class, SendDepositsConfirmedMessageToSeller.class };
}
@Override
public void handleError(String errorMessage) {
// set trade state to send deposit responses with nack
if (trade instanceof ArbitratorTrade && trade.getState() == Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) {
trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED);
}
super.handleError(errorMessage);
}
super.handleError(errorMessage);
}
}

View file

@ -313,6 +313,10 @@ public class ProcessModel implements Model, PersistablePayload {
return getArbitrator().isPaymentReceivedMessageAckedOrStored() && getBuyer().isPaymentReceivedMessageAckedOrStored();
}
public boolean isDisputeOpenedMessageAckedOrStored() {
return getBuyer().isDisputeOpenedMessageAckedOrStored() || getSeller().isDisputeOpenedMessageAckedOrStored();
}
void setDepositTxSentAckMessage(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :

View file

@ -28,6 +28,7 @@ import haveno.core.network.MessageState;
import haveno.core.payment.payload.PaymentAccountPayload;
import haveno.core.proto.CoreProtoResolver;
import haveno.core.support.dispute.messages.DisputeClosedMessage;
import haveno.core.support.dispute.messages.DisputeOpenedMessage;
import haveno.core.trade.TradeManager;
import haveno.core.trade.messages.PaymentReceivedMessage;
import haveno.core.trade.messages.PaymentSentMessage;
@ -100,6 +101,9 @@ public final class TradePeer implements PersistablePayload {
@Nullable
@Setter
@Getter
private DisputeOpenedMessage disputeOpenedMessage;
@Setter
@Getter
private DisputeClosedMessage disputeClosedMessage;
// added in v 0.6
@ -159,6 +163,10 @@ public final class TradePeer implements PersistablePayload {
private ObjectProperty<MessageState> paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
@Setter
private ObjectProperty<MessageState> paymentReceivedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
@Setter
private ObjectProperty<MessageState> disputeOpenedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
@Setter
private ObjectProperty<MessageState> disputeClosedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED);
public TradePeer() {
}
@ -247,6 +255,27 @@ public final class TradePeer implements PersistablePayload {
}
}
public void setDisputeOpenedAckMessage(AckMessage ackMessage) {
MessageState messageState = ackMessage.isSuccess() ?
MessageState.ACKNOWLEDGED :
MessageState.NACKED;
setDisputeOpenedMessageState(messageState);
}
public void setDisputeOpenedMessageState(MessageState disputeOpenedMessageStateProperty) {
this.disputeOpenedMessageStateProperty.set(disputeOpenedMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}
public void setDisputeClosedMessageState(MessageState disputeClosedMessageStateProperty) {
this.disputeClosedMessageStateProperty.set(disputeClosedMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}
public boolean isDepositsConfirmedMessageAcked() {
return depositsConfirmedMessageStateProperty.get() == MessageState.ACKNOWLEDGED;
}
@ -271,6 +300,14 @@ public final class TradePeer implements PersistablePayload {
return paymentReceivedMessageStateProperty.get() == MessageState.ARRIVED;
}
public boolean isDisputeOpenedMessageReceived() {
return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX || disputeOpenedMessageStateProperty.get() == MessageState.NACKED;
}
public boolean isDisputeOpenedMessageAckedOrStored() {
return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX;
}
@Override
public Message toProtoMessage() {
final protobuf.TradePeer.Builder builder = protobuf.TradePeer.newBuilder();
@ -293,6 +330,7 @@ public final class TradePeer implements PersistablePayload {
Optional.ofNullable(mediatedPayoutTxSignature).ifPresent(e -> builder.setMediatedPayoutTxSignature(ByteString.copyFrom(e)));
Optional.ofNullable(paymentSentMessage).ifPresent(e -> builder.setPaymentSentMessage(paymentSentMessage.toProtoNetworkEnvelope().getPaymentSentMessage()));
Optional.ofNullable(paymentReceivedMessage).ifPresent(e -> builder.setPaymentReceivedMessage(paymentReceivedMessage.toProtoNetworkEnvelope().getPaymentReceivedMessage()));
Optional.ofNullable(disputeOpenedMessage).ifPresent(e -> builder.setDisputeOpenedMessage(disputeOpenedMessage.toProtoNetworkEnvelope().getDisputeOpenedMessage()));
Optional.ofNullable(disputeClosedMessage).ifPresent(e -> builder.setDisputeClosedMessage(disputeClosedMessage.toProtoNetworkEnvelope().getDisputeClosedMessage()));
Optional.ofNullable(reserveTxHash).ifPresent(e -> builder.setReserveTxHash(reserveTxHash));
Optional.ofNullable(reserveTxHex).ifPresent(e -> builder.setReserveTxHex(reserveTxHex));
@ -314,6 +352,8 @@ public final class TradePeer implements PersistablePayload {
builder.setDepositsConfirmedMessageState(depositsConfirmedMessageStateProperty.get().name());
builder.setPaymentSentMessageState(paymentSentMessageStateProperty.get().name());
builder.setPaymentReceivedMessageState(paymentReceivedMessageStateProperty.get().name());
builder.setDisputeOpenedMessageState(disputeOpenedMessageStateProperty.get().name());
builder.setDisputeClosedMessageState(disputeClosedMessageStateProperty.get().name());
builder.setCurrentDate(currentDate);
return builder.build();
@ -345,6 +385,7 @@ public final class TradePeer implements PersistablePayload {
tradePeer.setMediatedPayoutTxSignature(ProtoUtil.byteArrayOrNullFromProto(proto.getMediatedPayoutTxSignature()));
tradePeer.setPaymentSentMessage(proto.hasPaymentSentMessage() ? PaymentSentMessage.fromProto(proto.getPaymentSentMessage(), Version.getP2PMessageVersion()) : null);
tradePeer.setPaymentReceivedMessage(proto.hasPaymentReceivedMessage() ? PaymentReceivedMessage.fromProto(proto.getPaymentReceivedMessage(), Version.getP2PMessageVersion()) : null);
tradePeer.setDisputeOpenedMessage(proto.hasDisputeOpenedMessage() ? DisputeOpenedMessage.fromProto(proto.getDisputeOpenedMessage(), coreProtoResolver, Version.getP2PMessageVersion()) : null);
tradePeer.setDisputeClosedMessage(proto.hasDisputeClosedMessage() ? DisputeClosedMessage.fromProto(proto.getDisputeClosedMessage(), Version.getP2PMessageVersion()) : null);
tradePeer.setReserveTxHash(ProtoUtil.stringOrNullFromProto(proto.getReserveTxHash()));
tradePeer.setReserveTxHex(ProtoUtil.stringOrNullFromProto(proto.getReserveTxHex()));
@ -376,6 +417,14 @@ public final class TradePeer implements PersistablePayload {
MessageState paymentReceivedMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentReceivedMessageStateString);
tradePeer.setPaymentReceivedMessageState(paymentReceivedMessageState);
String disputeOpenedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getDisputeOpenedMessageState());
MessageState disputeOpenedMessageState = ProtoUtil.enumFromProto(MessageState.class, disputeOpenedMessageStateString);
tradePeer.setDisputeOpenedMessageState(disputeOpenedMessageState);
String disputeClosedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getDisputeClosedMessageState());
MessageState disputeClosedMessageState = ProtoUtil.enumFromProto(MessageState.class, disputeClosedMessageStateString);
tradePeer.setDisputeClosedMessageState(disputeClosedMessageState);
return tradePeer;
}
}

View file

@ -0,0 +1,190 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.trade.protocol.tasks;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.network.MessageState;
import haveno.core.support.dispute.Dispute;
import haveno.core.support.dispute.messages.DisputeOpenedMessage;
import haveno.core.support.messages.ChatMessage;
import haveno.core.trade.ArbitratorTrade;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade;
import haveno.network.p2p.mailbox.MailboxMessage;
import javafx.beans.value.ChangeListener;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
/**
* Arbitrator sends the DisputeOpenedMessage.
* We wait to receive a ACK message back and resend the message
* in case that does not happen in 10 minutes or if the message was stored in mailbox or failed. We keep repeating that
* with doubling the interval each time and until the MAX_RESEND_ATTEMPTS is reached.
* If never successful we give up and complete. It might be a valid case that the peer was not online for an extended
* time but we can be very sure that our message was stored as mailbox message in the network and one the peer goes
* online he will process it.
*/
@Slf4j
@EqualsAndHashCode(callSuper = true)
public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMessageTask {
private ChangeListener<MessageState> listener;
private Timer timer;
private static final int MAX_RESEND_ATTEMPTS = 20;
private int delayInMin = 10;
private int resendCounter = 0;
private DisputeOpenedMessage message = null;
public ArbitratorSendDisputeOpenedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
@Override
protected void run() {
try {
runInterceptHook();
// reset nack state
if (getReceiver().isDisputeOpenedMessageReceived()) {
getReceiver().setDisputeOpenedMessageState(MessageState.UNDEFINED);
}
// skip if not applicable or already acked
if (stopSending()) {
if (!isCompleted()) complete();
return;
}
// reset ack state
getReceiver().setPaymentReceivedMessageState(MessageState.UNDEFINED);
super.run();
} catch (Throwable t) {
failed(t);
}
}
protected Optional<Dispute> getDispute() {
return HavenoUtils.arbitrationManager.findDispute(getReceiver().getDisputeOpenedMessage().getDispute());
}
protected ChatMessage getSystemChatMessage() {
return getDispute().get().getChatMessages().get(0);
}
@Override
protected MailboxMessage getMailboxMessage(String tradeId) {
if (message == null) message = getReceiver().getDisputeOpenedMessage();
return message;
}
@Override
protected void setStateSent() {
getReceiver().setDisputeOpenedMessageState(MessageState.SENT);
tryToSendAgainLater();
processModel.getTradeManager().requestPersistence();
}
@Override
protected void setStateArrived() {
getReceiver().setDisputeOpenedMessageState(MessageState.ARRIVED);
getSystemChatMessage().setArrived(true);
processModel.getTradeManager().requestPersistence();
}
@Override
protected void setStateStoredInMailbox() {
getReceiver().setDisputeOpenedMessageState(MessageState.STORED_IN_MAILBOX);
getSystemChatMessage().setStoredInMailbox(true);
processModel.getTradeManager().requestPersistence();
}
@Override
protected void setStateFault() {
getReceiver().setDisputeOpenedMessageState(MessageState.FAILED);
getSystemChatMessage().setSendMessageError(errorMessage);
processModel.getTradeManager().requestPersistence();
}
private void cleanup() {
if (timer != null) {
timer.stop();
}
if (listener != null) {
getReceiver().getDisputeOpenedMessageStateProperty().removeListener(listener);
}
}
private void tryToSendAgainLater() {
// skip if already acked
if (stopSending()) return;
if (resendCounter >= MAX_RESEND_ATTEMPTS) {
cleanup();
log.warn("We never received an ACK message when sending the DisputeOpenedMessage to the peer. We stop trying to send the message.");
return;
}
if (timer != null) {
timer.stop();
}
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
if (resendCounter == 0) {
listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
getReceiver().getDisputeOpenedMessageStateProperty().addListener(listener);
onMessageStateChange(getReceiver().getDisputeOpenedMessageStateProperty().get());
}
// first re-send is after 2 minutes, then increase the delay exponentially
if (resendCounter == 0) {
int shortDelay = 2;
log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", shortDelay);
timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES);
} else {
log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", delayInMin);
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
delayInMin = (int) ((double) delayInMin * 1.5);
}
resendCounter++;
}
private void onMessageStateChange(MessageState newValue) {
if (isMessageReceived()) {
cleanup();
}
}
protected boolean isMessageReceived() {
return getReceiver().isDisputeOpenedMessageReceived();
}
protected boolean stopSending() {
if (getReceiver().getDisputeOpenedMessage() == null) return true; // stop if no message to send
if (isMessageReceived()) return true; // stop if message received
if (trade.isPayoutPublished()) return true; // stop if payout is published
if (!((ArbitratorTrade) trade).resendDisputeOpenedMessageWithinDuration()) return true; // stop if payout is published and we are not in the resend period
if (message != null && !message.equals(getReceiver().getDisputeOpenedMessage())) return true; // stop if message state is outdated
return false;
}
}

View file

@ -0,0 +1,38 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.trade.protocol.tasks;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade;
import haveno.core.trade.protocol.TradePeer;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@EqualsAndHashCode(callSuper = true)
@Slf4j
public class ArbitratorSendDisputeOpenedMessageToBuyer extends ArbitratorSendDisputeOpenedMessage {
public ArbitratorSendDisputeOpenedMessageToBuyer(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
@Override
protected TradePeer getReceiver() {
return trade.getBuyer();
}
}

View file

@ -0,0 +1,38 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.trade.protocol.tasks;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade;
import haveno.core.trade.protocol.TradePeer;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@EqualsAndHashCode(callSuper = true)
@Slf4j
public class ArbitratorSendDisputeOpenedMessageToSeller extends ArbitratorSendDisputeOpenedMessage {
public ArbitratorSendDisputeOpenedMessageToSeller(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
@Override
protected TradePeer getReceiver() {
return trade.getSeller();
}
}

View file

@ -38,16 +38,13 @@ import java.util.concurrent.TimeUnit;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.network.MessageState;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.PaymentSentMessage;
import haveno.core.trade.messages.TradeMailboxMessage;
import haveno.core.trade.protocol.TradePeer;
import haveno.core.util.JsonUtil;
import haveno.network.p2p.NodeAddress;
import javafx.beans.value.ChangeListener;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@ -74,18 +71,6 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
super(taskHandler, trade);
}
protected abstract TradePeer getReceiver();
@Override
protected NodeAddress getReceiverNodeAddress() {
return getReceiver().getNodeAddress();
}
@Override
protected PubKeyRing getReceiverPubKeyRing() {
return getReceiver().getPubKeyRing();
}
@Override
protected void run() {
try {
@ -104,7 +89,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
}
@Override
protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) {
protected TradeMailboxMessage getMailboxMessage(String tradeId) {
if (getReceiver().getPaymentSentMessage() == null) {
// We do not use a real unique ID here as we want to be able to re-send the exact same message in case the
@ -170,7 +155,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask
timer.stop();
}
if (listener != null) {
trade.getSeller().getPaymentReceivedMessageStateProperty().removeListener(listener);
getReceiver().getPaymentReceivedMessageStateProperty().removeListener(listener);
}
}

View file

@ -19,8 +19,8 @@ package haveno.core.trade.protocol.tasks;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.TradeMessage;
import haveno.core.trade.protocol.TradePeer;
import haveno.network.p2p.mailbox.MailboxMessage;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@ -63,7 +63,7 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes
// continue execution on fault so payment sent message is sent to arbitrator
@Override
protected void onFault(String errorMessage, TradeMessage message) {
protected void onFault(String errorMessage, MailboxMessage message) {
setStateFault();
appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage);
complete();

View file

@ -38,7 +38,6 @@ import com.google.common.base.Charsets;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.crypto.PubKeyRing;
import haveno.common.crypto.Sig;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.account.sign.SignedWitness;
@ -49,9 +48,7 @@ import haveno.core.trade.SellerTrade;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.PaymentReceivedMessage;
import haveno.core.trade.messages.TradeMailboxMessage;
import haveno.core.trade.protocol.TradePeer;
import haveno.core.util.JsonUtil;
import haveno.network.p2p.NodeAddress;
import javafx.beans.value.ChangeListener;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@ -79,18 +76,6 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
public SellerSendPaymentReceivedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade);
}
protected abstract TradePeer getReceiver();
@Override
protected NodeAddress getReceiverNodeAddress() {
return getReceiver().getNodeAddress();
}
@Override
protected PubKeyRing getReceiverPubKeyRing() {
return getReceiver().getPubKeyRing();
}
@Override
protected void run() {
@ -117,7 +102,7 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag
}
@Override
protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) {
protected TradeMailboxMessage getMailboxMessage(String tradeId) {
if (getReceiver().getPaymentReceivedMessage() == null) {
// sign account witness

View file

@ -19,8 +19,8 @@ package haveno.core.trade.protocol.tasks;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.TradeMessage;
import haveno.core.trade.protocol.TradePeer;
import haveno.network.p2p.mailbox.MailboxMessage;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@ -63,7 +63,7 @@ public class SellerSendPaymentReceivedMessageToBuyer extends SellerSendPaymentRe
// continue execution on fault so payment received message is sent to arbitrator
@Override
protected void onFault(String errorMessage, TradeMessage message) {
protected void onFault(String errorMessage, MailboxMessage message) {
setStateFault();
appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage);
complete();

View file

@ -21,15 +21,12 @@ import java.util.concurrent.TimeUnit;
import haveno.common.Timer;
import haveno.common.UserThread;
import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.network.MessageState;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.DepositsConfirmedMessage;
import haveno.core.trade.messages.TradeMailboxMessage;
import haveno.core.trade.protocol.TradePeer;
import haveno.network.p2p.NodeAddress;
import lombok.extern.slf4j.Slf4j;
/**
@ -65,20 +62,8 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas
}
}
protected abstract TradePeer getReceiver();
@Override
protected NodeAddress getReceiverNodeAddress() {
return getReceiver().getNodeAddress();
}
@Override
protected PubKeyRing getReceiverPubKeyRing() {
return getReceiver().getPubKeyRing();
}
@Override
protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) {
protected TradeMailboxMessage getMailboxMessage(String tradeId) {
if (message == null) {
// export multisig hex once

View file

@ -20,10 +20,10 @@ package haveno.core.trade.protocol.tasks;
import haveno.common.crypto.PubKeyRing;
import haveno.common.taskrunner.TaskRunner;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.TradeMailboxMessage;
import haveno.core.trade.messages.TradeMessage;
import haveno.core.trade.protocol.TradePeer;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.SendMailboxMessageListener;
import haveno.network.p2p.mailbox.MailboxMessage;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -32,15 +32,17 @@ public abstract class SendMailboxMessageTask extends TradeTask {
super(taskHandler, trade);
}
protected abstract TradePeer getReceiver();
protected NodeAddress getReceiverNodeAddress() {
return trade.getTradePeer().getNodeAddress();
return getReceiver().getNodeAddress();
}
protected PubKeyRing getReceiverPubKeyRing() {
return trade.getTradePeer().getPubKeyRing();
return getReceiver().getPubKeyRing();
}
protected abstract TradeMailboxMessage getTradeMailboxMessage(String tradeId);
protected abstract MailboxMessage getMailboxMessage(String tradeId);
protected abstract void setStateSent();
@ -55,7 +57,7 @@ public abstract class SendMailboxMessageTask extends TradeTask {
try {
runInterceptHook();
String id = processModel.getOfferId();
TradeMailboxMessage message = getTradeMailboxMessage(id);
MailboxMessage message = getMailboxMessage(id);
setStateSent();
NodeAddress peersNodeAddress = getReceiverNodeAddress();
log.info("Send {} to peer {} for {} {}, uid={}",
@ -69,21 +71,21 @@ public abstract class SendMailboxMessageTask extends TradeTask {
new SendMailboxMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived at peer {}. tradeId={}, uid={}", message.getClass().getSimpleName(), peersNodeAddress, message.getOfferId(), message.getUid());
log.info("{} arrived at peer {}. tradeId={}, uid={}", message.getClass().getSimpleName(), peersNodeAddress, trade.getId(), message.getUid());
setStateArrived();
if (!task.isCompleted()) complete();
}
@Override
public void onStoredInMailbox() {
log.info("{} stored in mailbox for peer {}. tradeId={}, uid={}", message.getClass().getSimpleName(), peersNodeAddress, message.getOfferId(), message.getUid());
log.info("{} stored in mailbox for peer {}. tradeId={}, uid={}", message.getClass().getSimpleName(), peersNodeAddress, trade.getId(), message.getUid());
SendMailboxMessageTask.this.onStoredInMailbox();
}
@Override
public void onFault(String errorMessage) {
if (processModel.getP2PService().isShutDownStarted()) return;
log.error("{} failed: Peer {}. tradeId={}, uid={}, errorMessage={}", message.getClass().getSimpleName(), peersNodeAddress, message.getOfferId(), message.getUid(), errorMessage);
log.error("{} failed: Peer {}. tradeId={}, uid={}, errorMessage={}", message.getClass().getSimpleName(), peersNodeAddress, trade.getId(), message.getUid(), errorMessage);
SendMailboxMessageTask.this.onFault(errorMessage, message);
}
}
@ -98,7 +100,7 @@ public abstract class SendMailboxMessageTask extends TradeTask {
if (!isCompleted()) complete();
}
protected void onFault(String errorMessage, TradeMessage message) {
protected void onFault(String errorMessage, MailboxMessage message) {
setStateFault();
appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage);
failed(errorMessage);

View file

@ -21,6 +21,7 @@ import haveno.common.taskrunner.TaskRunner;
import haveno.core.support.dispute.mediation.MediationResultState;
import haveno.core.trade.Trade;
import haveno.core.trade.messages.TradeMailboxMessage;
import haveno.core.trade.protocol.TradePeer;
import haveno.core.trade.protocol.tasks.SendMailboxMessageTask;
import lombok.extern.slf4j.Slf4j;
@ -32,7 +33,12 @@ public class SendMediatedPayoutTxPublishedMessage extends SendMailboxMessageTask
}
@Override
protected TradeMailboxMessage getTradeMailboxMessage(String id) {
protected TradePeer getReceiver() {
return trade.getTradePeer();
}
@Override
protected TradeMailboxMessage getMailboxMessage(String id) {
throw new RuntimeException("SendMediatedPayoutTxPublishedMessage.getMessage(id) not implemented for xmr");
// Transaction payoutTx = checkNotNull(trade.getPayoutTx(), "trade.getPayoutTx() must not be null");
// return new MediatedPayoutTxPublishedMessage(

View file

@ -1614,6 +1614,7 @@ message TradePeer {
bytes mediated_payout_tx_signature = 22;
PaymentSentMessage payment_sent_message = 23;
PaymentReceivedMessage payment_received_message = 24;
DisputeOpenedMessage dispute_opened_message = 46;
DisputeClosedMessage dispute_closed_message = 25;
string reserve_tx_hash = 26;
string reserve_tx_hex = 27;
@ -1635,6 +1636,8 @@ message TradePeer {
string deposits_confirmed_message_state = 43;
string payment_sent_message_state = 44;
string payment_received_message_state = 45;
string dispute_opened_message_state = 47;
string dispute_closed_message_state = 48;
}
///////////////////////////////////////////////////////////////////////////////////////////