From c5f5a5af425c131458e3024576c2d8c85216dc55 Mon Sep 17 00:00:00 2001 From: woodser Date: Thu, 22 Sep 2022 13:21:04 -0400 Subject: [PATCH] process messages on user thread and protocol tasks off user thread --- Makefile | 4 +- .../bisq/core/offer/OpenOfferManager.java | 154 +++++++------ .../tasks/MakerSendSignOfferRequest.java | 6 +- .../trade/protocol/ArbitratorProtocol.java | 144 ++++++------ .../trade/protocol/BuyerAsMakerProtocol.java | 48 ++-- .../trade/protocol/BuyerAsTakerProtocol.java | 48 ++-- .../trade/protocol/SellerAsMakerProtocol.java | 48 ++-- .../trade/protocol/SellerAsTakerProtocol.java | 48 ++-- .../core/trade/protocol/TradeProtocol.java | 214 +++++++++--------- .../offerbook/OfferBookChartViewModel.java | 24 +- .../bisq/network/p2p/network/Connection.java | 22 +- 11 files changed, 392 insertions(+), 368 deletions(-) diff --git a/Makefile b/Makefile index ff3ce2cd94..9cc639d5dc 100644 --- a/Makefile +++ b/Makefile @@ -68,7 +68,7 @@ monerod-local1: --no-zmq \ --add-exclusive-node 127.0.0.1:28080 \ --rpc-access-control-origins http://localhost:8080 \ - --fixed-difficulty 400 + --fixed-difficulty 800 monerod-local2: ./.localnet/monerod \ @@ -82,7 +82,7 @@ monerod-local2: --confirm-external-bind \ --add-exclusive-node 127.0.0.1:48080 \ --rpc-access-control-origins http://localhost:8080 \ - --fixed-difficulty 400 + --fixed-difficulty 800 funding-wallet-local: ./.localnet/monero-wallet-rpc \ diff --git a/core/src/main/java/bisq/core/offer/OpenOfferManager.java b/core/src/main/java/bisq/core/offer/OpenOfferManager.java index aea9af690e..0b39f4bc4c 100644 --- a/core/src/main/java/bisq/core/offer/OpenOfferManager.java +++ b/core/src/main/java/bisq/core/offer/OpenOfferManager.java @@ -621,89 +621,93 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe private void processUnpostedOffers(TransactionResultHandler resultHandler, // TODO (woodser): transaction not needed with result handler ErrorMessageHandler errorMessageHandler) { - List errorMessages = new ArrayList(); - for (OpenOffer scheduledOffer : openOffers.getObservableList()) { - if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue; - CountDownLatch latch = new CountDownLatch(openOffers.list.size()); - processUnpostedOffer(scheduledOffer, (transaction) -> { - latch.countDown(); - }, errorMessage -> { - latch.countDown(); - errorMessages.add(errorMessage); - }); - TradeUtils.awaitLatch(latch); - } - requestPersistence(); - if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString()); - else resultHandler.handleResult(null); + new Thread(() -> { + List errorMessages = new ArrayList(); + for (OpenOffer scheduledOffer : openOffers.getObservableList()) { + if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue; + CountDownLatch latch = new CountDownLatch(1); + processUnpostedOffer(scheduledOffer, (transaction) -> { + latch.countDown(); + }, errorMessage -> { + latch.countDown(); + errorMessages.add(errorMessage); + }); + TradeUtils.awaitLatch(latch); + } + requestPersistence(); + if (errorMessages.size() > 0) errorMessageHandler.handleErrorMessage(errorMessages.toString()); + else resultHandler.handleResult(null); + }).start(); } private void processUnpostedOffer(OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { - try { + new Thread(() -> { + try { - // done processing if wallet not initialized - if (xmrWalletService.getWallet() == null) { - resultHandler.handleResult(null); - return; - } - - // get offer reserve amount - Coin offerReserveAmountCoin = openOffer.getOffer().getReserveAmount(); - BigInteger offerReserveAmount = ParsingUtils.centinerosToAtomicUnits(offerReserveAmountCoin.value); - - // handle sufficient available balance - if (xmrWalletService.getWallet().getUnlockedBalance(0).compareTo(offerReserveAmount) >= 0) { - - // split outputs if applicable - boolean splitOutput = openOffer.isAutoSplit(); // TODO: determine if output needs split - if (splitOutput) { - throw new Error("Post offer with split output option not yet supported"); // TODO: support scheduling offer with split outputs + // done processing if wallet not initialized + if (xmrWalletService.getWallet() == null) { + resultHandler.handleResult(null); + return; } - // otherwise sign and post offer - else { - signAndPostOffer(openOffer, offerReserveAmountCoin, true, resultHandler, errorMessageHandler); - } - return; - } + // get offer reserve amount + Coin offerReserveAmountCoin = openOffer.getOffer().getReserveAmount(); + BigInteger offerReserveAmount = ParsingUtils.centinerosToAtomicUnits(offerReserveAmountCoin.value); - // handle unscheduled offer - if (openOffer.getScheduledTxHashes() == null) { + // handle sufficient available balance + if (xmrWalletService.getWallet().getUnlockedBalance(0).compareTo(offerReserveAmount) >= 0) { - // check for sufficient balance - scheduled offers amount - if (xmrWalletService.getWallet().getBalance(0).subtract(getScheduledAmount()).compareTo(offerReserveAmount) < 0) { - throw new RuntimeException("Not enough money in Haveno wallet"); - } - - // get locked txs - List lockedTxs = xmrWalletService.getWallet().getTxs(new MoneroTxQuery().setIsLocked(true)); - - // get earliest unscheduled txs with sufficient incoming amount - List scheduledTxHashes = new ArrayList(); - BigInteger scheduledAmount = new BigInteger("0"); - for (MoneroTxWallet lockedTx : lockedTxs) { - if (isTxScheduled(lockedTx.getHash())) continue; - if (lockedTx.getIncomingTransfers() == null || lockedTx.getIncomingTransfers().isEmpty()) continue; - scheduledTxHashes.add(lockedTx.getHash()); - for (MoneroIncomingTransfer transfer : lockedTx.getIncomingTransfers()) { - if (transfer.getAccountIndex() == 0) scheduledAmount = scheduledAmount.add(transfer.getAmount()); + // split outputs if applicable + boolean splitOutput = openOffer.isAutoSplit(); // TODO: determine if output needs split + if (splitOutput) { + throw new Error("Post offer with split output option not yet supported"); // TODO: support scheduling offer with split outputs } - if (scheduledAmount.compareTo(offerReserveAmount) >= 0) break; + + // otherwise sign and post offer + else { + signAndPostOffer(openOffer, offerReserveAmountCoin, true, resultHandler, errorMessageHandler); + } + return; } - if (scheduledAmount.compareTo(offerReserveAmount) < 0) throw new RuntimeException("Not enough funds to schedule offer"); - // schedule txs - openOffer.setScheduledTxHashes(scheduledTxHashes); - openOffer.setScheduledAmount(scheduledAmount.toString()); - openOffer.setState(OpenOffer.State.SCHEDULED); + // handle unscheduled offer + if (openOffer.getScheduledTxHashes() == null) { + + // check for sufficient balance - scheduled offers amount + if (xmrWalletService.getWallet().getBalance(0).subtract(getScheduledAmount()).compareTo(offerReserveAmount) < 0) { + throw new RuntimeException("Not enough money in Haveno wallet"); + } + + // get locked txs + List lockedTxs = xmrWalletService.getWallet().getTxs(new MoneroTxQuery().setIsLocked(true)); + + // get earliest unscheduled txs with sufficient incoming amount + List scheduledTxHashes = new ArrayList(); + BigInteger scheduledAmount = new BigInteger("0"); + for (MoneroTxWallet lockedTx : lockedTxs) { + if (isTxScheduled(lockedTx.getHash())) continue; + if (lockedTx.getIncomingTransfers() == null || lockedTx.getIncomingTransfers().isEmpty()) continue; + scheduledTxHashes.add(lockedTx.getHash()); + for (MoneroIncomingTransfer transfer : lockedTx.getIncomingTransfers()) { + if (transfer.getAccountIndex() == 0) scheduledAmount = scheduledAmount.add(transfer.getAmount()); + } + if (scheduledAmount.compareTo(offerReserveAmount) >= 0) break; + } + if (scheduledAmount.compareTo(offerReserveAmount) < 0) throw new RuntimeException("Not enough funds to schedule offer"); + + // schedule txs + openOffer.setScheduledTxHashes(scheduledTxHashes); + openOffer.setScheduledAmount(scheduledAmount.toString()); + openOffer.setState(OpenOffer.State.SCHEDULED); + } + + // handle result + resultHandler.handleResult(null); + } catch (Exception e) { + e.printStackTrace(); + errorMessageHandler.handleErrorMessage(e.getMessage()); } - - // handle result - resultHandler.handleResult(null); - } catch (Exception e) { - e.printStackTrace(); - errorMessageHandler.handleErrorMessage(e.getMessage()); - } + }).start(); } private BigInteger getScheduledAmount() { @@ -790,6 +794,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe log.info("Received SignOfferRequest from {} with offerId {} and uid {}", peer, request.getOfferId(), request.getUid()); + boolean result = false; String errorMessage = null; try { @@ -845,9 +850,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe signedOffers.add(signedOffer); requestPersistence(); - // send ack - sendAckMessage(request.getClass(), peer, request.getPubKeyRing(), request.getOfferId(), request.getUid(), true, errorMessage); - // send response with signature SignOfferResponse response = new SignOfferResponse(request.getOfferId(), UUID.randomUUID().toString(), @@ -874,11 +876,13 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe errorMessage); } }); + result = true; } catch (Exception e) { e.printStackTrace(); errorMessage = "Exception at handleSignOfferRequest " + e.getMessage(); log.error(errorMessage); - sendAckMessage(request.getClass(), peer, request.getPubKeyRing(), request.getOfferId(), request.getUid(), false, errorMessage); + } finally { + sendAckMessage(request.getClass(), peer, request.getPubKeyRing(), request.getOfferId(), request.getUid(), result, errorMessage); } } diff --git a/core/src/main/java/bisq/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java b/core/src/main/java/bisq/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java index 610d7923f3..edecb72ff0 100644 --- a/core/src/main/java/bisq/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java +++ b/core/src/main/java/bisq/core/offer/placeoffer/tasks/MakerSendSignOfferRequest.java @@ -93,6 +93,10 @@ public class MakerSendSignOfferRequest extends Task { private void sendSignOfferRequests(SignOfferRequest request, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { Arbitrator leastUsedArbitrator = DisputeAgentSelection.getLeastUsedArbitrator(model.getTradeStatisticsManager(), model.getArbitratorManager()); + if (leastUsedArbitrator == null) { + errorMessageHandler.handleErrorMessage("Could not get least used arbitrator"); + return; + } sendSignOfferRequests(request, leastUsedArbitrator.getNodeAddress(), new HashSet(), resultHandler, errorMessageHandler); } @@ -140,7 +144,7 @@ public class MakerSendSignOfferRequest extends Task { // get registered arbitrator Arbitrator arbitrator = model.getUser().getAcceptedArbitratorByAddress(arbitratorNodeAddress); - if (arbitrator == null) throw new RuntimeException("Node address " + arbitratorNodeAddress + " is not a registered arbitrator"); + if (arbitrator == null) throw new RuntimeException("Node address " + arbitratorNodeAddress + " is not a registered arbitrator"); // TODO: use error handler request.getOfferPayload().setArbitratorSigner(arbitratorNodeAddress); // send request to arbitrator diff --git a/core/src/main/java/bisq/core/trade/protocol/ArbitratorProtocol.java b/core/src/main/java/bisq/core/trade/protocol/ArbitratorProtocol.java index 60768075c4..3cb1085570 100644 --- a/core/src/main/java/bisq/core/trade/protocol/ArbitratorProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/ArbitratorProtocol.java @@ -51,30 +51,32 @@ public class ArbitratorProtocol extends DisputeProtocol { public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { System.out.println("ArbitratorProtocol.handleInitTradeRequest()"); - synchronized (trade) { - latchTrade(); - this.errorMessageHandler = errorMessageHandler; - processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set - expect(phase(Trade.Phase.INIT) - .with(message) - .from(peer)) - .setup(tasks( - ApplyFilter.class, - ProcessInitTradeRequest.class, - ArbitratorProcessReserveTx.class, - ArbitratorSendInitTradeOrMultisigRequests.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(peer, message); - }, - errorMessage -> { - handleTaskRunnerFault(peer, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + this.errorMessageHandler = errorMessageHandler; + processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set + expect(phase(Trade.Phase.INIT) + .with(message) + .from(peer)) + .setup(tasks( + ApplyFilter.class, + ProcessInitTradeRequest.class, + ArbitratorProcessReserveTx.class, + ArbitratorSendInitTradeOrMultisigRequests.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(peer, message); + }, + errorMessage -> { + handleTaskRunnerFault(peer, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } @Override @@ -84,30 +86,32 @@ public class ArbitratorProtocol extends DisputeProtocol { public void handleDepositRequest(DepositRequest request, NodeAddress sender) { System.out.println("ArbitratorProtocol.handleDepositRequest() " + trade.getId()); - synchronized (trade) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), request); - processModel.setTradeMessage(request); - expect(phase(Trade.Phase.INIT) - .with(request) - .from(sender)) - .setup(tasks( - ArbitratorProcessDepositRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - if (trade.getState() == Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS) { - stopTimeout(); - this.errorMessageHandler = null; - } - handleTaskRunnerSuccess(sender, request); - }, - errorMessage -> { - handleTaskRunnerFault(sender, request, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), request); + processModel.setTradeMessage(request); + expect(phase(Trade.Phase.INIT) + .with(request) + .from(sender)) + .setup(tasks( + ArbitratorProcessDepositRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + if (trade.getState() == Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS) { + stopTimeout(); + this.errorMessageHandler = null; + } + handleTaskRunnerSuccess(sender, request); + }, + errorMessage -> { + handleTaskRunnerFault(sender, request, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } @Override @@ -117,27 +121,29 @@ public class ArbitratorProtocol extends DisputeProtocol { public void handlePaymentAccountKeyRequest(PaymentAccountKeyRequest request, NodeAddress sender) { System.out.println("ArbitratorProtocol.handlePaymentAccountKeyRequest() " + trade.getId()); - synchronized (trade) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), request); - processModel.setTradeMessage(request); - expect(new Condition(trade) - .with(request) - .from(sender)) - .setup(tasks( - ArbitratorProcessPaymentAccountKeyRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - stopTimeout(); - handleTaskRunnerSuccess(sender, request); - }, - errorMessage -> { - handleTaskRunnerFault(sender, request, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), request); + processModel.setTradeMessage(request); + expect(new Condition(trade) + .with(request) + .from(sender)) + .setup(tasks( + ArbitratorProcessPaymentAccountKeyRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + stopTimeout(); + handleTaskRunnerSuccess(sender, request); + }, + errorMessage -> { + handleTaskRunnerFault(sender, request, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } protected void handle(PayoutTxPublishedMessage request, NodeAddress peer) { diff --git a/core/src/main/java/bisq/core/trade/protocol/BuyerAsMakerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/BuyerAsMakerProtocol.java index a868e0f278..8f0ca1e999 100644 --- a/core/src/main/java/bisq/core/trade/protocol/BuyerAsMakerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/BuyerAsMakerProtocol.java @@ -54,29 +54,31 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol NodeAddress peer, ErrorMessageHandler errorMessageHandler) { System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()"); - synchronized (trade) { - latchTrade(); - this.errorMessageHandler = errorMessageHandler; - expect(phase(Trade.Phase.INIT) - .with(message) - .from(peer)) - .setup(tasks( - ProcessInitTradeRequest.class, - //ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here - //VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee - MakerSendInitTradeRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(peer, message); - }, - errorMessage -> { - handleTaskRunnerFault(peer, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + this.errorMessageHandler = errorMessageHandler; + expect(phase(Trade.Phase.INIT) + .with(message) + .from(peer)) + .setup(tasks( + ProcessInitTradeRequest.class, + //ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here + //VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee + MakerSendInitTradeRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(peer, message); + }, + errorMessage -> { + handleTaskRunnerFault(peer, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } @Override diff --git a/core/src/main/java/bisq/core/trade/protocol/BuyerAsTakerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/BuyerAsTakerProtocol.java index c8a382bde1..fb19fba442 100644 --- a/core/src/main/java/bisq/core/trade/protocol/BuyerAsTakerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/BuyerAsTakerProtocol.java @@ -65,29 +65,31 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol public void onTakeOffer(TradeResultHandler tradeResultHandler, ErrorMessageHandler errorMessageHandler) { System.out.println(getClass().getCanonicalName() + ".onTakeOffer()"); - synchronized (trade) { - latchTrade(); - this.tradeResultHandler = tradeResultHandler; - this.errorMessageHandler = errorMessageHandler; - expect(phase(Trade.Phase.INIT) - .with(TakerEvent.TAKE_OFFER) - .from(trade.getTradingPeerNodeAddress())) - .setup(tasks( - ApplyFilter.class, - TakerReserveTradeFunds.class, - TakerSendInitTradeRequestToArbitrator.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - unlatchTrade(); - }, - errorMessage -> { - handleError(errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + this.tradeResultHandler = tradeResultHandler; + this.errorMessageHandler = errorMessageHandler; + expect(phase(Trade.Phase.INIT) + .with(TakerEvent.TAKE_OFFER) + .from(trade.getTradingPeerNodeAddress())) + .setup(tasks( + ApplyFilter.class, + TakerReserveTradeFunds.class, + TakerSendInitTradeRequestToArbitrator.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + unlatchTrade(); + }, + errorMessage -> { + handleError(errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } @Override diff --git a/core/src/main/java/bisq/core/trade/protocol/SellerAsMakerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/SellerAsMakerProtocol.java index 5c42b11f2d..3debcc3d96 100644 --- a/core/src/main/java/bisq/core/trade/protocol/SellerAsMakerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/SellerAsMakerProtocol.java @@ -55,29 +55,31 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc NodeAddress peer, ErrorMessageHandler errorMessageHandler) { System.out.println(getClass().getCanonicalName() + ".handleInitTradeRequest()"); - synchronized (trade) { - latchTrade(); - this.errorMessageHandler = errorMessageHandler; - expect(phase(Trade.Phase.INIT) - .with(message) - .from(peer)) - .setup(tasks( - ProcessInitTradeRequest.class, - //ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here - //VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee - MakerSendInitTradeRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(peer, message); - }, - errorMessage -> { - handleTaskRunnerFault(peer, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + this.errorMessageHandler = errorMessageHandler; + expect(phase(Trade.Phase.INIT) + .with(message) + .from(peer)) + .setup(tasks( + ProcessInitTradeRequest.class, + //ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here + //VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee + MakerSendInitTradeRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(peer, message); + }, + errorMessage -> { + handleTaskRunnerFault(peer, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } @Override diff --git a/core/src/main/java/bisq/core/trade/protocol/SellerAsTakerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/SellerAsTakerProtocol.java index 74eac8eb94..3b4f9a5e39 100644 --- a/core/src/main/java/bisq/core/trade/protocol/SellerAsTakerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/SellerAsTakerProtocol.java @@ -63,29 +63,31 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc public void onTakeOffer(TradeResultHandler tradeResultHandler, ErrorMessageHandler errorMessageHandler) { System.out.println(getClass().getCanonicalName() + ".onTakeOffer()"); - synchronized (trade) { - latchTrade(); - this.tradeResultHandler = tradeResultHandler; - this.errorMessageHandler = errorMessageHandler; - expect(phase(Trade.Phase.INIT) - .with(TakerEvent.TAKE_OFFER) - .from(trade.getTradingPeerNodeAddress())) - .setup(tasks( - ApplyFilter.class, - TakerReserveTradeFunds.class, - TakerSendInitTradeRequestToArbitrator.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - unlatchTrade(); - }, - errorMessage -> { - handleError(errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + this.tradeResultHandler = tradeResultHandler; + this.errorMessageHandler = errorMessageHandler; + expect(phase(Trade.Phase.INIT) + .with(TakerEvent.TAKE_OFFER) + .from(trade.getTradingPeerNodeAddress())) + .setup(tasks( + ApplyFilter.class, + TakerReserveTradeFunds.class, + TakerSendInitTradeRequestToArbitrator.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + unlatchTrade(); + }, + errorMessage -> { + handleError(errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } @Override diff --git a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java index 36e5cb9ae6..a7bb79c489 100644 --- a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java @@ -229,125 +229,133 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { System.out.println(getClass().getCanonicalName() + ".handleInitMultisigRequest()"); - synchronized (trade) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), request); - processModel.setTradeMessage(request); - expect(anyPhase(Trade.Phase.INIT) - .with(request) - .from(sender)) - .setup(tasks( - ProcessInitMultisigRequest.class, - MaybeSendSignContractRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, request); - }, - errorMessage -> { - handleTaskRunnerFault(sender, request, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), request); + processModel.setTradeMessage(request); + expect(anyPhase(Trade.Phase.INIT) + .with(request) + .from(sender)) + .setup(tasks( + ProcessInitMultisigRequest.class, + MaybeSendSignContractRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, request); + }, + errorMessage -> { + handleTaskRunnerFault(sender, request, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) { System.out.println(getClass().getCanonicalName() + ".handleSignContractRequest() " + trade.getId()); - synchronized (trade) { - Validator.checkTradeId(processModel.getOfferId(), message); - if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) { - latchTrade(); + new Thread(() -> { + synchronized (trade) { Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); - expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED) - .with(message) - .from(sender)) - .setup(tasks( - // TODO (woodser): validate request - ProcessSignContractRequest.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, message); - }, - errorMessage -> { - handleTaskRunnerFault(sender, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) // extend timeout - .executeTasks(true); - awaitTradeLatch(); - } else { - // process sign contract request after multisig created - EasyBind.subscribe(trade.stateProperty(), state -> { - if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock - }); + if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), message); + processModel.setTradeMessage(message); + expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED) + .with(message) + .from(sender)) + .setup(tasks( + // TODO (woodser): validate request + ProcessSignContractRequest.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, message); + }, + errorMessage -> { + handleTaskRunnerFault(sender, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) // extend timeout + .executeTasks(true); + awaitTradeLatch(); + } else { + // process sign contract request after multisig created + EasyBind.subscribe(trade.stateProperty(), state -> { + if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock + }); + } } - } + }).start(); } public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) { System.out.println(getClass().getCanonicalName() + ".handleSignContractResponse() " + trade.getId()); - synchronized (trade) { - Validator.checkTradeId(processModel.getOfferId(), message); - if (trade.getState() == Trade.State.CONTRACT_SIGNED) { - latchTrade(); + new Thread(() -> { + synchronized (trade) { Validator.checkTradeId(processModel.getOfferId(), message); - processModel.setTradeMessage(message); - expect(state(Trade.State.CONTRACT_SIGNED) - .with(message) - .from(sender)) - .setup(tasks( - // TODO (woodser): validate request - ProcessSignContractResponse.class, - MakerRemoveOpenOffer.class) - .using(new TradeTaskRunner(trade, - () -> { - startTimeout(TRADE_TIMEOUT); - handleTaskRunnerSuccess(sender, message); - }, - errorMessage -> { - handleTaskRunnerFault(sender, message, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) // extend timeout - .executeTasks(true); - awaitTradeLatch(); - } else { - // process sign contract response after contract signed - EasyBind.subscribe(trade.stateProperty(), state -> { - if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock - }); + if (trade.getState() == Trade.State.CONTRACT_SIGNED) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), message); + processModel.setTradeMessage(message); + expect(state(Trade.State.CONTRACT_SIGNED) + .with(message) + .from(sender)) + .setup(tasks( + // TODO (woodser): validate request + ProcessSignContractResponse.class, + MakerRemoveOpenOffer.class) + .using(new TradeTaskRunner(trade, + () -> { + startTimeout(TRADE_TIMEOUT); + handleTaskRunnerSuccess(sender, message); + }, + errorMessage -> { + handleTaskRunnerFault(sender, message, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) // extend timeout + .executeTasks(true); + awaitTradeLatch(); + } else { + // process sign contract response after contract signed + EasyBind.subscribe(trade.stateProperty(), state -> { + if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock + }); + } } - } + }).start(); } public void handleDepositResponse(DepositResponse response, NodeAddress sender) { System.out.println(getClass().getCanonicalName() + ".handleDepositResponse()"); - synchronized (trade) { - latchTrade(); - Validator.checkTradeId(processModel.getOfferId(), response); - processModel.setTradeMessage(response); - expect(state(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) - .with(response) - .from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress() - .setup(tasks( - // TODO (woodser): validate request - ProcessDepositResponse.class) - .using(new TradeTaskRunner(trade, - () -> { - stopTimeout(); - this.errorMessageHandler = null; - handleTaskRunnerSuccess(sender, response); - if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized - }, - errorMessage -> { - handleTaskRunnerFault(sender, response, errorMessage); - })) - .withTimeout(TRADE_TIMEOUT)) - .executeTasks(true); - awaitTradeLatch(); - } + new Thread(() -> { + synchronized (trade) { + latchTrade(); + Validator.checkTradeId(processModel.getOfferId(), response); + processModel.setTradeMessage(response); + expect(state(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST) + .with(response) + .from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress() + .setup(tasks( + // TODO (woodser): validate request + ProcessDepositResponse.class) + .using(new TradeTaskRunner(trade, + () -> { + stopTimeout(); + this.errorMessageHandler = null; + handleTaskRunnerSuccess(sender, response); + if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized + }, + errorMessage -> { + handleTaskRunnerFault(sender, response, errorMessage); + })) + .withTimeout(TRADE_TIMEOUT)) + .executeTasks(true); + awaitTradeLatch(); + } + }).start(); } // TODO (woodser): update to use fluent for consistency diff --git a/desktop/src/main/java/bisq/desktop/main/market/offerbook/OfferBookChartViewModel.java b/desktop/src/main/java/bisq/desktop/main/market/offerbook/OfferBookChartViewModel.java index dd3bd76773..9307c099f2 100644 --- a/desktop/src/main/java/bisq/desktop/main/market/offerbook/OfferBookChartViewModel.java +++ b/desktop/src/main/java/bisq/desktop/main/market/offerbook/OfferBookChartViewModel.java @@ -42,6 +42,7 @@ import bisq.core.offer.Offer; import bisq.core.offer.OfferDirection; import bisq.core.offer.OpenOfferManager; import bisq.core.provider.price.PriceFeedService; +import bisq.core.trade.TradeUtils; import bisq.core.user.Preferences; import bisq.core.util.VolumeUtil; @@ -66,6 +67,7 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; class OfferBookChartViewModel extends ActivatableViewModel { @@ -125,19 +127,17 @@ class OfferBookChartViewModel extends ActivatableViewModel { offerBookListItems = offerBook.getOfferBookListItems(); offerBookListItemsListener = c -> { - UserThread.execute(() -> { - c.next(); - if (c.wasAdded() || c.wasRemoved()) { - ArrayList list = new ArrayList<>(c.getRemoved()); - list.addAll(c.getAddedSubList()); - if (list.stream() - .map(OfferBookListItem::getOffer) - .anyMatch(e -> e.getCurrencyCode().equals(selectedTradeCurrencyProperty.get().getCode()))) - updateChartData(); - } + c.next(); + if (c.wasAdded() || c.wasRemoved()) { + ArrayList list = new ArrayList<>(c.getRemoved()); + list.addAll(c.getAddedSubList()); + if (list.stream() + .map(OfferBookListItem::getOffer) + .anyMatch(e -> e.getCurrencyCode().equals(selectedTradeCurrencyProperty.get().getCode()))) + updateChartData(); + } - fillTradeCurrencies(); - }); + fillTradeCurrencies(); }; currenciesUpdatedListener = (observable, oldValue, newValue) -> { diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index ca16a6b0e4..6b4f8bab4d 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -432,18 +432,12 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { // Only receive non - CloseConnectionMessage network_messages @Override public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { - Connection that = this; - connectionThreadPool.submit(new Runnable() { - @Override - public void run() { - checkArgument(connection.equals(that)); - if (networkEnvelope instanceof BundleOfEnvelopes) { - onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection); - } else { - messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)); - } - } - }); + checkArgument(connection.equals(this)); + if (networkEnvelope instanceof BundleOfEnvelopes) { + onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection); + } else { + UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection))); + } } private void onBundleOfEnvelopes(BundleOfEnvelopes bundleOfEnvelopes, Connection connection) { @@ -475,8 +469,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { envelopesToProcess.add(networkEnvelope); } } - envelopesToProcess.forEach(envelope -> - messageListeners.forEach(listener -> listener.onMessage(envelope, connection))); + envelopesToProcess.forEach(envelope -> UserThread.execute(() -> + messageListeners.forEach(listener -> listener.onMessage(envelope, connection)))); }