improve error handling in protocol pipelines

support getTrades() from grpc api
consistently use timeouts in protocol pipelines
remove trade and repost offer on protocol errors
delete multisig wallet when trade removed
protocol advances on ack messages instead of network onArrived()
re-order protocol class methods to correct order
This commit is contained in:
woodser 2021-12-14 12:43:45 -05:00
parent d2ddfad5bc
commit 7c9c35b1b8
21 changed files with 1096 additions and 930 deletions

View File

@ -39,6 +39,7 @@ import bisq.proto.grpc.GetPaymentAccountFormRequest;
import bisq.proto.grpc.GetPaymentAccountsRequest;
import bisq.proto.grpc.GetPaymentMethodsRequest;
import bisq.proto.grpc.GetTradeRequest;
import bisq.proto.grpc.GetTradesRequest;
import bisq.proto.grpc.GetTransactionRequest;
import bisq.proto.grpc.GetTxFeeRateRequest;
import bisq.proto.grpc.GetVersionRequest;
@ -349,6 +350,11 @@ public final class GrpcClient {
return grpcStubs.tradesService.getTrade(request).getTrade();
}
public List<TradeInfo> getTrades() {
var request = GetTradesRequest.newBuilder().build();
return grpcStubs.tradesService.getTrades(request).getTradesList();
}
public void confirmPaymentStarted(String tradeId) {
var request = ConfirmPaymentStartedRequest.newBuilder()
.setTradeId(tradeId)

View File

@ -244,11 +244,15 @@ public class CoreApi {
String paymentAccountId,
Consumer<Trade> resultHandler,
ErrorMessageHandler errorMessageHandler) {
Offer offer = coreOffersService.getOffer(offerId);
coreTradesService.takeOffer(offer,
paymentAccountId,
resultHandler,
errorMessageHandler);
try {
Offer offer = coreOffersService.getOffer(offerId);
coreTradesService.takeOffer(offer,
paymentAccountId,
resultHandler,
errorMessageHandler);
} catch (Exception e) {
errorMessageHandler.handleErrorMessage(e.getMessage());
}
}
public void confirmPaymentStarted(String tradeId) {
@ -271,6 +275,10 @@ public class CoreApi {
return coreTradesService.getTrade(tradeId);
}
public List<Trade> getTrades() {
return coreTradesService.getTrades();
}
public String getTradeRole(String tradeId) {
return coreTradesService.getTradeRole(tradeId);
}

View File

@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -142,20 +143,31 @@ class CoreOffersService {
}
List<Offer> getMyOffers(String direction, String currencyCode) {
// get my offers posted to books
List<Offer> offers = offerBookService.getOffers().stream()
.filter(o -> o.isMyOffer(keyRing))
.filter(o -> offerMatchesDirectionAndCurrency(o, direction, currencyCode))
.sorted(priceComparator(direction))
.collect(Collectors.toList());
// remove unreserved offers
Set<Offer> unreservedOffers = getUnreservedOffers(offers);
offers.removeAll(unreservedOffers);
// remove my unreserved offers from offer manager
List<OpenOffer> unreservedOpenOffers = new ArrayList<OpenOffer>();
for (Offer unreservedOffer : unreservedOffers) {
unreservedOpenOffers.add(openOfferManager.getOpenOfferById(unreservedOffer.getId()).get());
}
openOfferManager.removeOpenOffers(unreservedOpenOffers, null);
// set offer state
for (Offer offer : offers) {
Optional<OpenOffer> openOffer = openOfferManager.getOpenOfferById(offer.getId());
if (openOffer.isPresent()) offer.setState(openOffer.get().getState() == OpenOffer.State.AVAILABLE ? Offer.State.AVAILABLE : Offer.State.NOT_AVAILABLE);
}
return offers;
}

View File

@ -38,7 +38,8 @@ import org.bitcoinj.core.Coin;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
@ -89,31 +90,35 @@ class CoreTradesService {
String paymentAccountId,
Consumer<Trade> resultHandler,
ErrorMessageHandler errorMessageHandler) {
coreWalletsService.verifyWalletsAreAvailable();
coreWalletsService.verifyEncryptedWalletIsUnlocked();
try {
coreWalletsService.verifyWalletsAreAvailable();
coreWalletsService.verifyEncryptedWalletIsUnlocked();
var paymentAccount = user.getPaymentAccount(paymentAccountId);
if (paymentAccount == null)
throw new IllegalArgumentException(format("payment account with id '%s' not found", paymentAccountId));
var paymentAccount = user.getPaymentAccount(paymentAccountId);
if (paymentAccount == null)
throw new IllegalArgumentException(format("payment account with id '%s' not found", paymentAccountId));
var useSavingsWallet = true;
//noinspection ConstantConditions
takeOfferModel.initModel(offer, paymentAccount, useSavingsWallet);
log.info("Initiating take {} offer, {}",
offer.isBuyOffer() ? "buy" : "sell",
takeOfferModel);
//noinspection ConstantConditions
tradeManager.onTakeOffer(offer.getAmount(),
takeOfferModel.getTxFeeFromFeeService(),
takeOfferModel.getTakerFee(),
takeOfferModel.getFundsNeededForTrade(),
offer,
paymentAccountId,
useSavingsWallet,
coreContext.isApiUser(),
resultHandler::accept,
errorMessageHandler
);
var useSavingsWallet = true;
//noinspection ConstantConditions
takeOfferModel.initModel(offer, paymentAccount, useSavingsWallet);
log.info("Initiating take {} offer, {}",
offer.isBuyOffer() ? "buy" : "sell",
takeOfferModel);
//noinspection ConstantConditions
tradeManager.onTakeOffer(offer.getAmount(),
takeOfferModel.getTxFeeFromFeeService(),
takeOfferModel.getTakerFee(),
takeOfferModel.getFundsNeededForTrade(),
offer,
paymentAccountId,
useSavingsWallet,
coreContext.isApiUser(),
resultHandler::accept,
errorMessageHandler
);
} catch (Exception e) {
errorMessageHandler.handleErrorMessage(e.getMessage());
}
}
void confirmPaymentStarted(String tradeId) {
@ -224,6 +229,14 @@ class CoreTradesService {
return tradable.filter((t) -> t instanceof Trade).map(value -> (Trade) value);
}
List<Trade> getTrades() {
coreWalletsService.verifyWalletsAreAvailable();
coreWalletsService.verifyEncryptedWalletIsUnlocked();
List<Trade> trades = new ArrayList<Trade>(tradeManager.getTrades());
trades.addAll(closedTradableManager.getClosedTrades());
return trades;
}
private boolean isFollowingBuyerProtocol(Trade trade) {
return tradeManager.getTradeProtocol(trade) instanceof BuyerProtocol;
}

View File

@ -90,12 +90,14 @@ public class MoneroWalletRpcManager {
* Stop an instance of monero-wallet-rpc.
*
* @param walletRpc the client connected to the monero-wallet-rpc instance to stop
* @param save specifies if the wallet should be saved before closing
*/
public void stopInstance(MoneroWalletRpc walletRpc) {
public void stopInstance(MoneroWalletRpc walletRpc, boolean save) {
boolean found = false;
for (Map.Entry<Integer, MoneroWalletRpc> entry : registeredPorts.entrySet()) {
if (walletRpc == entry.getValue()) {
walletRpc.stop();
walletRpc.close(save);
walletRpc.stopProcess();
found = true;
try { unregisterPort(entry.getKey()); }
catch (Exception e) { throw new MoneroError(e); }

View File

@ -88,8 +88,6 @@ import static bisq.common.util.Preconditions.checkDir;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import monero.common.MoneroUtils;
import monero.daemon.MoneroDaemon;
import monero.daemon.MoneroDaemonRpc;
@ -132,7 +130,8 @@ public class WalletConfig extends AbstractIdleService {
private static final String MONERO_DAEMON_USERNAME = "superuser";
private static final String MONERO_DAEMON_PASSWORD = "abctesting123";
private static final MoneroWalletRpcManager MONERO_WALLET_RPC_MANAGER = new MoneroWalletRpcManager();
private static final String MONERO_WALLET_RPC_PATH = System.getProperty("user.dir") + File.separator + ".localnet" + File.separator + "monero-wallet-rpc"; // use wallet rpc in .localnet
private static final String MONERO_WALLET_RPC_DIR = System.getProperty("user.dir") + File.separator + ".localnet"; // .localnet contains monero-wallet-rpc and wallet files
private static final String MONERO_WALLET_RPC_PATH = MONERO_WALLET_RPC_DIR + File.separator + "monero-wallet-rpc";
private static final String MONERO_WALLET_RPC_USERNAME = "rpc_user";
private static final String MONERO_WALLET_RPC_PASSWORD = "abc123";
private static final long MONERO_WALLET_SYNC_RATE = 5000l;
@ -292,6 +291,11 @@ public class WalletConfig extends AbstractIdleService {
// Meant to be overridden by subclasses
}
public boolean walletExists(String walletName) {
String path = directory.toString() + File.separator + walletName;
return new File(path + ".keys").exists();
}
public MoneroWalletRpc createWallet(MoneroWalletConfig config, Integer port) {
// start monero-wallet-rpc instance
@ -304,7 +308,7 @@ public class WalletConfig extends AbstractIdleService {
return walletRpc;
} catch (Exception e) {
e.printStackTrace();
WalletConfig.MONERO_WALLET_RPC_MANAGER.stopInstance(walletRpc);
WalletConfig.MONERO_WALLET_RPC_MANAGER.stopInstance(walletRpc, false);
throw e;
}
}
@ -321,7 +325,7 @@ public class WalletConfig extends AbstractIdleService {
return walletRpc;
} catch (Exception e) {
e.printStackTrace();
WalletConfig.MONERO_WALLET_RPC_MANAGER.stopInstance(walletRpc);
WalletConfig.MONERO_WALLET_RPC_MANAGER.stopInstance(walletRpc, false);
throw e;
}
}
@ -347,8 +351,17 @@ public class WalletConfig extends AbstractIdleService {
return WalletConfig.MONERO_WALLET_RPC_MANAGER.startInstance(cmd);
}
public void closeWallet(MoneroWallet walletRpc) {
WalletConfig.MONERO_WALLET_RPC_MANAGER.stopInstance((MoneroWalletRpc) walletRpc);
public void closeWallet(MoneroWallet walletRpc, boolean save) {
WalletConfig.MONERO_WALLET_RPC_MANAGER.stopInstance((MoneroWalletRpc) walletRpc, save);
}
public void deleteWallet(String walletName) {
if (!walletExists(walletName)) throw new Error("Wallet does not exist at path: " + walletName);
String path = directory.toString() + File.separator + walletName;
if (!new File(path).delete()) throw new RuntimeException("Failed to delete wallet file: " + path);
if (!new File(path + ".keys").delete()) throw new RuntimeException("Failed to delete wallet file: " + path);
if (!new File(path + ".address.txt").delete()) throw new RuntimeException("Failed to delete wallet file: " + path);
//WalletsSetup.deleteRollingBackup(walletName); // TODO (woodser): necessary to delete rolling backup?
}
@Override

View File

@ -86,7 +86,7 @@ public class XmrWalletService {
// TODO (woodser): wallet has single password which is passed here?
// TODO (woodser): test retaking failed trade. create new multisig wallet or replace? cannot reuse
public MoneroWallet createMultisigWallet(String tradeId) {
public synchronized MoneroWallet createMultisigWallet(String tradeId) {
if (multisigWallets.containsKey(tradeId)) return multisigWallets.get(tradeId);
String path = "xmr_multisig_trade_" + tradeId;
MoneroWallet multisigWallet = null;
@ -99,7 +99,7 @@ public class XmrWalletService {
return multisigWallet;
}
public MoneroWallet getMultisigWallet(String tradeId) {
public synchronized MoneroWallet getMultisigWallet(String tradeId) {
if (multisigWallets.containsKey(tradeId)) return multisigWallets.get(tradeId);
String path = "xmr_multisig_trade_" + tradeId;
MoneroWallet multisigWallet = null;
@ -111,6 +111,19 @@ public class XmrWalletService {
multisigWallet.startSyncing(5000l); // TODO (woodser): use sync period from config. apps stall if too many multisig wallets and too short sync period
return multisigWallet;
}
public synchronized boolean deleteMultisigWallet(String tradeId) {
String walletName = "xmr_multisig_trade_" + tradeId;
if (!walletsSetup.getWalletConfig().walletExists(walletName)) return false;
try {
walletsSetup.getWalletConfig().closeWallet(getMultisigWallet(tradeId), false);
} catch (Exception err) {
// multisig wallet may not be open
}
walletsSetup.getWalletConfig().deleteWallet(walletName);
multisigWallets.remove(tradeId);
return true;
}
public XmrAddressEntry recoverAddressEntry(String offerId, String address, XmrAddressEntry.Context context) {
var available = findAddressEntry(address, XmrAddressEntry.Context.AVAILABLE);
@ -291,7 +304,7 @@ public class XmrWalletService {
threads.add(new Thread(new Runnable() {
@Override
public void run() {
try { walletsSetup.getWalletConfig().closeWallet(openWallet); }
try { walletsSetup.getWalletConfig().closeWallet(openWallet, true); }
catch (Exception e) {
log.warn("Error closing monero-wallet-rpc subprocess. Was Haveno stopped manually with ctrl+c?");
}

View File

@ -391,11 +391,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
offer.getAmount(),
buyerSecurityDeposit,
createOfferService.getSellerSecurityDepositAsDouble(buyerSecurityDeposit));
if (placeOfferProtocols.containsKey(offer.getId())) {
log.warn("We already have a place offer protocol for offer " + offer.getId() + ", ignoring");
throw new RuntimeException("We already have a place offer protocol for offer " + offer.getId() + ", ignoring");
}
PlaceOfferModel model = new PlaceOfferModel(offer,
reservedFundsForOffer,
@ -596,6 +591,10 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
requestPersistence();
}
public void unreserveOpenOffer(OpenOffer openOffer) {
openOffer.setState(OpenOffer.State.AVAILABLE);
requestPersistence();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters

View File

@ -64,7 +64,7 @@ import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.TorNetworkNode;
import com.google.common.collect.ImmutableList;
import bisq.common.ClockWatcher;
import bisq.common.config.Config;
import bisq.common.crypto.KeyRing;
@ -445,8 +445,8 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
((ArbitratorProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null)
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage); // TODO (woodser): separate handler? // TODO (woodser): ensure failed trade removed
log.warn("Arbitrator error during trade initialization: " + errorMessage);
removeTrade(trade);
});
requestPersistence();
@ -473,7 +473,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
return;
}
openOfferManager.reserveOpenOffer(openOffer); // TODO (woodser): reserve offer if arbitrator?
openOfferManager.reserveOpenOffer(openOffer); // TODO (woodser): reserve offer if arbitrator? probably. or, arbitrator does not have open offer?
Trade trade;
if (offer.isBuyOffer())
@ -511,10 +511,11 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
trade.getSelf().setReserveTxKeyImages(offer.getOfferPayload().getReserveTxKeyImages());
tradableList.add(trade);
((MakerProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null) {
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
}
((MakerProtocol) getTradeProtocol(trade)).handleInitTradeRequest(request, sender, errorMessage -> {
log.warn("Maker error during trade initialization: " + errorMessage);
openOfferManager.unreserveOpenOffer(openOffer); // offer remains available // TODO: only unreserve if funds not deposited to multisig
removeTrade(trade);
if (takeOfferRequestErrorMessageHandler != null) takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
});
requestPersistence();
@ -532,13 +533,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
Optional<Trade> tradeOptional = getTradeById(request.getTradeId());
if (!tradeOptional.isPresent()) throw new RuntimeException("No trade with id " + request.getTradeId()); // TODO (woodser): error handling
if (!tradeOptional.isPresent()) {
log.warn("No trade with id " + request.getTradeId());
return;
}
Trade trade = tradeOptional.get();
getTradeProtocol(trade).handleInitMultisigRequest(request, peer, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null) {
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
}
});
getTradeProtocol(trade).handleInitMultisigRequest(request, peer);
}
private void handleSignContractRequest(SignContractRequest request, NodeAddress peer) {
@ -552,13 +552,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
Optional<Trade> tradeOptional = getTradeById(request.getTradeId());
if (!tradeOptional.isPresent()) throw new RuntimeException("No trade with id " + request.getTradeId()); // TODO (woodser): error handling
if (!tradeOptional.isPresent()) {
log.warn("No trade with id " + request.getTradeId());
return;
}
Trade trade = tradeOptional.get();
getTradeProtocol(trade).handleSignContractRequest(request, peer, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null) {
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
}
});
getTradeProtocol(trade).handleSignContractRequest(request, peer);
}
private void handleSignContractResponse(SignContractResponse request, NodeAddress peer) {
@ -572,13 +571,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
Optional<Trade> tradeOptional = getTradeById(request.getTradeId());
if (!tradeOptional.isPresent()) throw new RuntimeException("No trade with id " + request.getTradeId()); // TODO (woodser): error handling
if (!tradeOptional.isPresent()) {
log.warn("No trade with id " + request.getTradeId());
return;
}
Trade trade = tradeOptional.get();
((TraderProtocol) getTradeProtocol(trade)).handleSignContractResponse(request, peer, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null) {
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
}
});
((TraderProtocol) getTradeProtocol(trade)).handleSignContractResponse(request, peer);
}
private void handleDepositRequest(DepositRequest request, NodeAddress peer) {
@ -592,13 +590,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
Optional<Trade> tradeOptional = getTradeById(request.getTradeId());
if (!tradeOptional.isPresent()) throw new RuntimeException("No trade with id " + request.getTradeId()); // TODO (woodser): error handling
if (!tradeOptional.isPresent()) {
log.warn("No trade with id " + request.getTradeId());
return;
}
Trade trade = tradeOptional.get();
((ArbitratorProtocol) getTradeProtocol(trade)).handleDepositRequest(request, peer, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null) {
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
}
});
((ArbitratorProtocol) getTradeProtocol(trade)).handleDepositRequest(request, peer);
}
private void handleDepositResponse(DepositResponse response, NodeAddress peer) {
@ -612,13 +609,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
Optional<Trade> tradeOptional = getTradeById(response.getTradeId());
if (!tradeOptional.isPresent()) throw new RuntimeException("No trade with id " + response.getTradeId()); // TODO (woodser): error handling
if (!tradeOptional.isPresent()) {
log.warn("No trade with id " + response.getTradeId());
return;
}
Trade trade = tradeOptional.get();
((TraderProtocol) getTradeProtocol(trade)).handleDepositResponse(response, peer, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null) {
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
}
});
((TraderProtocol) getTradeProtocol(trade)).handleDepositResponse(response, peer);
}
private void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress peer) {
@ -632,13 +628,12 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
}
Optional<Trade> tradeOptional = getTradeById(request.getTradeId());
if (!tradeOptional.isPresent()) throw new RuntimeException("No trade with id " + request.getTradeId()); // TODO (woodser): error handling
if (!tradeOptional.isPresent()) {
log.warn("No trade with id " + request.getTradeId());
return;
}
Trade trade = tradeOptional.get();
((TraderProtocol) getTradeProtocol(trade)).handlePaymentAccountPayloadRequest(request, peer, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null) {
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
}
});
((TraderProtocol) getTradeProtocol(trade)).handlePaymentAccountPayloadRequest(request, peer);
}
private void handleUpdateMultisigRequest(UpdateMultisigRequest request, NodeAddress peer) {
@ -655,8 +650,9 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
if (!tradeOptional.isPresent()) throw new RuntimeException("No trade with id " + request.getTradeId()); // TODO (woodser): error handling
Trade trade = tradeOptional.get();
getTradeProtocol(trade).handleUpdateMultisigRequest(request, peer, errorMessage -> {
if (takeOfferRequestErrorMessageHandler != null)
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
log.warn("Error handling UpdateMultisigRequest: " + errorMessage);
if (takeOfferRequestErrorMessageHandler != null)
takeOfferRequestErrorMessageHandler.handleErrorMessage(errorMessage);
});
}
@ -735,7 +731,9 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
// take offer and persist trade on success
((TakerProtocol) tradeProtocol).onTakeOffer(result -> {
tradeResultHandler.handleResult(trade);
requestPersistence();
}, errorMessage -> {
log.warn("Taker error during trade initialization: " + errorMessage);
removeTrade(trade);
errorMessageHandler.handleErrorMessage(errorMessage);
});
@ -985,8 +983,25 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
return tradableList.stream().filter(e -> e.getId().equals(tradeId)).findFirst();
}
public List<Trade> getTrades() {
return ImmutableList.copyOf(getObservableList().stream()
.filter(e -> e instanceof Trade)
.map(e -> e)
.collect(Collectors.toList()));
}
private void removeTrade(Trade trade) {
if (tradableList.remove(trade)) {
// unreserve taker trade key images
if (trade instanceof TakerTrade && trade.getSelf().getReserveTxKeyImages() != null) {
for (String keyImage : trade.getSelf().getReserveTxKeyImages()) {
xmrWalletService.getWallet().thawOutput(keyImage);
}
}
p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));
xmrWalletService.deleteMultisigWallet(trade.getId());
requestPersistence();
}
}

View File

@ -31,22 +31,34 @@ public class ArbitratorProtocol extends DisputeProtocol {
// Incoming messages
///////////////////////////////////////////////////////////////////////////////////////////
public void handleInitTradeRequest(InitTradeRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { // TODO (woodser): update impl to use errorMessageHandler
public void handleInitTradeRequest(InitTradeRequest message,
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
this.errorMessageHandler = errorMessageHandler;
processModel.setTradeMessage(message); // TODO (woodser): confirm these are null without being set
//processModel.setTempTradingPeerNodeAddress(peer);
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
ApplyFilter.class,
ProcessInitTradeRequest.class,
ArbitratorProcessesReserveTx.class,
ArbitratorSendsInitTradeAndMultisigRequests.class))
ApplyFilter.class,
ProcessInitTradeRequest.class,
ArbitratorProcessesReserveTx.class,
ArbitratorSendsInitTradeAndMultisigRequests.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
@ -54,20 +66,21 @@ public class ArbitratorProtocol extends DisputeProtocol {
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class)
.using(new TradeTaskRunner(trade,
ProcessInitMultisigRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, request);
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); // TODO (woodser): synchronize access since concurrent requests processed
@ -77,18 +90,19 @@ public class ArbitratorProtocol extends DisputeProtocol {
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
})))
}))
.withTimeout(30))
.executeTasks();
}
public void handleDepositRequest(DepositRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
public void handleDepositRequest(DepositRequest request, NodeAddress sender) {
System.out.println("ArbitratorProtocol.handleDepositRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
@ -96,15 +110,16 @@ public class ArbitratorProtocol extends DisputeProtocol {
.with(request)
.from(sender))
.setup(tasks(
ProcessDepositRequest.class)
.using(new TradeTaskRunner(trade,
ProcessDepositRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, request);
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
}))
.withTimeout(30))
.executeTasks();
}

View File

@ -64,11 +64,179 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
super(trade);
}
///////////////////////////////////////////////////////////////////////////////////////////
// MakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): these methods are duplicated with SellerAsMakerProtocol due to single inheritance
@Override
public void handleInitTradeRequest(InitTradeRequest message,
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
this.errorMessageHandler = errorMessageHandler;
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
ProcessInitTradeRequest.class,
//ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here
//VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee
MakerSendsInitTradeRequestIfUnreserved.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request); // TODO (woodser): synchronize access since concurrent requests processed
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class,
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handleSignContractResponse()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); // TODO (woodser): synchronize access since concurrent requests processed
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handleDepositResponse()");
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(response)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handlePaymentAccountPayloadRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessPaymentAccountPayloadRequest.class,
MakerRemovesOpenOffer.class)
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Handle take offer request
// User interaction
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which events we expect
@Override
public void onPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
super.onPaymentStarted(resultHandler, errorMessageHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message Payout tx
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which messages we expect
@Override
protected void handle(PayoutTxPublishedMessage message, NodeAddress peer) {
super.handle(message, peer);
}
@Override
protected Class<? extends TradeTask> getVerifyPeersFeePaymentClass() {
return MakerVerifyTakerFeePayment.class;
}
// TODO (woodser): remove or ignore any unsupported requests
///////////////////////////////////////////////////////////////////////////////////////////
@ -96,175 +264,4 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
protected void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) {
super.handle(message, peer);
}
///////////////////////////////////////////////////////////////////////////////////////////
// User interaction
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which events we expect
@Override
public void onPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
super.onPaymentStarted(resultHandler, errorMessageHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message Payout tx
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which messages we expect
@Override
protected void handle(PayoutTxPublishedMessage message, NodeAddress peer) {
super.handle(message, peer);
}
@Override
protected Class<? extends TradeTask> getVerifyPeersFeePaymentClass() {
return MakerVerifyTakerFeePayment.class;
}
///////////////////////////////////////////////////////////////////////////////////////////
// MakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): these methods are duplicated with SellerAsMakerProtocol due to single inheritance
@Override
public void handleInitTradeRequest(InitTradeRequest message,
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
ProcessInitTradeRequest.class,
//ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here
//VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee
MakerSendsInitTradeRequestIfUnreserved.class,
MakerRemovesOpenOffer.class).
using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request); // TODO (woodser): synchronize access since concurrent requests processed
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class,
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
.executeTasks();
}
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
})))
.executeTasks();
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handleSignContractResponse()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); // TODO (woodser): synchronize access since concurrent requests processed
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
})))
.executeTasks();
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handleDepositResponse()");
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(response)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
})))
.executeTasks();
}
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handlePaymentAccountPayloadRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessPaymentAccountPayloadRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
.executeTasks();
}
}

View File

@ -68,8 +68,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol {
private TradeResultHandler tradeResultHandler;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@ -85,7 +83,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// Take offer
// User interaction: Take offer
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): this implementation is duplicated with SellerAsTakerProtocol
@ -93,21 +91,183 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
public void onTakeOffer(TradeResultHandler tradeResultHandler, ErrorMessageHandler errorMessageHandler) {
System.out.println("onTakeOffer()");
this.tradeResultHandler = tradeResultHandler;
this.errorMessageHandler = errorMessageHandler;
expect(phase(Trade.Phase.INIT)
.with(TakerEvent.TAKE_OFFER)
.from(trade.getTradingPeerNodeAddress()))
.setup(tasks(
ApplyFilter.class,
TakerReservesTradeFunds.class,
TakerSendsInitTradeRequestToArbitrator.class) // TODO (woodser): app hangs if this pipeline fails. use .using() like below
.with(TakerEvent.TAKE_OFFER)
.from(trade.getTradingPeerNodeAddress()))
.setup(tasks(
ApplyFilter.class,
TakerReservesTradeFunds.class,
TakerSendsInitTradeRequestToArbitrator.class) // TODO (woodser): app hangs if this pipeline fails. use .using() like below
.using(new TradeTaskRunner(trade,
() -> { },
errorMessageHandler))
.withTimeout(30))
.executeTasks();
.withTimeout(30))
.executeTasks();
}
///////////////////////////////////////////////////////////////////////////////////////////
// TakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): these methods are duplicated with SellerAsTakerProtocol due to single inheritance
@Override
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println("BuyerAsTakerProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class,
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println("SellerAsTakerProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println("SellerAsTakerProtocol.handleSignContractResponse()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println("SellerAsTakerProtocol.handleDepositResponse()");
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(response)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender) {
System.out.println("SellerAsTakerProtocol.handlePaymentAccountPayloadRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessPaymentAccountPayloadRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(sender, request);
tradeResultHandler.handleResult(trade); // trade is initialized
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
///////////////////////////////////////////////////////////////////////////////////////////
// User interaction
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which events we expect
@Override
public void onPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
super.onPaymentStarted(resultHandler, errorMessageHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message Payout tx
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which messages we expect
@Override
protected void handle(PayoutTxPublishedMessage message, NodeAddress peer) {
super.handle(message, peer);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Message dispatcher
///////////////////////////////////////////////////////////////////////////////////////////
@Override
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
super.onTradeMessage(message, peer);
if (message instanceof InputsForDepositTxResponse) {
handle((InputsForDepositTxResponse) message, peer);
}
}
@Override
protected Class<? extends TradeTask> getVerifyPeersFeePaymentClass() {
return TakerVerifyMakerFeePayment.class;
}
// TODO (woodser): remove unused
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming messages Take offer process
@ -149,167 +309,4 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
protected void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) {
super.handle(message, peer);
}
///////////////////////////////////////////////////////////////////////////////////////////
// User interaction
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which events we expect
@Override
public void onPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
super.onPaymentStarted(resultHandler, errorMessageHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message Payout tx
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which messages we expect
@Override
protected void handle(PayoutTxPublishedMessage message, NodeAddress peer) {
super.handle(message, peer);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Message dispatcher
///////////////////////////////////////////////////////////////////////////////////////////
@Override
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
super.onTradeMessage(message, peer);
if (message instanceof InputsForDepositTxResponse) {
handle((InputsForDepositTxResponse) message, peer);
}
}
@Override
protected Class<? extends TradeTask> getVerifyPeersFeePaymentClass() {
return TakerVerifyMakerFeePayment.class;
}
///////////////////////////////////////////////////////////////////////////////////////////
// TakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): these methods are duplicated with SellerAsTakerProtocol due to single inheritance
@Override
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsTakerProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class,
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
System.out.println("handle multisig pipeline completed successfully!");
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
System.out.println("error in handle multisig pipeline!!!: " + errorMessage);
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("SellerAsTakerProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("SellerAsTakerProtocol.handleSignContractResponse()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
})))
.executeTasks();
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("SellerAsTakerProtocol.handleDepositResponse()");
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(response)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
})))
.executeTasks();
}
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("SellerAsTakerProtocol.handlePaymentAccountPayloadRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessPaymentAccountPayloadRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(sender, request);
tradeResultHandler.handleResult(trade); // trade is initialized
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
.executeTasks();
}
}

View File

@ -64,37 +64,153 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
super(trade);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming messages Take offer process
// MakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): these methods are duplicated with BuyerAsMakerProtocol due to single inheritance
protected void handle(DepositTxMessage message, NodeAddress peer) {
expect(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
@Override
public void handleInitTradeRequest(InitTradeRequest message,
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
this.errorMessageHandler = errorMessageHandler;
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
MakerRemovesOpenOffer.class,
SellerAsMakerProcessDepositTxMessage.class,
SellerAsMakerFinalizesDepositTx.class,
SellerCreatesDelayedPayoutTx.class,
SellerSignsDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class)
.withTimeout(60))
ProcessInitTradeRequest.class,
//ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here
//VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee
MakerSendsInitTradeRequestIfUnreserved.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message when buyer has clicked payment started button
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which messages we expect
@Override
protected void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) {
super.handle(message, peer);
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request); // TODO (woodser): synchronize access since concurrent requests processed
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class,
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handleSignContractResponse()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); // TODO (woodser): synchronize access since concurrent requests processed
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handleDepositResponse()");
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(response)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender) {
System.out.println("BuyerAsMakerProtocol.handlePaymentAccountPayloadRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessPaymentAccountPayloadRequest.class,
MakerRemovesOpenOffer.class)
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
///////////////////////////////////////////////////////////////////////////////////////////
// User interaction
@ -106,7 +222,6 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
super.onPaymentReceived(resultHandler, errorMessageHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Massage dispatcher
///////////////////////////////////////////////////////////////////////////////////////////
@ -128,145 +243,35 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
return MakerVerifyTakerFeePayment.class;
}
///////////////////////////////////////////////////////////////////////////////////////////
// MakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): these methods are duplicated with BuyerAsMakerProtocol due to single inheritance
// TODO (woodser): remove unused
@Override
public void handleInitTradeRequest(InitTradeRequest message,
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
ProcessInitTradeRequest.class,
//ApplyFilter.class, // TODO (woodser): these checks apply when maker signs availability request, but not here
//VerifyPeersAccountAgeWitness.class, // TODO (woodser): these checks apply after in multisig, means if rejected need to reimburse other's fee
MakerSendsInitTradeRequestIfUnreserved.class,
MakerRemovesOpenOffer.class).
using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(peer, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request); // TODO (woodser): synchronize access since concurrent requests processed
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class,
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
.executeTasks();
}
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
})))
.executeTasks();
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming messages Take offer process
///////////////////////////////////////////////////////////////////////////////////////////
protected void handle(DepositTxMessage message, NodeAddress peer) {
expect(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.with(message)
.from(peer))
.setup(tasks(
MakerRemovesOpenOffer.class,
SellerAsMakerProcessDepositTxMessage.class,
SellerAsMakerFinalizesDepositTx.class,
SellerCreatesDelayedPayoutTx.class,
SellerSignsDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class)
.withTimeout(60))
.executeTasks();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message when buyer has clicked payment started button
///////////////////////////////////////////////////////////////////////////////////////////
// We keep the handler here in as well to make it more transparent which messages we expect
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handleSignContractResponse()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message); // TODO (woodser): synchronize access since concurrent requests processed
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
})))
.executeTasks();
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handleDepositResponse()");
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(response)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
})))
.executeTasks();
}
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsMakerProtocol.handlePaymentAccountPayloadRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessPaymentAccountPayloadRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
.executeTasks();
protected void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) {
super.handle(message, peer);
}
}

View File

@ -63,8 +63,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtocol {
private TradeResultHandler tradeResultHandler;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@ -86,41 +84,141 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
ErrorMessageHandler errorMessageHandler) {
System.out.println("onTakeOffer()");
this.tradeResultHandler = tradeResultHandler;
this.errorMessageHandler = errorMessageHandler;
expect(phase(Trade.Phase.INIT)
.with(TakerEvent.TAKE_OFFER)
.from(trade.getTradingPeerNodeAddress()))
.setup(tasks(
ApplyFilter.class,
TakerReservesTradeFunds.class,
TakerSendsInitTradeRequestToArbitrator.class)
.with(TakerEvent.TAKE_OFFER)
.from(trade.getTradingPeerNodeAddress()))
.setup(tasks(
ApplyFilter.class,
TakerReservesTradeFunds.class,
TakerSendsInitTradeRequestToArbitrator.class)
.using(new TradeTaskRunner(trade,
() -> { },
errorMessageHandler))
.withTimeout(30))
.executeTasks();
.withTimeout(30))
.executeTasks();
}
///////////////////////////////////////////////////////////////////////////////////////////
// TakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): these methods are duplicated with BuyerAsTakerProtocol due to single inheritance
@Override
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println("BuyerAsTakerProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class,
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
handleTaskRunnerFault(sender, request, errorMessage);
errorMessageHandler.handleErrorMessage(errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming messages Take offer process
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(InputsForDepositTxResponse message, NodeAddress peer) {
expect(phase(Trade.Phase.INIT)
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println("SellerAsTakerProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(peer))
.from(sender))
.setup(tasks(
TakerProcessesInputsForDepositTxResponse.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
//TakerVerifyAndSignContract.class,
TakerPublishFeeTx.class,
SellerAsTakerSignsDepositTx.class,
SellerCreatesDelayedPayoutTx.class,
SellerSignsDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class)
.withTimeout(60))
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println("SellerAsTakerProtocol.handleSignContractResponse()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println("SellerAsTakerProtocol.handleDepositResponse()");
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(response)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender) {
System.out.println("SellerAsTakerProtocol.handlePaymentAccountPayloadRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED) // TODO: only deposit_published should be expected
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessPaymentAccountPayloadRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(sender, request);
tradeResultHandler.handleResult(trade); // trade is initialized
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@ -135,7 +233,6 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
super.handle(message, peer);
}
///////////////////////////////////////////////////////////////////////////////////////////
// User interaction
///////////////////////////////////////////////////////////////////////////////////////////
@ -146,7 +243,6 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
super.onPaymentReceived(resultHandler, errorMessageHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Massage dispatcher
///////////////////////////////////////////////////////////////////////////////////////////
@ -167,126 +263,28 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
protected Class<? extends TradeTask> getVerifyPeersFeePaymentClass() {
return TakerVerifyMakerFeePayment.class;
}
// TODO (woodser): remove unused calls and classes
///////////////////////////////////////////////////////////////////////////////////////////
// TakerProtocol
// Incoming messages Take offer process
///////////////////////////////////////////////////////////////////////////////////////////
// TODO (woodser): these methods are duplicated with BuyerAsTakerProtocol due to single inheritance
@Override
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("BuyerAsTakerProtocol.handleInitMultisigRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT)
.with(request)
.from(sender))
.setup(tasks(
ProcessInitMultisigRequest.class,
SendSignContractRequestAfterMultisig.class)
.using(new TradeTaskRunner(trade,
() -> {
System.out.println("handle multisig pipeline completed successfully!");
handleTaskRunnerSuccess(sender, request);
},
errorMessage -> {
System.out.println("error in handle multisig pipeline!!!: " + errorMessage);
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("SellerAsTakerProtocol.handleSignContractRequest()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
}
@Override
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("SellerAsTakerProtocol.handleSignContractResponse()");
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
expect(anyPhase(Trade.Phase.INIT)
.with(message)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessSignContractResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, message);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, message, errorMessage);
})))
.executeTasks();
}
@Override
public void handleDepositResponse(DepositResponse response, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("SellerAsTakerProtocol.handleDepositResponse()");
Validator.checkTradeId(processModel.getOfferId(), response);
processModel.setTradeMessage(response);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED)
.with(response)
.from(sender))
.setup(tasks(
// TODO (woodser): validate request
ProcessDepositResponse.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, response, errorMessage);
})))
.executeTasks();
}
@Override
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress sender, ErrorMessageHandler errorMessageHandler) {
System.out.println("SellerAsTakerProtocol.handlePaymentAccountPayloadRequest()");
Validator.checkTradeId(processModel.getOfferId(), request);
processModel.setTradeMessage(request);
expect(anyPhase(Trade.Phase.INIT, Trade.Phase.DEPOSIT_PUBLISHED) // TODO: only deposit_published should be expected
.with(request)
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.setup(tasks(
// TODO (woodser): validate request
ProcessPaymentAccountPayloadRequest.class)
.using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(sender, request);
tradeResultHandler.handleResult(trade); // trade is initialized
},
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(sender, request, errorMessage);
})))
.executeTasks();
private void handle(InputsForDepositTxResponse message, NodeAddress peer) {
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
TakerProcessesInputsForDepositTxResponse.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
//TakerVerifyAndSignContract.class,
TakerPublishFeeTx.class,
SellerAsTakerSignsDepositTx.class,
SellerCreatesDelayedPayoutTx.class,
SellerSignsDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class)
.withTimeout(60))
.executeTasks();
}
}

View File

@ -20,6 +20,7 @@ package bisq.core.trade.protocol;
import bisq.core.offer.Offer;
import bisq.core.trade.Trade;
import bisq.core.trade.TradeManager;
import bisq.core.trade.handlers.TradeResultHandler;
import bisq.core.trade.messages.CounterCurrencyTransferStartedMessage;
import bisq.core.trade.messages.DepositTxAndDelayedPayoutTxMessage;
import bisq.core.trade.messages.InitMultisigRequest;
@ -60,6 +61,8 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
protected final ProcessModel processModel;
protected final Trade trade;
private Timer timeoutTimer;
protected TradeResultHandler tradeResultHandler;
protected ErrorMessageHandler errorMessageHandler;
///////////////////////////////////////////////////////////////////////////////////////////
@ -205,8 +208,8 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
///////////////////////////////////////////////////////////////////////////////////////////
protected abstract void onTradeMessage(TradeMessage message, NodeAddress peer);
public abstract void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress peer, ErrorMessageHandler errorMessageHandler);
public abstract void handleSignContractRequest(SignContractRequest request, NodeAddress peer, ErrorMessageHandler errorMessageHandler);
public abstract void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress peer);
public abstract void handleSignContractRequest(SignContractRequest request, NodeAddress peer);
// TODO (woodser): update to use fluent for consistency
public void handleUpdateMultisigRequest(UpdateMultisigRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) {
@ -348,12 +351,10 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
protected void startTimeout(long timeoutSec) {
stopTimeout();
timeoutTimer = UserThread.runAfter(() -> {
log.error("Timeout reached. TradeID={}, state={}, timeoutSec={}",
trade.getId(), trade.stateProperty().get(), timeoutSec);
log.error("Timeout reached. TradeID={}, state={}, timeoutSec={}", trade.getId(), trade.stateProperty().get(), timeoutSec);
trade.setErrorMessage("Timeout reached. Protocol did not complete in " + timeoutSec + " sec.");
if (errorMessageHandler != null) errorMessageHandler.handleErrorMessage("Timeout reached. Protocol did not complete in " + timeoutSec + " sec. TradeID=" + trade.getId() + ", state=" + trade.stateProperty().get());
processModel.getTradeManager().requestPersistence();
cleanup();
}, timeoutSec);

View File

@ -23,10 +23,8 @@ import bisq.core.trade.messages.PaymentAccountPayloadRequest;
import bisq.core.trade.messages.SignContractResponse;
import bisq.network.p2p.NodeAddress;
import bisq.common.handlers.ErrorMessageHandler;
public interface TraderProtocol {
public void handleSignContractResponse(SignContractResponse message, NodeAddress peer, ErrorMessageHandler errorMessageHandler);
public void handleDepositResponse(DepositResponse response, NodeAddress peer, ErrorMessageHandler errorMessageHandler);
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress peer, ErrorMessageHandler errorMessageHandler);
public void handleSignContractResponse(SignContractResponse message, NodeAddress peer);
public void handleDepositResponse(DepositResponse response, NodeAddress peer);
public void handlePaymentAccountPayloadRequest(PaymentAccountPayloadRequest request, NodeAddress peer);
}

View File

@ -22,8 +22,9 @@ import bisq.core.trade.MakerTrade;
import bisq.core.trade.TakerTrade;
import bisq.core.trade.Trade;
import bisq.core.trade.messages.InitMultisigRequest;
import bisq.core.trade.protocol.TradeListener;
import bisq.core.trade.protocol.TradingPeer;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.SendDirectMessageListener;
@ -50,6 +51,7 @@ public class ProcessInitMultisigRequest extends TradeTask {
private boolean ack1 = false;
private boolean ack2 = false;
private boolean failed = false;
private static Object lock = new Object();
MoneroWallet multisigWallet;
@ -149,37 +151,31 @@ public class ProcessInitMultisigRequest extends TradeTask {
if (peer2Address == null) throw new RuntimeException("Peer2 address is null");
if (peer2PubKeyRing == null) throw new RuntimeException("Peer2 pub key ring null");
// send to peer 1
sendInitMultisigRequest(peer1Address, peer1PubKeyRing, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: peer={}; offerId={}; uid={}", request.getClass().getSimpleName(), peer1Address, request.getTradeId(), request.getUid());
ack1 = true;
if (ack1 && ack2) completeAux();
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", request.getClass().getSimpleName(), request.getUid(), peer1Address, errorMessage);
appendToErrorMessage("Sending message failed: message=" + request + "\nerrorMessage=" + errorMessage);
failed();
}
});
// complete on successful ack messages
TradeListener ackListener = new TradeListener() {
@Override
public void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
if (!ackMessage.getSourceMsgClassName().equals(InitMultisigRequest.class.getSimpleName())) return;
if (ackMessage.isSuccess()) {
if (sender.equals(peer1Address)) ack1 = true;
if (sender.equals(peer2Address)) ack2 = true;
if (ack1 && ack2) {
trade.removeListener(this);
completeAux();
}
} else {
if (!failed) {
failed = true;
failed(ackMessage.getErrorMessage()); // TODO: (woodser): only fail once? build into task?
}
}
}
};
trade.addListener(ackListener);
// send to peer 2
sendInitMultisigRequest(peer2Address, peer2PubKeyRing, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: peer={}; offerId={}; uid={}", request.getClass().getSimpleName(), peer2Address, request.getTradeId(), request.getUid());
ack2 = true;
if (ack1 && ack2) completeAux();
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", request.getClass().getSimpleName(), request.getUid(), peer2Address, errorMessage);
appendToErrorMessage("Sending message failed: message=" + request + "\nerrorMessage=" + errorMessage);
failed();
}
});
// send to peers
sendInitMultisigRequest(peer1Address, peer1PubKeyRing);
sendInitMultisigRequest(peer2Address, peer2PubKeyRing);
} else {
completeAux();
}
@ -204,21 +200,32 @@ public class ProcessInitMultisigRequest extends TradeTask {
return peers;
}
private void sendInitMultisigRequest(NodeAddress recipient, PubKeyRing pubKeyRing, SendDirectMessageListener listener) {
private void sendInitMultisigRequest(NodeAddress recipient, PubKeyRing pubKeyRing) {
// create multisig message with current multisig hex
InitMultisigRequest request = new InitMultisigRequest(
processModel.getOffer().getId(),
processModel.getMyNodeAddress(),
processModel.getPubKeyRing(),
UUID.randomUUID().toString(),
Version.getP2PMessageVersion(),
new Date().getTime(),
processModel.getPreparedMultisigHex(),
processModel.getMadeMultisigHex());
log.info("Send {} with offerId {} and uid {} to peer {}", request.getClass().getSimpleName(), request.getTradeId(), request.getUid(), recipient);
processModel.getP2PService().sendEncryptedDirectMessage(recipient, pubKeyRing, request, listener);
// create request with current multisig hex
InitMultisigRequest request = new InitMultisigRequest(
processModel.getOffer().getId(),
processModel.getMyNodeAddress(),
processModel.getPubKeyRing(),
UUID.randomUUID().toString(),
Version.getP2PMessageVersion(),
new Date().getTime(),
processModel.getPreparedMultisigHex(),
processModel.getMadeMultisigHex());
log.info("Send {} with offerId {} and uid {} to peer {}", request.getClass().getSimpleName(), request.getTradeId(), request.getUid(), recipient);
processModel.getP2PService().sendEncryptedDirectMessage(recipient, pubKeyRing, request, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: peer={}; offerId={}; uid={}", request.getClass().getSimpleName(), recipient, request.getTradeId(), request.getUid());
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", request.getClass().getSimpleName(), request.getUid(), recipient, errorMessage);
appendToErrorMessage("Sending message failed: message=" + request + "\nerrorMessage=" + errorMessage);
failed();
}
});
}
private void completeAux() {

View File

@ -29,7 +29,9 @@ import bisq.core.trade.Trade;
import bisq.core.trade.TradeUtils;
import bisq.core.trade.messages.SignContractRequest;
import bisq.core.trade.messages.SignContractResponse;
import bisq.core.trade.protocol.TradeListener;
import bisq.core.trade.protocol.TradingPeer;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.SendDirectMessageListener;
import java.util.Date;
@ -41,6 +43,7 @@ public class ProcessSignContractRequest extends TradeTask {
private boolean ack1 = false;
private boolean ack2 = false;
private boolean failed = false;
@SuppressWarnings({"unused"})
public ProcessSignContractRequest(TaskRunner taskHandler, Trade trade) {
@ -61,73 +64,84 @@ public class ProcessSignContractRequest extends TradeTask {
trader.setPaymentAccountPayloadHash(request.getPaymentAccountPayloadHash());
trader.setPayoutAddressString(request.getPayoutAddress());
// return contract signature when ready
// sign contract only when both deposit txs hashes known
// TODO (woodser): synchronize contract creation; both requests received at the same time
// TODO (woodser): remove makerDepositTxId and takerDepositTxId from Trade
if (processModel.getMaker().getDepositTxHash() != null && processModel.getTaker().getDepositTxHash() != null) { // TODO (woodser): synchronize on process model before setting hash so response only sent once
// create and sign contract
Contract contract = TradeUtils.createContract(trade);
String contractAsJson = Utilities.objectToJson(contract);
String signature = Sig.sign(processModel.getKeyRing().getSignatureKeyPair().getPrivate(), contractAsJson);
// save contract and signature
trade.setContract(contract);
trade.setContractAsJson(contractAsJson);
trade.getSelf().setContractSignature(signature);
// create response with contract signature
SignContractResponse response = new SignContractResponse(
trade.getOffer().getId(),
processModel.getMyNodeAddress(),
processModel.getPubKeyRing(),
UUID.randomUUID().toString(),
Version.getP2PMessageVersion(),
new Date().getTime(),
signature);
// get response recipients. only arbitrator sends response to both peers
NodeAddress recipient1 = trade instanceof ArbitratorTrade ? trade.getMakerNodeAddress() : trade.getTradingPeerNodeAddress();
PubKeyRing recipient1PubKey = trade instanceof ArbitratorTrade ? trade.getMakerPubKeyRing() : trade.getTradingPeerPubKeyRing();
NodeAddress recipient2 = trade instanceof ArbitratorTrade ? trade.getTakerNodeAddress() : null;
PubKeyRing recipient2PubKey = trade instanceof ArbitratorTrade ? trade.getTakerPubKeyRing() : null;
// send response to recipient 1
processModel.getP2PService().sendEncryptedDirectMessage(recipient1, recipient1PubKey, response, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", response.getClass().getSimpleName(), recipient1, trade.getId());
ack1 = true;
if (ack1 && (recipient2 == null || ack2)) complete();
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", response.getClass().getSimpleName(), recipient1, trade.getId(), errorMessage);
appendToErrorMessage("Sending message failed: message=" + response + "\nerrorMessage=" + errorMessage);
failed();
}
});
// send response to recipient 2 if applicable
if (recipient2 != null) {
processModel.getP2PService().sendEncryptedDirectMessage(recipient2, recipient2PubKey, response, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", response.getClass().getSimpleName(), recipient2, trade.getId());
ack2 = true;
if (ack1 && ack2) complete();
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", response.getClass().getSimpleName(), recipient2, trade.getId(), errorMessage);
appendToErrorMessage("Sending message failed: message=" + response + "\nerrorMessage=" + errorMessage);
failed();
}
});
}
if (processModel.getMaker().getDepositTxHash() == null || processModel.getTaker().getDepositTxHash() == null) {
complete();
return;
}
// create and sign contract
Contract contract = TradeUtils.createContract(trade);
String contractAsJson = Utilities.objectToJson(contract);
String signature = Sig.sign(processModel.getKeyRing().getSignatureKeyPair().getPrivate(), contractAsJson);
// save contract and signature
trade.setContract(contract);
trade.setContractAsJson(contractAsJson);
trade.getSelf().setContractSignature(signature);
// get response recipients. only arbitrator sends response to both peers
NodeAddress recipient1 = trade instanceof ArbitratorTrade ? trade.getMakerNodeAddress() : trade.getTradingPeerNodeAddress();
PubKeyRing recipient1PubKey = trade instanceof ArbitratorTrade ? trade.getMakerPubKeyRing() : trade.getTradingPeerPubKeyRing();
NodeAddress recipient2 = trade instanceof ArbitratorTrade ? trade.getTakerNodeAddress() : null;
PubKeyRing recipient2PubKey = trade instanceof ArbitratorTrade ? trade.getTakerPubKeyRing() : null;
// complete on successful ack messages
TradeListener ackListener = new TradeListener() {
@Override
public void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
if (!ackMessage.getSourceMsgClassName().equals(SignContractResponse.class.getSimpleName())) return;
if (ackMessage.isSuccess()) {
if (sender.equals(trade.getTradingPeerNodeAddress())) ack1 = true;
if (sender.equals(trade.getArbitratorNodeAddress())) ack2 = true;
if (trade instanceof ArbitratorTrade ? ack1 && ack2 : ack1) { // only arbitrator sends response to both peers
trade.removeListener(this);
complete();
}
} else {
if (!failed) {
failed = true;
failed(ackMessage.getErrorMessage()); // TODO: (woodser): only fail once? build into task?
}
}
}
};
trade.addListener(ackListener);
// send contract signature response(s)
if (recipient1 != null) sendSignContractResponse(recipient1, recipient1PubKey, signature);
if (recipient2 != null) sendSignContractResponse(recipient2, recipient2PubKey, signature);
} catch (Throwable t) {
failed(t);
}
}
private void sendSignContractResponse(NodeAddress nodeAddress, PubKeyRing pubKeyRing, String contractSignature) {
// create response with contract signature
SignContractResponse response = new SignContractResponse(
trade.getOffer().getId(),
processModel.getMyNodeAddress(),
processModel.getPubKeyRing(),
UUID.randomUUID().toString(),
Version.getP2PMessageVersion(),
new Date().getTime(),
contractSignature);
// send request
processModel.getP2PService().sendEncryptedDirectMessage(nodeAddress, pubKeyRing, response, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", response.getClass().getSimpleName(), nodeAddress, trade.getId());
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", response.getClass().getSimpleName(), nodeAddress, trade.getId(), errorMessage);
appendToErrorMessage("Sending message failed: message=" + response + "\nerrorMessage=" + errorMessage);
failed();
}
});
}
}

View File

@ -18,6 +18,7 @@
package bisq.core.trade.protocol.tasks;
import bisq.common.app.Version;
import bisq.common.crypto.PubKeyRing;
import bisq.common.taskrunner.TaskRunner;
import bisq.core.btc.model.XmrAddressEntry;
import bisq.core.offer.Offer;
@ -26,7 +27,10 @@ import bisq.core.trade.SellerTrade;
import bisq.core.trade.Trade;
import bisq.core.trade.TradeUtils;
import bisq.core.trade.messages.SignContractRequest;
import bisq.core.trade.protocol.TradeListener;
import bisq.core.util.ParsingUtils;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.SendDirectMessageListener;
import java.math.BigInteger;
import java.util.Date;
@ -42,6 +46,7 @@ public class SendSignContractRequestAfterMultisig extends TradeTask {
private boolean ack1 = false;
private boolean ack2 = false;
private boolean failed = false;
@SuppressWarnings({"unused"})
public SendSignContractRequestAfterMultisig(TaskRunner taskHandler, Trade trade) {
@ -54,7 +59,7 @@ public class SendSignContractRequestAfterMultisig extends TradeTask {
runInterceptHook();
// skip if multisig wallet not complete
if (!processModel.isMultisigSetupComplete()) return;
if (!processModel.isMultisigSetupComplete()) return; // TODO: woodser: this does not ack original request?
// skip if deposit tx already created
if (processModel.getDepositTxXmr() != null) return;
@ -74,7 +79,7 @@ public class SendSignContractRequestAfterMultisig extends TradeTask {
MoneroTxWallet depositTx = TradeUtils.createDepositTx(trade.getXmrWalletService(), tradeFee, multisigAddress, depositAmount);
// freeze deposit outputs
// TODO (woodser): save frozen key images and unfreeze if trade fails before sent to multisig
// TODO (woodser): save frozen key images and unfreeze if trade fails before deposited to multisig
for (MoneroOutput input : depositTx.getInputs()) {
wallet.freezeOutput(input.getKeyImage().getHex());
}
@ -83,52 +88,68 @@ public class SendSignContractRequestAfterMultisig extends TradeTask {
processModel.setDepositTxXmr(depositTx);
trade.getSelf().setDepositTxHash(depositTx.getHash());
// create request for peer and arbitrator to sign contract
SignContractRequest request = new SignContractRequest(
trade.getOffer().getId(),
processModel.getMyNodeAddress(),
processModel.getPubKeyRing(),
UUID.randomUUID().toString(),
Version.getP2PMessageVersion(),
new Date().getTime(),
trade.getProcessModel().getAccountId(),
trade.getProcessModel().getPaymentAccountPayload(trade).getHash(),
trade.getXmrWalletService().getAddressEntry(offer.getId(), XmrAddressEntry.Context.TRADE_PAYOUT).get().getAddressString(),
depositTx.getHash());
// complete on successful ack messages
TradeListener ackListener = new TradeListener() {
@Override
public void onAckMessage(AckMessage ackMessage, NodeAddress sender) {
if (!ackMessage.getSourceMsgClassName().equals(SignContractRequest.class.getSimpleName())) return;
if (ackMessage.isSuccess()) {
if (sender.equals(trade.getTradingPeerNodeAddress())) ack1 = true;
if (sender.equals(trade.getArbitratorNodeAddress())) ack2 = true;
if (ack1 && ack2) {
trade.removeListener(this);
completeAux();
}
} else {
if (!failed) {
failed = true;
failed(ackMessage.getErrorMessage()); // TODO: (woodser): only fail once? build into task?
}
}
}
};
trade.addListener(ackListener);
// send request to trading peer
processModel.getP2PService().sendEncryptedDirectMessage(trade.getTradingPeerNodeAddress(), trade.getTradingPeerPubKeyRing(), request, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", request.getClass().getSimpleName(), trade.getTradingPeerNodeAddress(), trade.getId());
ack1 = true;
if (ack1 && ack2) complete();
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", request.getClass().getSimpleName(), trade.getTradingPeerNodeAddress(), trade.getId(), errorMessage);
appendToErrorMessage("Sending message failed: message=" + request + "\nerrorMessage=" + errorMessage);
failed();
}
});
// send request to arbitrator
processModel.getP2PService().sendEncryptedDirectMessage(trade.getArbitratorNodeAddress(), trade.getArbitratorPubKeyRing(), request, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", request.getClass().getSimpleName(), trade.getArbitratorNodeAddress(), trade.getId());
ack2 = true;
if (ack1 && ack2) complete();
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", request.getClass().getSimpleName(), trade.getArbitratorNodeAddress(), trade.getId(), errorMessage);
appendToErrorMessage("Sending message failed: message=" + request + "\nerrorMessage=" + errorMessage);
failed();
}
});
// send sign contract requests to peer and arbitrator
sendSignContractRequest(trade.getTradingPeerNodeAddress(), trade.getTradingPeerPubKeyRing(), offer, depositTx);
sendSignContractRequest(trade.getArbitratorNodeAddress(), trade.getArbitratorPubKeyRing(), offer, depositTx);
} catch (Throwable t) {
failed(t);
}
}
private void sendSignContractRequest(NodeAddress nodeAddress, PubKeyRing pubKeyRing, Offer offer, MoneroTxWallet depositTx) {
// create request to sign contract
SignContractRequest request = new SignContractRequest(
trade.getOffer().getId(),
processModel.getMyNodeAddress(),
processModel.getPubKeyRing(),
UUID.randomUUID().toString(), // TODO: ensure not reusing request id across protocol
Version.getP2PMessageVersion(),
new Date().getTime(),
trade.getProcessModel().getAccountId(),
trade.getProcessModel().getPaymentAccountPayload(trade).getHash(),
trade.getXmrWalletService().getAddressEntry(offer.getId(), XmrAddressEntry.Context.TRADE_PAYOUT).get().getAddressString(),
depositTx.getHash());
// send request
processModel.getP2PService().sendEncryptedDirectMessage(nodeAddress, pubKeyRing, request, new SendDirectMessageListener() {
@Override
public void onArrived() {
log.info("{} arrived: trading peer={}; offerId={}; uid={}", request.getClass().getSimpleName(), nodeAddress, trade.getId());
}
@Override
public void onFault(String errorMessage) {
log.error("Sending {} failed: uid={}; peer={}; error={}", request.getClass().getSimpleName(), nodeAddress, trade.getId(), errorMessage);
appendToErrorMessage("Sending message failed: message=" + request + "\nerrorMessage=" + errorMessage);
failed();
}
});
}
private void completeAux() {
processModel.getXmrWalletService().getWallet().save();
complete();
}
}

View File

@ -27,6 +27,8 @@ import bisq.proto.grpc.ConfirmPaymentStartedReply;
import bisq.proto.grpc.ConfirmPaymentStartedRequest;
import bisq.proto.grpc.GetTradeReply;
import bisq.proto.grpc.GetTradeRequest;
import bisq.proto.grpc.GetTradesReply;
import bisq.proto.grpc.GetTradesRequest;
import bisq.proto.grpc.KeepFundsReply;
import bisq.proto.grpc.KeepFundsRequest;
import bisq.proto.grpc.TakeOfferReply;
@ -40,8 +42,9 @@ import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import static bisq.core.api.model.TradeInfo.toTradeInfo;
@ -87,6 +90,25 @@ class GrpcTradesService extends TradesImplBase {
}
}
@Override
public void getTrades(GetTradesRequest req,
StreamObserver<GetTradesReply> responseObserver) {
try {
List<TradeInfo> trades = coreApi.getTrades()
.stream().map(TradeInfo::toTradeInfo)
.collect(Collectors.toList());
var reply = GetTradesReply.newBuilder()
.addAllTrades(trades.stream()
.map(TradeInfo::toProtoMessage)
.collect(Collectors.toList()))
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void takeOffer(TakeOfferRequest req,
StreamObserver<TakeOfferReply> responseObserver) {
@ -173,8 +195,9 @@ class GrpcTradesService extends TradesImplBase {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getGetTradeMethod().getFullMethodName(), new GrpcCallRateMeter(3, SECONDS));
put(getTakeOfferMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetTradeMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getGetTradesMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getTakeOfferMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
put(getConfirmPaymentStartedMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getConfirmPaymentReceivedMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getKeepFundsMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));

View File

@ -300,6 +300,8 @@ message StopReply {
service Trades {
rpc GetTrade (GetTradeRequest) returns (GetTradeReply) {
}
rpc GetTrades (GetTradesRequest) returns (GetTradesReply) {
}
rpc TakeOffer (TakeOfferRequest) returns (TakeOfferReply) {
}
rpc ConfirmPaymentStarted (ConfirmPaymentStartedRequest) returns (ConfirmPaymentStartedReply) {
@ -344,6 +346,13 @@ message GetTradeReply {
TradeInfo trade = 1;
}
message GetTradesRequest {
}
message GetTradesReply {
repeated TradeInfo trades = 1;
}
message KeepFundsRequest {
string trade_id = 1;
}