diff --git a/core/src/main/java/haveno/core/support/SupportManager.java b/core/src/main/java/haveno/core/support/SupportManager.java index ff795da058..f3d8b8c991 100644 --- a/core/src/main/java/haveno/core/support/SupportManager.java +++ b/core/src/main/java/haveno/core/support/SupportManager.java @@ -29,6 +29,7 @@ import haveno.core.support.messages.ChatMessage; import haveno.core.support.messages.SupportMessage; import haveno.core.trade.Trade; import haveno.core.trade.TradeManager; +import haveno.core.trade.protocol.TradePeer; import haveno.core.xmr.wallet.XmrWalletService; import haveno.network.p2p.AckMessage; import haveno.network.p2p.AckMessageSourceType; @@ -197,6 +198,16 @@ public abstract class SupportManager { synchronized (dispute.getChatMessages()) { for (ChatMessage chatMessage : dispute.getChatMessages()) { if (chatMessage.getUid().equals(ackMessage.getSourceUid())) { + + // set ack state + TradePeer sender = trade.getTradePeer(ackMessage.getSenderNodeAddress()); + if (sender == null) { + log.warn("Received AckMessage from unknown peer {} for {} with tradeId={}, uid={}", ackMessage.getSenderNodeAddress(), ackMessage.getSourceMsgClassName(), ackMessage.getSourceId(), ackMessage.getSourceUid()); + } else { + sender.setDisputeOpenedAckMessage(ackMessage); + } + + // advance trade state if (trade.getDisputeState() == Trade.DisputeState.DISPUTE_PREPARING || trade.getDisputeState() == Trade.DisputeState.DISPUTE_REQUESTED) { // ack can arrive before saw arrived if (dispute.isClosed()) dispute.reOpen(); trade.advanceDisputeState(Trade.DisputeState.DISPUTE_OPENED); @@ -220,6 +231,16 @@ public abstract class SupportManager { for (ChatMessage chatMessage : dispute.getChatMessages()) { if (chatMessage.getUid().equals(ackMessage.getSourceUid())) { if (!trade.isArbitrator() && (trade.getDisputeState().isRequested() || trade.getDisputeState().isCloseRequested())) { + + // set ack state + TradePeer sender = trade.getTradePeer(ackMessage.getSenderNodeAddress()); + if (sender == null) { + log.warn("Received AckMessage from unknown peer {} for {} with tradeId={}, uid={}", ackMessage.getSenderNodeAddress(), ackMessage.getSourceMsgClassName(), ackMessage.getSourceId(), ackMessage.getSourceUid()); + } else { + sender.setDisputeOpenedAckMessage(ackMessage); + } + + // advance trade state log.warn("DisputeOpenedMessage was nacked. We close the dispute now. tradeId={}, nack sender={}", trade.getId(), ackMessage.getSenderNodeAddress()); dispute.setIsClosed(); trade.advanceDisputeState(Trade.DisputeState.DISPUTE_CLOSED); diff --git a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java index 554dfb8adc..bab7cd8f0c 100644 --- a/core/src/main/java/haveno/core/support/dispute/DisputeManager.java +++ b/core/src/main/java/haveno/core/support/dispute/DisputeManager.java @@ -68,6 +68,7 @@ import haveno.core.trade.SellerTrade; import haveno.core.trade.Trade; import haveno.core.trade.Trade.DisputeState; import haveno.core.trade.TradeManager; +import haveno.core.trade.protocol.ArbitratorProtocol; import haveno.core.trade.protocol.TradePeer; import haveno.core.xmr.wallet.Restrictions; import haveno.core.xmr.wallet.TradeWalletService; @@ -590,11 +591,28 @@ public abstract class DisputeManager> extends Sup // use existing dispute or create new Dispute dispute = reOpen ? storedDisputeOptional.get() : msgDispute; + // get contract + Contract contract = dispute.getContract(); + + // get sender + TradePeer sender; + PubKeyRing senderPubKeyRing; + if (reOpen) { // re-open can come from either peer + sender = trade.isArbitrator() ? trade.getTradePeer(message.getSenderNodeAddress()) : trade.getArbitrator(); + senderPubKeyRing = sender.getPubKeyRing(); + } else { + senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing(); + sender = trade.getTradePeer(senderPubKeyRing); + } + if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller"); + + // TODO: save message for reprocessing (arbitrator must remove this when processed or it'll attempt to be sent to peer) + // sender.setDisputeOpenedMessage(message); + // process on trade thread ThreadUtils.execute(() -> { synchronized (trade.getLock()) { String errorMessage = null; - PubKeyRing senderPubKeyRing = null; try { // initialize @@ -605,7 +623,6 @@ public abstract class DisputeManager> extends Sup } dispute.setSupportType(message.getSupportType()); dispute.setState(Dispute.State.NEW); - Contract contract = dispute.getContract(); // validate dispute try { @@ -634,17 +651,6 @@ public abstract class DisputeManager> extends Sup if (trade.getSeller().getPaymentAccountPayload() == null) trade.getSeller().setPaymentAccountPayload(dispute.getSellerPaymentAccountPayload()); } - // get sender - TradePeer sender; - if (reOpen) { // re-open can come from either peer - sender = trade.isArbitrator() ? trade.getTradePeer(message.getSenderNodeAddress()) : trade.getArbitrator(); - senderPubKeyRing = sender.getPubKeyRing(); - } else { - senderPubKeyRing = trade.isArbitrator() ? (dispute.isDisputeOpenerIsBuyer() ? contract.getBuyerPubKeyRing() : contract.getSellerPubKeyRing()) : trade.getArbitrator().getPubKeyRing(); - sender = trade.getTradePeer(senderPubKeyRing); - } - if (sender == null) throw new RuntimeException("Pub key ring is not from arbitrator, buyer, or seller"); - // update sender node address sender.setNodeAddress(message.getSenderNodeAddress()); @@ -833,8 +839,6 @@ public abstract class DisputeManager> extends Sup // create dispute opened message with peer dispute TradePeer peer = trade.getTradePeer(pubKeyRing); - PubKeyRing peersPubKeyRing = peer.getPubKeyRing(); - NodeAddress peersNodeAddress = peer.getNodeAddress(); DisputeOpenedMessage peerOpenedDisputeMessage = new DisputeOpenedMessage(dispute, p2PService.getAddress(), UUID.randomUUID().toString(), @@ -842,61 +846,11 @@ public abstract class DisputeManager> extends Sup updatedMultisigHex, trade.getArbitrator().getPaymentSentMessage()); - log.info("Send {} to peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, chatMessage.uid={}", - peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress, - peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(), - chatMessage.getUid()); - recordPendingMessage(peerOpenedDisputeMessage.getClass().getSimpleName()); - mailboxMessageService.sendEncryptedMailboxMessage(peersNodeAddress, - peersPubKeyRing, - peerOpenedDisputeMessage, - new SendMailboxMessageListener() { - @Override - public void onArrived() { - log.info("{} arrived at peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " + - "chatMessage.uid={}", - peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress, - peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(), - chatMessage.getUid()); + // save message for resending if needed + peer.setDisputeOpenedMessage(peerOpenedDisputeMessage); - clearPendingMessage(); - // We use the chatMessage wrapped inside the peerOpenedDisputeMessage for - // the state, as that is displayed to the user and we only persist that msg - chatMessage.setArrived(true); - persistNow(null); - } - - @Override - public void onStoredInMailbox() { - log.info("{} stored in mailbox for peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " + - "chatMessage.uid={}", - peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress, - peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(), - chatMessage.getUid()); - - clearPendingMessage(); - // We use the chatMessage wrapped inside the peerOpenedDisputeMessage for - // the state, as that is displayed to the user and we only persist that msg - chatMessage.setStoredInMailbox(true); - persistNow(null); - } - - @Override - public void onFault(String errorMessage) { - log.error("{} failed: Peer {}. tradeId={}, peerOpenedDisputeMessage.uid={}, " + - "chatMessage.uid={}, errorMessage={}", - peerOpenedDisputeMessage.getClass().getSimpleName(), peersNodeAddress, - peerOpenedDisputeMessage.getTradeId(), peerOpenedDisputeMessage.getUid(), - chatMessage.getUid(), errorMessage); - - clearPendingMessage(); - // We use the chatMessage wrapped inside the peerOpenedDisputeMessage for - // the state, as that is displayed to the user and we only persist that msg - chatMessage.setSendMessageError(errorMessage); - persistNow(null); - } - } - ); + // send dispute opened message + ((ArbitratorProtocol) trade.getProtocol()).sendDisputeOpenedMessageIfApplicable(); persistNow(null); } @@ -1142,6 +1096,7 @@ public abstract class DisputeManager> extends Sup .findAny(); } + // TODO: throw if more than one dispute found? should not be called then public Optional findDispute(String tradeId) { T disputeList = getDisputeList(); if (disputeList == null) { diff --git a/core/src/main/java/haveno/core/trade/ArbitratorTrade.java b/core/src/main/java/haveno/core/trade/ArbitratorTrade.java index ea179a655a..0329763ad9 100644 --- a/core/src/main/java/haveno/core/trade/ArbitratorTrade.java +++ b/core/src/main/java/haveno/core/trade/ArbitratorTrade.java @@ -26,6 +26,7 @@ import haveno.network.p2p.NodeAddress; import lombok.extern.slf4j.Slf4j; import java.math.BigInteger; +import java.util.Date; import java.util.UUID; import javax.annotation.Nullable; @@ -36,62 +37,69 @@ import javax.annotation.Nullable; @Slf4j public class ArbitratorTrade extends Trade { - public ArbitratorTrade(Offer offer, - BigInteger tradeAmount, - long tradePrice, - XmrWalletService xmrWalletService, - ProcessModel processModel, - String uid, - NodeAddress makerNodeAddress, - NodeAddress takerNodeAddress, - NodeAddress arbitratorNodeAddress, - @Nullable String challenge) { - super(offer, tradeAmount, tradePrice, xmrWalletService, processModel, uid, makerNodeAddress, takerNodeAddress, arbitratorNodeAddress, challenge); - } + private static final long resendDisputeOpenedMessageDurationMs = 1L * 30 * 24 * 60 * 60 * 1000; // 30 days - @Override - public BigInteger getPayoutAmountBeforeCost() { - throw new RuntimeException("Arbitrator does not have a payout amount"); - } + public ArbitratorTrade(Offer offer, + BigInteger tradeAmount, + long tradePrice, + XmrWalletService xmrWalletService, + ProcessModel processModel, + String uid, + NodeAddress makerNodeAddress, + NodeAddress takerNodeAddress, + NodeAddress arbitratorNodeAddress, + @Nullable String challenge) { + super(offer, tradeAmount, tradePrice, xmrWalletService, processModel, uid, makerNodeAddress, takerNodeAddress, arbitratorNodeAddress, challenge); + } - /////////////////////////////////////////////////////////////////////////////////////////// - // PROTO BUFFER - /////////////////////////////////////////////////////////////////////////////////////////// + @Override + public BigInteger getPayoutAmountBeforeCost() { + throw new RuntimeException("Arbitrator does not have a payout amount"); + } - @Override - public protobuf.Tradable toProtoMessage() { - return protobuf.Tradable.newBuilder() - .setArbitratorTrade(protobuf.ArbitratorTrade.newBuilder() - .setTrade((protobuf.Trade) super.toProtoMessage())) - .build(); - } + /////////////////////////////////////////////////////////////////////////////////////////// + // PROTO BUFFER + /////////////////////////////////////////////////////////////////////////////////////////// - public static Tradable fromProto(protobuf.ArbitratorTrade arbitratorTradeProto, - XmrWalletService xmrWalletService, - CoreProtoResolver coreProtoResolver) { - protobuf.Trade proto = arbitratorTradeProto.getTrade(); - ProcessModel processModel = ProcessModel.fromProto(proto.getProcessModel(), coreProtoResolver); - String uid = ProtoUtil.stringOrNullFromProto(proto.getUid()); - if (uid == null) { - uid = UUID.randomUUID().toString(); - } - return fromProto(new ArbitratorTrade( - Offer.fromProto(proto.getOffer()), - BigInteger.valueOf(proto.getAmount()), - proto.getPrice(), - xmrWalletService, - processModel, - uid, - proto.getProcessModel().getMaker().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getMaker().getNodeAddress()) : null, - proto.getProcessModel().getTaker().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getTaker().getNodeAddress()) : null, - proto.getProcessModel().getArbitrator().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getArbitrator().getNodeAddress()) : null, - ProtoUtil.stringOrNullFromProto(proto.getChallenge())), - proto, - coreProtoResolver); - } + @Override + public protobuf.Tradable toProtoMessage() { + return protobuf.Tradable.newBuilder() + .setArbitratorTrade(protobuf.ArbitratorTrade.newBuilder() + .setTrade((protobuf.Trade) super.toProtoMessage())) + .build(); + } - @Override - public boolean confirmPermitted() { - throw new RuntimeException("ArbitratorTrade.confirmPermitted() not implemented"); // TODO (woodser): implement - } + public static Tradable fromProto(protobuf.ArbitratorTrade arbitratorTradeProto, + XmrWalletService xmrWalletService, + CoreProtoResolver coreProtoResolver) { + protobuf.Trade proto = arbitratorTradeProto.getTrade(); + ProcessModel processModel = ProcessModel.fromProto(proto.getProcessModel(), coreProtoResolver); + String uid = ProtoUtil.stringOrNullFromProto(proto.getUid()); + if (uid == null) { + uid = UUID.randomUUID().toString(); + } + return fromProto(new ArbitratorTrade( + Offer.fromProto(proto.getOffer()), + BigInteger.valueOf(proto.getAmount()), + proto.getPrice(), + xmrWalletService, + processModel, + uid, + proto.getProcessModel().getMaker().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getMaker().getNodeAddress()) : null, + proto.getProcessModel().getTaker().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getTaker().getNodeAddress()) : null, + proto.getProcessModel().getArbitrator().hasNodeAddress() ? NodeAddress.fromProto(proto.getProcessModel().getArbitrator().getNodeAddress()) : null, + ProtoUtil.stringOrNullFromProto(proto.getChallenge())), + proto, + coreProtoResolver); + } + + @Override + public boolean confirmPermitted() { + throw new RuntimeException("ArbitratorTrade.confirmPermitted() not implemented"); // TODO (woodser): implement + } + + public boolean resendDisputeOpenedMessageWithinDuration() { + Date startDate = getMaxTradePeriodDate(); + return new Date().getTime() <= (startDate.getTime() + resendDisputeOpenedMessageDurationMs); + } } diff --git a/core/src/main/java/haveno/core/trade/SellerTrade.java b/core/src/main/java/haveno/core/trade/SellerTrade.java index 0369d1b52a..44f5149a6e 100644 --- a/core/src/main/java/haveno/core/trade/SellerTrade.java +++ b/core/src/main/java/haveno/core/trade/SellerTrade.java @@ -30,7 +30,7 @@ import java.util.Date; @Slf4j public abstract class SellerTrade extends Trade { - private static final long resendPaymentReceivedMessagesDurationMs = 1L * 30 * 24 * 60 * 60 * 1000; // ~1 month + private static final long resendPaymentReceivedMessagesDurationMs = 1L * 30 * 24 * 60 * 60 * 1000; // 30 days SellerTrade(Offer offer, BigInteger tradeAmount, diff --git a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java index b25c6c68d1..8818fcf49c 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java +++ b/core/src/main/java/haveno/core/trade/protocol/ArbitratorProtocol.java @@ -9,9 +9,12 @@ import haveno.core.trade.messages.DepositResponse; import haveno.core.trade.messages.InitTradeRequest; import haveno.core.trade.messages.SignContractResponse; import haveno.core.trade.messages.TradeMessage; +import haveno.core.trade.protocol.FluentProtocol.Condition; import haveno.core.trade.protocol.tasks.ApplyFilter; import haveno.core.trade.protocol.tasks.ArbitratorProcessDepositRequest; import haveno.core.trade.protocol.tasks.ArbitratorProcessReserveTx; +import haveno.core.trade.protocol.tasks.ArbitratorSendDisputeOpenedMessageToBuyer; +import haveno.core.trade.protocol.tasks.ArbitratorSendDisputeOpenedMessageToSeller; import haveno.core.trade.protocol.tasks.ArbitratorSendInitTradeOrMultisigRequests; import haveno.core.trade.protocol.tasks.ProcessInitTradeRequest; import haveno.core.trade.protocol.tasks.SendDepositsConfirmedMessageToBuyer; @@ -24,105 +27,149 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ArbitratorProtocol extends DisputeProtocol { - public ArbitratorProtocol(ArbitratorTrade trade) { - super(trade); - } - - @Override - protected void onTradeMessage(TradeMessage message, NodeAddress peer) { - super.onTradeMessage(message, peer); - } - - @Override - public void onMailboxMessage(TradeMessage message, NodeAddress peer) { - super.onMailboxMessage(message, peer); - } - - /////////////////////////////////////////////////////////////////////////////////////////// - // Incoming messages - /////////////////////////////////////////////////////////////////////////////////////////// - - public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { - log.info(TradeProtocol.LOG_HIGHLIGHT + "handleInitTradeRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId()); - ThreadUtils.execute(() -> { - synchronized (trade.getLock()) { - latchTrade(); - this.errorMessageHandler = errorMessageHandler; - processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set - expect(phase(Trade.Phase.INIT) - .with(message) - .from(peer)) - .setup(tasks( - ApplyFilter.class, - ProcessInitTradeRequest.class, - ArbitratorProcessReserveTx.class, - ArbitratorSendInitTradeOrMultisigRequests.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(); - handleTaskRunnerSuccess(peer, message); - }, - errorMessage -> { - handleTaskRunnerFault(peer, message, errorMessage); - })) - .withTimeout(TRADE_STEP_TIMEOUT_SECONDS)) - .executeTasks(true); - awaitTradeLatch(); - } - }, trade.getId()); - } - - @Override - public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) { - log.warn("Arbitrator ignoring SignContractResponse"); - } - - public void handleDepositRequest(DepositRequest request, NodeAddress sender) { - log.info(TradeProtocol.LOG_HIGHLIGHT + "handleDepositRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId()); - ThreadUtils.execute(() -> { - synchronized (trade.getLock()) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), request); - processModel.setTradeMessage(request); - expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_REQUESTED) - .with(request) - .from(sender)) - .setup(tasks( - ArbitratorProcessDepositRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - if (trade.getState().ordinal() >= Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS.ordinal()) { - stopTimeout(); - this.errorMessageHandler = null; - } - handleTaskRunnerSuccess(sender, request); - }, - errorMessage -> { - handleTaskRunnerFault(sender, request, errorMessage); - }))) - .executeTasks(true); - awaitTradeLatch(); - } - }, trade.getId()); - } - - @Override - public void handleDepositResponse(DepositResponse response, NodeAddress sender) { - log.warn("Arbitrator ignoring DepositResponse for trade " + response.getOfferId()); - } - - @SuppressWarnings("unchecked") - @Override - public Class[] getDepositsConfirmedTasks() { - return new Class[] { SendDepositsConfirmedMessageToBuyer.class, SendDepositsConfirmedMessageToSeller.class }; - } - - @Override - public void handleError(String errorMessage) { - // set trade state to send deposit responses with nack - if (trade instanceof ArbitratorTrade && trade.getState() == Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) { - trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); + public ArbitratorProtocol(ArbitratorTrade trade) { + super(trade); + } + + @Override + protected void onTradeMessage(TradeMessage message, NodeAddress peer) { + super.onTradeMessage(message, peer); + } + + @Override + public void onMailboxMessage(TradeMessage message, NodeAddress peer) { + super.onMailboxMessage(message, peer); + } + + @Override + protected void onInitialized() { + super.onInitialized(); + + // re-send dispute opened message if applicable + sendDisputeOpenedMessageIfApplicable(); + + // TODO: resend dispute closed message if not acked + } + + public void sendDisputeOpenedMessageIfApplicable() { + ThreadUtils.execute(() -> { + if (!needsToResendDisputeOpenedMessage()) return; + if (trade.isShutDownStarted() || trade.isPayoutPublished()) return; + synchronized (trade.getLock()) { + if (!needsToResendDisputeOpenedMessage()) return; + latchTrade(); + given(new Condition(trade)) + .setup(tasks( + ArbitratorSendDisputeOpenedMessageToBuyer.class, + ArbitratorSendDisputeOpenedMessageToSeller.class) + .using(new TradeTaskRunner(trade, + () -> { + unlatchTrade(); + }, + (errorMessage) -> { + log.warn("Error sending DisputeOpenedMessage: " + errorMessage); + unlatchTrade(); + }))) + .executeTasks(); + awaitTradeLatch(); + } + }, trade.getId()); + } + + private boolean needsToResendDisputeOpenedMessage() { + if (trade.isShutDownStarted()) return false; + if (trade.isPayoutPublished()) return false; + if (trade.getBuyer().getDisputeOpenedMessage() == null && trade.getSeller().getDisputeOpenedMessage() == null) return false; + if (trade.getDisputeState() != Trade.DisputeState.DISPUTE_OPENED) return false; + if (!((ArbitratorTrade) trade).resendDisputeOpenedMessageWithinDuration()) return false; + return !trade.getProcessModel().isDisputeOpenedMessageAckedOrStored(); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // Incoming messages + /////////////////////////////////////////////////////////////////////////////////////////// + + public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { + log.info(TradeProtocol.LOG_HIGHLIGHT + "handleInitTradeRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId()); + ThreadUtils.execute(() -> { + synchronized (trade.getLock()) { + latchTrade(); + this.errorMessageHandler = errorMessageHandler; + processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set + expect(phase(Trade.Phase.INIT) + .with(message) + .from(peer)) + .setup(tasks( + ApplyFilter.class, + ProcessInitTradeRequest.class, + ArbitratorProcessReserveTx.class, + ArbitratorSendInitTradeOrMultisigRequests.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(); + handleTaskRunnerSuccess(peer, message); + }, + errorMessage -> { + handleTaskRunnerFault(peer, message, errorMessage); + })) + .withTimeout(TRADE_STEP_TIMEOUT_SECONDS)) + .executeTasks(true); + awaitTradeLatch(); + } + }, trade.getId()); + } + + @Override + public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) { + log.warn("Arbitrator ignoring SignContractResponse"); + } + + public void handleDepositRequest(DepositRequest request, NodeAddress sender) { + log.info(TradeProtocol.LOG_HIGHLIGHT + "handleDepositRequest() for {} {}", trade.getClass().getSimpleName(), trade.getShortId()); + ThreadUtils.execute(() -> { + synchronized (trade.getLock()) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), request); + processModel.setTradeMessage(request); + expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_REQUESTED) + .with(request) + .from(sender)) + .setup(tasks( + ArbitratorProcessDepositRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + if (trade.getState().ordinal() >= Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS.ordinal()) { + stopTimeout(); + this.errorMessageHandler = null; + } + handleTaskRunnerSuccess(sender, request); + }, + errorMessage -> { + handleTaskRunnerFault(sender, request, errorMessage); + }))) + .executeTasks(true); + awaitTradeLatch(); + } + }, trade.getId()); + } + + @Override + public void handleDepositResponse(DepositResponse response, NodeAddress sender) { + log.warn("Arbitrator ignoring DepositResponse for trade " + response.getOfferId()); + } + + @SuppressWarnings("unchecked") + @Override + public Class[] getDepositsConfirmedTasks() { + return new Class[] { SendDepositsConfirmedMessageToBuyer.class, SendDepositsConfirmedMessageToSeller.class }; + } + + @Override + public void handleError(String errorMessage) { + // set trade state to send deposit responses with nack + if (trade instanceof ArbitratorTrade && trade.getState() == Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) { + trade.setStateIfValidTransitionTo(Trade.State.PUBLISH_DEPOSIT_TX_REQUEST_FAILED); + } + super.handleError(errorMessage); } - super.handleError(errorMessage); - } } diff --git a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java index 68a1dcdbb7..1ca9b3b54e 100644 --- a/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java +++ b/core/src/main/java/haveno/core/trade/protocol/ProcessModel.java @@ -313,6 +313,10 @@ public class ProcessModel implements Model, PersistablePayload { return getArbitrator().isPaymentReceivedMessageAckedOrStored() && getBuyer().isPaymentReceivedMessageAckedOrStored(); } + public boolean isDisputeOpenedMessageAckedOrStored() { + return getBuyer().isDisputeOpenedMessageAckedOrStored() || getSeller().isDisputeOpenedMessageAckedOrStored(); + } + void setDepositTxSentAckMessage(AckMessage ackMessage) { MessageState messageState = ackMessage.isSuccess() ? MessageState.ACKNOWLEDGED : diff --git a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java index 7e3fb6184d..3a97b3a0e4 100644 --- a/core/src/main/java/haveno/core/trade/protocol/TradePeer.java +++ b/core/src/main/java/haveno/core/trade/protocol/TradePeer.java @@ -28,6 +28,7 @@ import haveno.core.network.MessageState; import haveno.core.payment.payload.PaymentAccountPayload; import haveno.core.proto.CoreProtoResolver; import haveno.core.support.dispute.messages.DisputeClosedMessage; +import haveno.core.support.dispute.messages.DisputeOpenedMessage; import haveno.core.trade.TradeManager; import haveno.core.trade.messages.PaymentReceivedMessage; import haveno.core.trade.messages.PaymentSentMessage; @@ -100,6 +101,9 @@ public final class TradePeer implements PersistablePayload { @Nullable @Setter @Getter + private DisputeOpenedMessage disputeOpenedMessage; + @Setter + @Getter private DisputeClosedMessage disputeClosedMessage; // added in v 0.6 @@ -159,6 +163,10 @@ public final class TradePeer implements PersistablePayload { private ObjectProperty paymentSentMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); @Setter private ObjectProperty paymentReceivedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); + @Setter + private ObjectProperty disputeOpenedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); + @Setter + private ObjectProperty disputeClosedMessageStateProperty = new SimpleObjectProperty<>(MessageState.UNDEFINED); public TradePeer() { } @@ -247,6 +255,27 @@ public final class TradePeer implements PersistablePayload { } } + public void setDisputeOpenedAckMessage(AckMessage ackMessage) { + MessageState messageState = ackMessage.isSuccess() ? + MessageState.ACKNOWLEDGED : + MessageState.NACKED; + setDisputeOpenedMessageState(messageState); + } + + public void setDisputeOpenedMessageState(MessageState disputeOpenedMessageStateProperty) { + this.disputeOpenedMessageStateProperty.set(disputeOpenedMessageStateProperty); + if (tradeManager != null) { + tradeManager.requestPersistence(); + } + } + + public void setDisputeClosedMessageState(MessageState disputeClosedMessageStateProperty) { + this.disputeClosedMessageStateProperty.set(disputeClosedMessageStateProperty); + if (tradeManager != null) { + tradeManager.requestPersistence(); + } + } + public boolean isDepositsConfirmedMessageAcked() { return depositsConfirmedMessageStateProperty.get() == MessageState.ACKNOWLEDGED; } @@ -271,6 +300,14 @@ public final class TradePeer implements PersistablePayload { return paymentReceivedMessageStateProperty.get() == MessageState.ARRIVED; } + public boolean isDisputeOpenedMessageReceived() { + return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX || disputeOpenedMessageStateProperty.get() == MessageState.NACKED; + } + + public boolean isDisputeOpenedMessageAckedOrStored() { + return disputeOpenedMessageStateProperty.get() == MessageState.ACKNOWLEDGED || disputeOpenedMessageStateProperty.get() == MessageState.STORED_IN_MAILBOX; + } + @Override public Message toProtoMessage() { final protobuf.TradePeer.Builder builder = protobuf.TradePeer.newBuilder(); @@ -293,6 +330,7 @@ public final class TradePeer implements PersistablePayload { Optional.ofNullable(mediatedPayoutTxSignature).ifPresent(e -> builder.setMediatedPayoutTxSignature(ByteString.copyFrom(e))); Optional.ofNullable(paymentSentMessage).ifPresent(e -> builder.setPaymentSentMessage(paymentSentMessage.toProtoNetworkEnvelope().getPaymentSentMessage())); Optional.ofNullable(paymentReceivedMessage).ifPresent(e -> builder.setPaymentReceivedMessage(paymentReceivedMessage.toProtoNetworkEnvelope().getPaymentReceivedMessage())); + Optional.ofNullable(disputeOpenedMessage).ifPresent(e -> builder.setDisputeOpenedMessage(disputeOpenedMessage.toProtoNetworkEnvelope().getDisputeOpenedMessage())); Optional.ofNullable(disputeClosedMessage).ifPresent(e -> builder.setDisputeClosedMessage(disputeClosedMessage.toProtoNetworkEnvelope().getDisputeClosedMessage())); Optional.ofNullable(reserveTxHash).ifPresent(e -> builder.setReserveTxHash(reserveTxHash)); Optional.ofNullable(reserveTxHex).ifPresent(e -> builder.setReserveTxHex(reserveTxHex)); @@ -314,6 +352,8 @@ public final class TradePeer implements PersistablePayload { builder.setDepositsConfirmedMessageState(depositsConfirmedMessageStateProperty.get().name()); builder.setPaymentSentMessageState(paymentSentMessageStateProperty.get().name()); builder.setPaymentReceivedMessageState(paymentReceivedMessageStateProperty.get().name()); + builder.setDisputeOpenedMessageState(disputeOpenedMessageStateProperty.get().name()); + builder.setDisputeClosedMessageState(disputeClosedMessageStateProperty.get().name()); builder.setCurrentDate(currentDate); return builder.build(); @@ -345,6 +385,7 @@ public final class TradePeer implements PersistablePayload { tradePeer.setMediatedPayoutTxSignature(ProtoUtil.byteArrayOrNullFromProto(proto.getMediatedPayoutTxSignature())); tradePeer.setPaymentSentMessage(proto.hasPaymentSentMessage() ? PaymentSentMessage.fromProto(proto.getPaymentSentMessage(), Version.getP2PMessageVersion()) : null); tradePeer.setPaymentReceivedMessage(proto.hasPaymentReceivedMessage() ? PaymentReceivedMessage.fromProto(proto.getPaymentReceivedMessage(), Version.getP2PMessageVersion()) : null); + tradePeer.setDisputeOpenedMessage(proto.hasDisputeOpenedMessage() ? DisputeOpenedMessage.fromProto(proto.getDisputeOpenedMessage(), coreProtoResolver, Version.getP2PMessageVersion()) : null); tradePeer.setDisputeClosedMessage(proto.hasDisputeClosedMessage() ? DisputeClosedMessage.fromProto(proto.getDisputeClosedMessage(), Version.getP2PMessageVersion()) : null); tradePeer.setReserveTxHash(ProtoUtil.stringOrNullFromProto(proto.getReserveTxHash())); tradePeer.setReserveTxHex(ProtoUtil.stringOrNullFromProto(proto.getReserveTxHex())); @@ -376,6 +417,14 @@ public final class TradePeer implements PersistablePayload { MessageState paymentReceivedMessageState = ProtoUtil.enumFromProto(MessageState.class, paymentReceivedMessageStateString); tradePeer.setPaymentReceivedMessageState(paymentReceivedMessageState); + String disputeOpenedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getDisputeOpenedMessageState()); + MessageState disputeOpenedMessageState = ProtoUtil.enumFromProto(MessageState.class, disputeOpenedMessageStateString); + tradePeer.setDisputeOpenedMessageState(disputeOpenedMessageState); + + String disputeClosedMessageStateString = ProtoUtil.stringOrNullFromProto(proto.getDisputeClosedMessageState()); + MessageState disputeClosedMessageState = ProtoUtil.enumFromProto(MessageState.class, disputeClosedMessageStateString); + tradePeer.setDisputeClosedMessageState(disputeClosedMessageState); + return tradePeer; } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessage.java new file mode 100644 index 0000000000..81bf59710b --- /dev/null +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessage.java @@ -0,0 +1,190 @@ +/* + * This file is part of Haveno. + * + * Haveno is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Haveno is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Haveno. If not, see . + */ + +package haveno.core.trade.protocol.tasks; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import haveno.common.Timer; +import haveno.common.UserThread; +import haveno.common.taskrunner.TaskRunner; +import haveno.core.network.MessageState; +import haveno.core.support.dispute.Dispute; +import haveno.core.support.dispute.messages.DisputeOpenedMessage; +import haveno.core.support.messages.ChatMessage; +import haveno.core.trade.ArbitratorTrade; +import haveno.core.trade.HavenoUtils; +import haveno.core.trade.Trade; +import haveno.network.p2p.mailbox.MailboxMessage; +import javafx.beans.value.ChangeListener; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; + +/** + * Arbitrator sends the DisputeOpenedMessage. + * We wait to receive a ACK message back and resend the message + * in case that does not happen in 10 minutes or if the message was stored in mailbox or failed. We keep repeating that + * with doubling the interval each time and until the MAX_RESEND_ATTEMPTS is reached. + * If never successful we give up and complete. It might be a valid case that the peer was not online for an extended + * time but we can be very sure that our message was stored as mailbox message in the network and one the peer goes + * online he will process it. + */ +@Slf4j +@EqualsAndHashCode(callSuper = true) +public abstract class ArbitratorSendDisputeOpenedMessage extends SendMailboxMessageTask { + private ChangeListener listener; + private Timer timer; + private static final int MAX_RESEND_ATTEMPTS = 20; + private int delayInMin = 10; + private int resendCounter = 0; + private DisputeOpenedMessage message = null; + + public ArbitratorSendDisputeOpenedMessage(TaskRunner taskHandler, Trade trade) { + super(taskHandler, trade); + } + + @Override + protected void run() { + try { + runInterceptHook(); + + // reset nack state + if (getReceiver().isDisputeOpenedMessageReceived()) { + getReceiver().setDisputeOpenedMessageState(MessageState.UNDEFINED); + } + + // skip if not applicable or already acked + if (stopSending()) { + if (!isCompleted()) complete(); + return; + } + + // reset ack state + getReceiver().setPaymentReceivedMessageState(MessageState.UNDEFINED); + super.run(); + } catch (Throwable t) { + failed(t); + } + } + + protected Optional getDispute() { + return HavenoUtils.arbitrationManager.findDispute(getReceiver().getDisputeOpenedMessage().getDispute()); + } + + protected ChatMessage getSystemChatMessage() { + return getDispute().get().getChatMessages().get(0); + } + + @Override + protected MailboxMessage getMailboxMessage(String tradeId) { + if (message == null) message = getReceiver().getDisputeOpenedMessage(); + return message; + } + + @Override + protected void setStateSent() { + getReceiver().setDisputeOpenedMessageState(MessageState.SENT); + tryToSendAgainLater(); + processModel.getTradeManager().requestPersistence(); + } + + @Override + protected void setStateArrived() { + getReceiver().setDisputeOpenedMessageState(MessageState.ARRIVED); + getSystemChatMessage().setArrived(true); + processModel.getTradeManager().requestPersistence(); + } + + @Override + protected void setStateStoredInMailbox() { + getReceiver().setDisputeOpenedMessageState(MessageState.STORED_IN_MAILBOX); + getSystemChatMessage().setStoredInMailbox(true); + processModel.getTradeManager().requestPersistence(); + } + + @Override + protected void setStateFault() { + getReceiver().setDisputeOpenedMessageState(MessageState.FAILED); + getSystemChatMessage().setSendMessageError(errorMessage); + processModel.getTradeManager().requestPersistence(); + } + + private void cleanup() { + if (timer != null) { + timer.stop(); + } + if (listener != null) { + getReceiver().getDisputeOpenedMessageStateProperty().removeListener(listener); + } + } + + private void tryToSendAgainLater() { + + // skip if already acked + if (stopSending()) return; + + if (resendCounter >= MAX_RESEND_ATTEMPTS) { + cleanup(); + log.warn("We never received an ACK message when sending the DisputeOpenedMessage to the peer. We stop trying to send the message."); + return; + } + + if (timer != null) { + timer.stop(); + } + + timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); + + if (resendCounter == 0) { + listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue); + getReceiver().getDisputeOpenedMessageStateProperty().addListener(listener); + onMessageStateChange(getReceiver().getDisputeOpenedMessageStateProperty().get()); + } + + // first re-send is after 2 minutes, then increase the delay exponentially + if (resendCounter == 0) { + int shortDelay = 2; + log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", shortDelay); + timer = UserThread.runAfter(this::run, shortDelay, TimeUnit.MINUTES); + } else { + log.info("We will send the DisputeOpenedMessage again to the peer after a delay of {} min.", delayInMin); + timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES); + delayInMin = (int) ((double) delayInMin * 1.5); + } + resendCounter++; + } + + private void onMessageStateChange(MessageState newValue) { + if (isMessageReceived()) { + cleanup(); + } + } + + protected boolean isMessageReceived() { + return getReceiver().isDisputeOpenedMessageReceived(); + } + + protected boolean stopSending() { + if (getReceiver().getDisputeOpenedMessage() == null) return true; // stop if no message to send + if (isMessageReceived()) return true; // stop if message received + if (trade.isPayoutPublished()) return true; // stop if payout is published + if (!((ArbitratorTrade) trade).resendDisputeOpenedMessageWithinDuration()) return true; // stop if payout is published and we are not in the resend period + if (message != null && !message.equals(getReceiver().getDisputeOpenedMessage())) return true; // stop if message state is outdated + return false; + } +} diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessageToBuyer.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessageToBuyer.java new file mode 100644 index 0000000000..52cc151ace --- /dev/null +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessageToBuyer.java @@ -0,0 +1,38 @@ +/* + * This file is part of Haveno. + * + * Haveno is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Haveno is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Haveno. If not, see . + */ + +package haveno.core.trade.protocol.tasks; + +import haveno.common.taskrunner.TaskRunner; +import haveno.core.trade.Trade; +import haveno.core.trade.protocol.TradePeer; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; + +@EqualsAndHashCode(callSuper = true) +@Slf4j +public class ArbitratorSendDisputeOpenedMessageToBuyer extends ArbitratorSendDisputeOpenedMessage { + + public ArbitratorSendDisputeOpenedMessageToBuyer(TaskRunner taskHandler, Trade trade) { + super(taskHandler, trade); + } + + @Override + protected TradePeer getReceiver() { + return trade.getBuyer(); + } +} diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessageToSeller.java b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessageToSeller.java new file mode 100644 index 0000000000..e5637022cd --- /dev/null +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/ArbitratorSendDisputeOpenedMessageToSeller.java @@ -0,0 +1,38 @@ +/* + * This file is part of Haveno. + * + * Haveno is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Haveno is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Haveno. If not, see . + */ + +package haveno.core.trade.protocol.tasks; + +import haveno.common.taskrunner.TaskRunner; +import haveno.core.trade.Trade; +import haveno.core.trade.protocol.TradePeer; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; + +@EqualsAndHashCode(callSuper = true) +@Slf4j +public class ArbitratorSendDisputeOpenedMessageToSeller extends ArbitratorSendDisputeOpenedMessage { + + public ArbitratorSendDisputeOpenedMessageToSeller(TaskRunner taskHandler, Trade trade) { + super(taskHandler, trade); + } + + @Override + protected TradePeer getReceiver() { + return trade.getSeller(); + } +} diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java index 86bb957577..b98f2f03cb 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessage.java @@ -38,16 +38,13 @@ import java.util.concurrent.TimeUnit; import haveno.common.Timer; import haveno.common.UserThread; -import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; import haveno.core.network.MessageState; import haveno.core.trade.HavenoUtils; import haveno.core.trade.Trade; import haveno.core.trade.messages.PaymentSentMessage; import haveno.core.trade.messages.TradeMailboxMessage; -import haveno.core.trade.protocol.TradePeer; import haveno.core.util.JsonUtil; -import haveno.network.p2p.NodeAddress; import javafx.beans.value.ChangeListener; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -74,18 +71,6 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask super(taskHandler, trade); } - protected abstract TradePeer getReceiver(); - - @Override - protected NodeAddress getReceiverNodeAddress() { - return getReceiver().getNodeAddress(); - } - - @Override - protected PubKeyRing getReceiverPubKeyRing() { - return getReceiver().getPubKeyRing(); - } - @Override protected void run() { try { @@ -104,7 +89,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask } @Override - protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) { + protected TradeMailboxMessage getMailboxMessage(String tradeId) { if (getReceiver().getPaymentSentMessage() == null) { // We do not use a real unique ID here as we want to be able to re-send the exact same message in case the @@ -170,7 +155,7 @@ public abstract class BuyerSendPaymentSentMessage extends SendMailboxMessageTask timer.stop(); } if (listener != null) { - trade.getSeller().getPaymentReceivedMessageStateProperty().removeListener(listener); + getReceiver().getPaymentReceivedMessageStateProperty().removeListener(listener); } } diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java index 57ca170455..25732ed114 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/BuyerSendPaymentSentMessageToSeller.java @@ -19,8 +19,8 @@ package haveno.core.trade.protocol.tasks; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.Trade; -import haveno.core.trade.messages.TradeMessage; import haveno.core.trade.protocol.TradePeer; +import haveno.network.p2p.mailbox.MailboxMessage; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -63,7 +63,7 @@ public class BuyerSendPaymentSentMessageToSeller extends BuyerSendPaymentSentMes // continue execution on fault so payment sent message is sent to arbitrator @Override - protected void onFault(String errorMessage, TradeMessage message) { + protected void onFault(String errorMessage, MailboxMessage message) { setStateFault(); appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage); complete(); diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java index 7ca993db30..5a53eb6515 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessage.java @@ -38,7 +38,6 @@ import com.google.common.base.Charsets; import haveno.common.Timer; import haveno.common.UserThread; -import haveno.common.crypto.PubKeyRing; import haveno.common.crypto.Sig; import haveno.common.taskrunner.TaskRunner; import haveno.core.account.sign.SignedWitness; @@ -49,9 +48,7 @@ import haveno.core.trade.SellerTrade; import haveno.core.trade.Trade; import haveno.core.trade.messages.PaymentReceivedMessage; import haveno.core.trade.messages.TradeMailboxMessage; -import haveno.core.trade.protocol.TradePeer; import haveno.core.util.JsonUtil; -import haveno.network.p2p.NodeAddress; import javafx.beans.value.ChangeListener; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -79,18 +76,6 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag public SellerSendPaymentReceivedMessage(TaskRunner taskHandler, Trade trade) { super(taskHandler, trade); } - - protected abstract TradePeer getReceiver(); - - @Override - protected NodeAddress getReceiverNodeAddress() { - return getReceiver().getNodeAddress(); - } - - @Override - protected PubKeyRing getReceiverPubKeyRing() { - return getReceiver().getPubKeyRing(); - } @Override protected void run() { @@ -117,7 +102,7 @@ public abstract class SellerSendPaymentReceivedMessage extends SendMailboxMessag } @Override - protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) { + protected TradeMailboxMessage getMailboxMessage(String tradeId) { if (getReceiver().getPaymentReceivedMessage() == null) { // sign account witness diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java index 212dcb22f4..da3a0be71f 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SellerSendPaymentReceivedMessageToBuyer.java @@ -19,8 +19,8 @@ package haveno.core.trade.protocol.tasks; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.Trade; -import haveno.core.trade.messages.TradeMessage; import haveno.core.trade.protocol.TradePeer; +import haveno.network.p2p.mailbox.MailboxMessage; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -63,7 +63,7 @@ public class SellerSendPaymentReceivedMessageToBuyer extends SellerSendPaymentRe // continue execution on fault so payment received message is sent to arbitrator @Override - protected void onFault(String errorMessage, TradeMessage message) { + protected void onFault(String errorMessage, MailboxMessage message) { setStateFault(); appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage); complete(); diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java index ba20d74351..2962df712e 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendDepositsConfirmedMessage.java @@ -21,15 +21,12 @@ import java.util.concurrent.TimeUnit; import haveno.common.Timer; import haveno.common.UserThread; -import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; import haveno.core.network.MessageState; import haveno.core.trade.HavenoUtils; import haveno.core.trade.Trade; import haveno.core.trade.messages.DepositsConfirmedMessage; import haveno.core.trade.messages.TradeMailboxMessage; -import haveno.core.trade.protocol.TradePeer; -import haveno.network.p2p.NodeAddress; import lombok.extern.slf4j.Slf4j; /** @@ -65,20 +62,8 @@ public abstract class SendDepositsConfirmedMessage extends SendMailboxMessageTas } } - protected abstract TradePeer getReceiver(); - @Override - protected NodeAddress getReceiverNodeAddress() { - return getReceiver().getNodeAddress(); - } - - @Override - protected PubKeyRing getReceiverPubKeyRing() { - return getReceiver().getPubKeyRing(); - } - - @Override - protected TradeMailboxMessage getTradeMailboxMessage(String tradeId) { + protected TradeMailboxMessage getMailboxMessage(String tradeId) { if (message == null) { // export multisig hex once diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/SendMailboxMessageTask.java b/core/src/main/java/haveno/core/trade/protocol/tasks/SendMailboxMessageTask.java index 24638c5e70..51d3344014 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/SendMailboxMessageTask.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/SendMailboxMessageTask.java @@ -20,10 +20,10 @@ package haveno.core.trade.protocol.tasks; import haveno.common.crypto.PubKeyRing; import haveno.common.taskrunner.TaskRunner; import haveno.core.trade.Trade; -import haveno.core.trade.messages.TradeMailboxMessage; -import haveno.core.trade.messages.TradeMessage; +import haveno.core.trade.protocol.TradePeer; import haveno.network.p2p.NodeAddress; import haveno.network.p2p.SendMailboxMessageListener; +import haveno.network.p2p.mailbox.MailboxMessage; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -32,15 +32,17 @@ public abstract class SendMailboxMessageTask extends TradeTask { super(taskHandler, trade); } + protected abstract TradePeer getReceiver(); + protected NodeAddress getReceiverNodeAddress() { - return trade.getTradePeer().getNodeAddress(); + return getReceiver().getNodeAddress(); } protected PubKeyRing getReceiverPubKeyRing() { - return trade.getTradePeer().getPubKeyRing(); + return getReceiver().getPubKeyRing(); } - protected abstract TradeMailboxMessage getTradeMailboxMessage(String tradeId); + protected abstract MailboxMessage getMailboxMessage(String tradeId); protected abstract void setStateSent(); @@ -55,7 +57,7 @@ public abstract class SendMailboxMessageTask extends TradeTask { try { runInterceptHook(); String id = processModel.getOfferId(); - TradeMailboxMessage message = getTradeMailboxMessage(id); + MailboxMessage message = getMailboxMessage(id); setStateSent(); NodeAddress peersNodeAddress = getReceiverNodeAddress(); log.info("Send {} to peer {} for {} {}, uid={}", @@ -69,21 +71,21 @@ public abstract class SendMailboxMessageTask extends TradeTask { new SendMailboxMessageListener() { @Override public void onArrived() { - log.info("{} arrived at peer {}. tradeId={}, uid={}", message.getClass().getSimpleName(), peersNodeAddress, message.getOfferId(), message.getUid()); + log.info("{} arrived at peer {}. tradeId={}, uid={}", message.getClass().getSimpleName(), peersNodeAddress, trade.getId(), message.getUid()); setStateArrived(); if (!task.isCompleted()) complete(); } @Override public void onStoredInMailbox() { - log.info("{} stored in mailbox for peer {}. tradeId={}, uid={}", message.getClass().getSimpleName(), peersNodeAddress, message.getOfferId(), message.getUid()); + log.info("{} stored in mailbox for peer {}. tradeId={}, uid={}", message.getClass().getSimpleName(), peersNodeAddress, trade.getId(), message.getUid()); SendMailboxMessageTask.this.onStoredInMailbox(); } @Override public void onFault(String errorMessage) { if (processModel.getP2PService().isShutDownStarted()) return; - log.error("{} failed: Peer {}. tradeId={}, uid={}, errorMessage={}", message.getClass().getSimpleName(), peersNodeAddress, message.getOfferId(), message.getUid(), errorMessage); + log.error("{} failed: Peer {}. tradeId={}, uid={}, errorMessage={}", message.getClass().getSimpleName(), peersNodeAddress, trade.getId(), message.getUid(), errorMessage); SendMailboxMessageTask.this.onFault(errorMessage, message); } } @@ -98,7 +100,7 @@ public abstract class SendMailboxMessageTask extends TradeTask { if (!isCompleted()) complete(); } - protected void onFault(String errorMessage, TradeMessage message) { + protected void onFault(String errorMessage, MailboxMessage message) { setStateFault(); appendToErrorMessage("Sending message failed: message=" + message + "\nerrorMessage=" + errorMessage); failed(errorMessage); diff --git a/core/src/main/java/haveno/core/trade/protocol/tasks/mediation/SendMediatedPayoutTxPublishedMessage.java b/core/src/main/java/haveno/core/trade/protocol/tasks/mediation/SendMediatedPayoutTxPublishedMessage.java index 906859e66e..ceee7aba2c 100644 --- a/core/src/main/java/haveno/core/trade/protocol/tasks/mediation/SendMediatedPayoutTxPublishedMessage.java +++ b/core/src/main/java/haveno/core/trade/protocol/tasks/mediation/SendMediatedPayoutTxPublishedMessage.java @@ -21,6 +21,7 @@ import haveno.common.taskrunner.TaskRunner; import haveno.core.support.dispute.mediation.MediationResultState; import haveno.core.trade.Trade; import haveno.core.trade.messages.TradeMailboxMessage; +import haveno.core.trade.protocol.TradePeer; import haveno.core.trade.protocol.tasks.SendMailboxMessageTask; import lombok.extern.slf4j.Slf4j; @@ -32,7 +33,12 @@ public class SendMediatedPayoutTxPublishedMessage extends SendMailboxMessageTask } @Override - protected TradeMailboxMessage getTradeMailboxMessage(String id) { + protected TradePeer getReceiver() { + return trade.getTradePeer(); + } + + @Override + protected TradeMailboxMessage getMailboxMessage(String id) { throw new RuntimeException("SendMediatedPayoutTxPublishedMessage.getMessage(id) not implemented for xmr"); // Transaction payoutTx = checkNotNull(trade.getPayoutTx(), "trade.getPayoutTx() must not be null"); // return new MediatedPayoutTxPublishedMessage( diff --git a/proto/src/main/proto/pb.proto b/proto/src/main/proto/pb.proto index 2a8a2ed8cd..be3a6555c9 100644 --- a/proto/src/main/proto/pb.proto +++ b/proto/src/main/proto/pb.proto @@ -1614,6 +1614,7 @@ message TradePeer { bytes mediated_payout_tx_signature = 22; PaymentSentMessage payment_sent_message = 23; PaymentReceivedMessage payment_received_message = 24; + DisputeOpenedMessage dispute_opened_message = 46; DisputeClosedMessage dispute_closed_message = 25; string reserve_tx_hash = 26; string reserve_tx_hex = 27; @@ -1635,6 +1636,8 @@ message TradePeer { string deposits_confirmed_message_state = 43; string payment_sent_message_state = 44; string payment_received_message_state = 45; + string dispute_opened_message_state = 47; + string dispute_closed_message_state = 48; } ///////////////////////////////////////////////////////////////////////////////////////////