Refactor check if offer is available process

This commit is contained in:
Manfred Karrer 2015-03-09 12:45:56 +01:00
parent b70868f793
commit 090f2e7428
28 changed files with 530 additions and 260 deletions

View File

@ -64,17 +64,17 @@ class OffersDataModel implements Activatable, DataModel {
@Override
public void activate() {
list.clear();
list.addAll(tradeManager.getOffers().values().stream().map(OfferListItem::new).collect(Collectors.toList()));
list.addAll(tradeManager.getOpenOffers().values().stream().map(OfferListItem::new).collect(Collectors.toList()));
// we sort by date, earliest first
list.sort((o1, o2) -> o2.getOffer().getCreationDate().compareTo(o1.getOffer().getCreationDate()));
tradeManager.getOffers().addListener(offerMapChangeListener);
tradeManager.getOpenOffers().addListener(offerMapChangeListener);
}
@Override
public void deactivate() {
tradeManager.getOffers().removeListener(offerMapChangeListener);
tradeManager.getOpenOffers().removeListener(offerMapChangeListener);
}
void removeOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {

View File

@ -22,6 +22,7 @@ import io.bitsquare.locale.Country;
import io.bitsquare.locale.CurrencyUtil;
import io.bitsquare.offer.Offer;
import io.bitsquare.offer.OfferBookService;
import io.bitsquare.trade.TradeManager;
import io.bitsquare.user.User;
import io.bitsquare.util.Utilities;
@ -68,12 +69,12 @@ public class OfferBook {
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
OfferBook(OfferBookService offerBookService, User user) {
OfferBook(OfferBookService offerBookService, User user, TradeManager tradeManager) {
this.offerBookService = offerBookService;
this.user = user;
bankAccountChangeListener = (observableValue, oldValue, newValue) -> setBankAccount(newValue);
invalidationListener = (ov, oldValue, newValue) -> requestOffers();
invalidationListener = (ov, oldValue, newValue) -> requestGetOffers();
remoteOfferBookListener = new OfferBookService.Listener() {
@Override
@ -90,6 +91,12 @@ public class OfferBook {
@Override
public void onOfferRemoved(Offer offer) {
// Update state in case that that offer is used in the take offer screen, so it gets updated correctly
offer.setState(Offer.State.OFFER_REMOVED);
// clean up possible references in tradeManager
tradeManager.handleRemovedOffer(offer);
offerBookListItems.removeIf(item -> item.getOffer().getId().equals(offer.getId()));
}
};
@ -159,7 +166,7 @@ public class OfferBook {
}
}
private void requestOffers() {
private void requestGetOffers() {
offerBookService.getOffers(fiatCode);
}

View File

@ -216,9 +216,12 @@ public class OfferBookView extends ActivatableViewAndModel<GridPane, OfferBookVi
}
private void takeOffer(Offer offer) {
if (model.isRegistered()) {
if (offer.getDirection() == Direction.BUY) {
// reset available state
if (offer.getState() != Offer.State.OFFER_REMOVED)
offer.setState(Offer.State.UNKNOWN);
offerActionHandler.takeOffer(model.getAmountAsCoin(), model.getPriceAsCoin(), offer);
}
else {

View File

@ -21,16 +21,11 @@ import io.bitsquare.btc.AddressEntry;
import io.bitsquare.btc.FeePolicy;
import io.bitsquare.btc.WalletService;
import io.bitsquare.btc.listeners.BalanceListener;
import io.bitsquare.network.Peer;
import io.bitsquare.offer.Offer;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.settings.Preferences;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.TradeManager;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.protocol.trade.taker.messages.RequestIsOfferAvailableMessage;
import org.bitcoinj.core.Coin;
import org.bitcoinj.utils.ExchangeRate;
@ -61,15 +56,8 @@ import org.slf4j.LoggerFactory;
class TakeOfferDataModel implements Activatable, DataModel {
private static final Logger log = LoggerFactory.getLogger(TakeOfferDataModel.class);
enum OfferAvailableState {
UNKNOWN,
OFFER_NOT_AVAILABLE,
OFFER_AVAILABLE
}
private final TradeManager tradeManager;
private final WalletService walletService;
private TradeMessageService tradeMessageService;
private final Preferences preferences;
private final Persistence persistence;
@ -91,19 +79,15 @@ class TakeOfferDataModel implements Activatable, DataModel {
final ObjectProperty<Coin> offerFeeAsCoin = new SimpleObjectProperty<>();
final ObjectProperty<Coin> networkFeeAsCoin = new SimpleObjectProperty<>();
final ObjectProperty<OfferAvailableState> offerIsAvailable = new SimpleObjectProperty<>(OfferAvailableState
.UNKNOWN);
//
private boolean isActivated;
final ObjectProperty<Offer.State> offerIsAvailable = new SimpleObjectProperty<>(Offer.State.UNKNOWN);
@Inject
public TakeOfferDataModel(TradeManager tradeManager, WalletService walletService, TradeMessageService tradeMessageService,
public TakeOfferDataModel(TradeManager tradeManager,
WalletService walletService,
Preferences preferences,
Persistence persistence) {
this.tradeManager = tradeManager;
this.walletService = walletService;
this.tradeMessageService = tradeMessageService;
this.preferences = preferences;
this.persistence = persistence;
@ -113,13 +97,11 @@ class TakeOfferDataModel implements Activatable, DataModel {
@Override
public void activate() {
isActivated = true;
btcCode.bind(preferences.btcDenominationProperty());
}
@Override
public void deactivate() {
isActivated = false;
btcCode.unbind();
}
@ -148,49 +130,12 @@ class TakeOfferDataModel implements Activatable, DataModel {
});
updateBalance(walletService.getBalanceForAddress(addressEntry.getAddress()));
getPeerAddress(offer);
}
// TODO: Should be moved to a domain and handled with add/remove listeners instead of isActivated
// or maybe with rx?
private void getPeerAddress(Offer offer) {
tradeMessageService.getPeerAddress(offer.getMessagePublicKey(), new GetPeerAddressListener() {
@Override
public void onResult(Peer peer) {
if (isActivated)
isOfferAvailable(peer, offer.getId());
}
@Override
public void onFailed() {
if (isActivated)
log.error("The offerers address have not been found. That should never happen.");
}
offer.getStateProperty().addListener((observable, oldValue, newValue) -> {
offerIsAvailable.set(newValue);
});
tradeManager.requestIsOfferAvailable(offer);
}
private void isOfferAvailable(Peer peer, String offerId) {
tradeMessageService.sendMessage(peer, new RequestIsOfferAvailableMessage(offerId),
new OutgoingMessageListener() {
@Override
public void onResult() {
if (isActivated) {
log.trace("RequestIsOfferAvailableMessage successfully arrived at peer");
offerIsAvailable.set(OfferAvailableState.OFFER_AVAILABLE);
}
}
@Override
public void onFailed() {
if (isActivated) {
log.error("RequestIsOfferAvailableMessage did not arrive at peer");
offerIsAvailable.set(OfferAvailableState.OFFER_NOT_AVAILABLE);
}
}
});
}
void takeOffer() {
final Trade trade = tradeManager.takeOffer(amountAsCoin.get(), offer);
trade.stateProperty().addListener((ov, oldValue, newValue) -> {

View File

@ -140,22 +140,30 @@ public class TakeOfferView extends ActivatableViewAndModel<AnchorPane, TakeOffer
acceptedLanguagesTextField.setText(model.getAcceptedLanguages());
acceptedArbitratorsTextField.setText(model.getAcceptedArbitrators());
model.offerIsAvailable.addListener((ov, oldValue, newValue) -> {
isOfferAvailableLabel.setVisible(false);
isOfferAvailableLabel.setManaged(false);
isOfferAvailableProgressIndicator.setProgress(0);
isOfferAvailableProgressIndicator.setVisible(false);
isOfferAvailableProgressIndicator.setManaged(false);
model.offerIsAvailable.addListener((ov, oldValue, newValue) -> handleOfferIsAvailableState(newValue));
handleOfferIsAvailableState(model.offerIsAvailable.get());
}
if ((newValue == TakeOfferDataModel.OfferAvailableState.OFFER_AVAILABLE)) {
showPaymentInfoScreenButton.setVisible(true);
}
else if ((newValue == TakeOfferDataModel.OfferAvailableState.OFFER_NOT_AVAILABLE)) {
Popups.openWarningPopup("You cannot take that offer",
"The offerer is either offline or the offer was already taken by another trader.");
close();
}
});
private void handleOfferIsAvailableState(Offer.State state) {
isOfferAvailableLabel.setVisible(false);
isOfferAvailableLabel.setManaged(false);
isOfferAvailableProgressIndicator.setProgress(0);
isOfferAvailableProgressIndicator.setVisible(false);
isOfferAvailableProgressIndicator.setManaged(false);
if ((state == Offer.State.OFFER_AVAILABLE)) {
showPaymentInfoScreenButton.setVisible(true);
}
else if ((state == Offer.State.OFFER_NOT_AVAILABLE)) {
Popups.openWarningPopup("You cannot take that offer",
"The offerer is either offline or the offer was already taken by another trader.");
close();
}
else if ((state == Offer.State.OFFER_REMOVED)) {
Popups.openWarningPopup("You cannot take that offer",
"The offerer has been removed in the meantime.");
close();
}
}
public void configCloseHandlers(BooleanProperty tabIsClosable) {

View File

@ -82,8 +82,7 @@ class TakeOfferViewModel extends ActivatableWithDataModel<TakeOfferDataModel> im
final BooleanProperty showWarningInvalidBtcDecimalPlaces = new SimpleBooleanProperty();
final BooleanProperty showTransactionPublishedScreen = new SimpleBooleanProperty();
final BooleanProperty tabIsClosable = new SimpleBooleanProperty(true);
final ObjectProperty<TakeOfferDataModel.OfferAvailableState> offerIsAvailable =
new SimpleObjectProperty<>(TakeOfferDataModel.OfferAvailableState.UNKNOWN);
final ObjectProperty<Offer.State> offerIsAvailable = new SimpleObjectProperty<>(Offer.State.UNKNOWN);
final ObjectProperty<InputValidator.ValidationResult> amountValidationResult = new SimpleObjectProperty<>();

View File

@ -34,6 +34,9 @@ import java.util.Date;
import java.util.List;
import java.util.Locale;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import static com.google.common.base.Preconditions.*;
import static io.bitsquare.btc.Restrictions.MIN_TRADE_AMOUNT;
@ -42,6 +45,14 @@ import static io.bitsquare.btc.Restrictions.MIN_TRADE_AMOUNT;
public class Offer implements Serializable {
private static final long serialVersionUID = -971164804305475826L;
public enum State {
UNKNOWN,
OFFER_AVAILABLE,
OFFER_NOT_AVAILABLE,
OFFER_REMOVED
}
// key attributes for lookup
private final Direction direction;
private final Currency currency;
@ -64,8 +75,10 @@ public class Offer implements Serializable {
private final List<Locale> acceptedLanguageLocales;
private final String bankAccountUID;
private final List<Arbitrator> arbitrators;
private String offerFeePaymentTxID;
private State state = State.UNKNOWN;
private transient ObjectProperty<State> stateProperty; // don't access directly, use getStateProperty()
///////////////////////////////////////////////////////////////////////////////////////////
@ -103,6 +116,7 @@ public class Offer implements Serializable {
this.acceptedLanguageLocales = acceptedLanguageLocales;
creationDate = new Date();
getStateProperty().set(state);
}
@ -110,11 +124,11 @@ public class Offer implements Serializable {
// Setters
///////////////////////////////////////////////////////////////////////////////////////////
public PublicKey getMessagePublicKey() {
return messagePublicKey;
public void setState(State state) {
this.state = state;
getStateProperty().set(state);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
@ -198,10 +212,24 @@ public class Offer implements Serializable {
return bankAccountUID;
}
public PublicKey getMessagePublicKey() {
return messagePublicKey;
}
public Date getCreationDate() {
return creationDate;
}
public State getState() {
return state;
}
public ObjectProperty<State> getStateProperty() {
if (stateProperty == null)
stateProperty = new SimpleObjectProperty<>(state);
return stateProperty;
}
public void validate() throws Exception {
checkNotNull(getAcceptedCountries(), "AcceptedCountries is null");
checkNotNull(getAcceptedLanguageLocales(), "AcceptedLanguageLocales is null");

View File

@ -29,16 +29,18 @@ import io.bitsquare.offer.Offer;
import io.bitsquare.offer.OfferBookService;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.trade.handlers.TransactionResultHandler;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.protocol.placeoffer.PlaceOfferProtocol;
import io.bitsquare.trade.protocol.trade.OfferMessage;
import io.bitsquare.trade.protocol.trade.TradeMessage;
import io.bitsquare.trade.protocol.trade.offerer.BuyerAcceptsOfferProtocol;
import io.bitsquare.trade.protocol.trade.offerer.BuyerAcceptsOfferProtocolListener;
import io.bitsquare.trade.protocol.trade.offerer.messages.BankTransferInitedMessage;
import io.bitsquare.trade.protocol.trade.offerer.messages.DepositTxPublishedMessage;
import io.bitsquare.trade.protocol.trade.offerer.messages.IsOfferAvailableResponseMessage;
import io.bitsquare.trade.protocol.trade.offerer.messages.RequestTakerDepositPaymentMessage;
import io.bitsquare.trade.protocol.trade.offerer.messages.RespondToIsOfferAvailableMessage;
import io.bitsquare.trade.protocol.trade.offerer.messages.RespondToTakeOfferRequestMessage;
import io.bitsquare.trade.protocol.trade.offerer.tasks.IsOfferAvailableResponse;
import io.bitsquare.trade.protocol.trade.taker.RequestIsOfferAvailableProtocol;
import io.bitsquare.trade.protocol.trade.taker.SellerTakesOfferProtocol;
import io.bitsquare.trade.protocol.trade.taker.SellerTakesOfferProtocolListener;
import io.bitsquare.trade.protocol.trade.taker.messages.PayoutTxPublishedMessage;
@ -69,6 +71,8 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* The domain for the trading
* TODO: Too messy, need to be improved a lot....
@ -88,8 +92,9 @@ public class TradeManager {
//TODO store TakerAsSellerProtocol in trade
private final Map<String, SellerTakesOfferProtocol> takerAsSellerProtocolMap = new HashMap<>();
private final Map<String, BuyerAcceptsOfferProtocol> offererAsBuyerProtocolMap = new HashMap<>();
private final Map<String, RequestIsOfferAvailableProtocol> requestIsOfferAvailableProtocolMap = new HashMap<>();
private final ObservableMap<String, Offer> offers = FXCollections.observableHashMap();
private final ObservableMap<String, Offer> openOffers = FXCollections.observableHashMap();
private final ObservableMap<String, Trade> pendingTrades = FXCollections.observableHashMap();
private final ObservableMap<String, Trade> closedTrades = FXCollections.observableHashMap();
@ -118,7 +123,7 @@ public class TradeManager {
Object offersObject = persistence.read(this, "offers");
if (offersObject instanceof Map) {
offers.putAll((Map<String, Offer>) offersObject);
openOffers.putAll((Map<String, Offer>) offersObject);
}
Object pendingTradesObject = persistence.read(this, "pendingTrades");
@ -131,7 +136,7 @@ public class TradeManager {
closedTrades.putAll((Map<String, Trade>) closedTradesObject);
}
tradeMessageService.addIncomingMessageListener(this::onIncomingTradeMessage);
tradeMessageService.addHandleNewMessageListener(this::handleNewMessage);
}
@ -140,7 +145,7 @@ public class TradeManager {
///////////////////////////////////////////////////////////////////////////////////////////
public void cleanup() {
tradeMessageService.removeIncomingMessageListener(this::onIncomingTradeMessage);
tradeMessageService.removeHandleNewMessageListener(this::handleNewMessage);
}
@ -187,15 +192,15 @@ public class TradeManager {
}
private void saveOffer(Offer offer) {
offers.put(offer.getId(), offer);
openOffers.put(offer.getId(), offer);
persistOffers();
}
public void requestRemoveOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
offerBookService.removeOffer(offer,
() -> {
if (offers.containsKey(offer.getId())) {
offers.remove(offer.getId());
if (openOffers.containsKey(offer.getId())) {
openOffers.remove(offer.getId());
persistOffers();
resultHandler.handleResult();
}
@ -241,8 +246,8 @@ public class TradeManager {
private void createOffererAsBuyerProtocol(String offerId, Peer sender) {
log.trace("createOffererAsBuyerProtocol offerId = " + offerId);
if (offers.containsKey(offerId)) {
Offer offer = offers.get(offerId);
if (openOffers.containsKey(offerId)) {
Offer offer = openOffers.get(offerId);
Trade trade = createTrade(offer);
currentPendingTrade = trade;
@ -368,6 +373,14 @@ public class TradeManager {
@Override
public void onFault(Throwable throwable, SellerTakesOfferProtocol.State state) {
log.error("onFault: " + throwable.getMessage() + " / " + state);
switch (state) {
case GetPeerAddress:
// TODO add unreachable node to a local ignore list in case of repeated failures
break;
case RequestTakeOffer:
// TODO add unreachable node to a local ignore list in case of repeated failures
break;
}
}
// probably not needed
@ -379,7 +392,12 @@ public class TradeManager {
};
SellerTakesOfferProtocol sellerTakesOfferProtocol = new SellerTakesOfferProtocol(
trade, listener, tradeMessageService, walletService, blockChainService, signatureService,
trade,
listener,
tradeMessageService,
walletService,
blockChainService,
signatureService,
user);
takerAsSellerProtocolMap.put(trade.getId(), sellerTakesOfferProtocol);
sellerTakesOfferProtocol.start();
@ -407,62 +425,80 @@ public class TradeManager {
takerAsSellerProtocolMap.get(tradeId).onUIEventFiatReceived();
}
public void requestIsOfferAvailable(Offer offer) {
if (!requestIsOfferAvailableProtocolMap.containsKey(offer.getId())) {
RequestIsOfferAvailableProtocol protocol = new RequestIsOfferAvailableProtocol(offer, tradeMessageService);
requestIsOfferAvailableProtocolMap.put(offer.getId(), protocol);
protocol.start();
}
else {
log.warn("requestIsOfferAvailable already called for offer with ID:" + offer.getId());
}
}
public void handleRemovedOffer(Offer offer) {
requestIsOfferAvailableProtocolMap.remove(offer.getId());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Process incoming tradeMessages
// Process new tradeMessages
///////////////////////////////////////////////////////////////////////////////////////////
// Routes the incoming messages to the responsible protocol
private void onIncomingTradeMessage(Message message, Peer sender) {
if (!(message instanceof TradeMessage))
throw new IllegalArgumentException("message must be of type TradeMessage");
TradeMessage tradeMessage = (TradeMessage) message;
private void handleNewMessage(Message message, Peer sender) {
log.trace("handleNewMessage: message = " + message.getClass().getSimpleName());
log.trace("handleNewMessage: sender = " + sender);
log.trace("onIncomingTradeMessage instance " + tradeMessage.getClass().getSimpleName());
log.trace("onIncomingTradeMessage sender " + sender);
String tradeId = tradeMessage.getTradeId();
if (tradeId != null) {
if (tradeMessage instanceof RequestIsOfferAvailableMessage) {
// TODO Does not fit in any of the 2 protocols, but should not be here as well...
// Lets keep it until we refactor the trade process
boolean isOfferOpen = getTrade(tradeId) == null;
RespondToIsOfferAvailableMessage replyMessage =
new RespondToIsOfferAvailableMessage(tradeId, isOfferOpen);
tradeMessageService.sendMessage(sender, replyMessage, new OutgoingMessageListener() {
@Override
public void onResult() {
log.trace("RespondToTakeOfferRequestMessage successfully arrived at peer");
}
@Override
public void onFailed() {
log.error("AcceptTakeOfferRequestMessage did not arrive at peer");
}
});
if (message instanceof OfferMessage) {
OfferMessage offerMessage = (OfferMessage) message;
// Before starting any take offer activity we check if the offer is still available.
if (offerMessage instanceof RequestIsOfferAvailableMessage) {
// That message arrives at the offerer and he returns if the offer is still available (if there is no trade already created with that offerId).
String offerId = offerMessage.getOfferId();
checkNotNull(offerId);
boolean isOfferOpen = getTrade(offerId) == null;
// no handling of results or faults needed
IsOfferAvailableResponse.run(sender, tradeMessageService, offerId, isOfferOpen);
}
else if (tradeMessage instanceof RequestTakeOfferMessage) {
else if (offerMessage instanceof IsOfferAvailableResponseMessage) {
// That message arrives at the taker in response to a previous requestIsOfferAvailable call.
// It might be that the offer got removed form the offer book, so lets check if its still there.
if (requestIsOfferAvailableProtocolMap.containsKey(offerMessage.getOfferId())) {
RequestIsOfferAvailableProtocol protocol = requestIsOfferAvailableProtocolMap.get(offerMessage.getOfferId());
protocol.handleIsOfferAvailableResponseMessage((IsOfferAvailableResponseMessage) offerMessage);
requestIsOfferAvailableProtocolMap.remove(offerMessage.getOfferId());
}
else {
log.info("Offer might have been removed in the meantime. No protocol found for offer with ID:" + offerMessage.getOfferId());
}
}
else {
log.error("Incoming offerMessage not supported. " + offerMessage);
}
}
else if (message instanceof TradeMessage) {
TradeMessage tradeMessage = (TradeMessage) message;
String tradeId = tradeMessage.getTradeId();
checkNotNull(tradeId);
if (tradeMessage instanceof RequestTakeOfferMessage) {
createOffererAsBuyerProtocol(tradeId, sender);
}
else if (tradeMessage instanceof RespondToTakeOfferRequestMessage) {
takerAsSellerProtocolMap.get(tradeId).onRespondToTakeOfferRequestMessage(
(RespondToTakeOfferRequestMessage) tradeMessage);
takerAsSellerProtocolMap.get(tradeId).onRespondToTakeOfferRequestMessage((RespondToTakeOfferRequestMessage) tradeMessage);
}
else if (tradeMessage instanceof TakeOfferFeePayedMessage) {
offererAsBuyerProtocolMap.get(tradeId).onTakeOfferFeePayedMessage((TakeOfferFeePayedMessage)
tradeMessage);
offererAsBuyerProtocolMap.get(tradeId).onTakeOfferFeePayedMessage((TakeOfferFeePayedMessage) tradeMessage);
}
else if (tradeMessage instanceof RequestTakerDepositPaymentMessage) {
takerAsSellerProtocolMap.get(tradeId).onRequestTakerDepositPaymentMessage(
(RequestTakerDepositPaymentMessage) tradeMessage);
takerAsSellerProtocolMap.get(tradeId).onRequestTakerDepositPaymentMessage((RequestTakerDepositPaymentMessage) tradeMessage);
}
else if (tradeMessage instanceof RequestOffererPublishDepositTxMessage) {
offererAsBuyerProtocolMap.get(tradeId).onRequestOffererPublishDepositTxMessage(
(RequestOffererPublishDepositTxMessage) tradeMessage);
offererAsBuyerProtocolMap.get(tradeId).onRequestOffererPublishDepositTxMessage((RequestOffererPublishDepositTxMessage) tradeMessage);
}
else if (tradeMessage instanceof DepositTxPublishedMessage) {
persistPendingTrades();
takerAsSellerProtocolMap.get(tradeId).onDepositTxPublishedMessage((DepositTxPublishedMessage)
tradeMessage);
takerAsSellerProtocolMap.get(tradeId).onDepositTxPublishedMessage((DepositTxPublishedMessage) tradeMessage);
}
else if (tradeMessage instanceof BankTransferInitedMessage) {
// Here happened a null pointer. I assume the only possible reason was that we got a null for the
@ -471,29 +507,21 @@ public class TradeManager {
// For getting better info we add a check. tradeId is checked above.
if (takerAsSellerProtocolMap.get(tradeId) == null)
log.error("takerAsSellerProtocolMap.get(tradeId) = null. That must not happen.");
takerAsSellerProtocolMap.get(tradeId).onBankTransferInitedMessage((BankTransferInitedMessage)
tradeMessage);
takerAsSellerProtocolMap.get(tradeId).onBankTransferInitedMessage((BankTransferInitedMessage) tradeMessage);
}
else if (tradeMessage instanceof PayoutTxPublishedMessage) {
offererAsBuyerProtocolMap.get(tradeId).onPayoutTxPublishedMessage((PayoutTxPublishedMessage)
tradeMessage);
offererAsBuyerProtocolMap.get(tradeId).onPayoutTxPublishedMessage((PayoutTxPublishedMessage) tradeMessage);
}
else {
log.error("Incoming tradeMessage not supported. " + tradeMessage);
}
}
else {
log.error("tradeId from onIncomingTradeMessage is null. That must not happen.");
log.error("Incoming message not supported. " + message);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Utils
///////////////////////////////////////////////////////////////////////////////////////////
public boolean isOfferAlreadyInTrades(Offer offer) {
return pendingTrades.containsKey(offer.getId());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Setters
///////////////////////////////////////////////////////////////////////////////////////////
@ -507,8 +535,8 @@ public class TradeManager {
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
public ObservableMap<String, Offer> getOffers() {
return offers;
public ObservableMap<String, Offer> getOpenOffers() {
return openOffers;
}
public ObservableMap<String, Trade> getPendingTrades() {
@ -536,7 +564,7 @@ public class TradeManager {
///////////////////////////////////////////////////////////////////////////////////////////
private void persistOffers() {
persistence.write(this, "offers", (Map<String, Offer>) new HashMap<>(offers));
persistence.write(this, "offers", (Map<String, Offer>) new HashMap<>(openOffers));
}
private void persistPendingTrades() {

View File

@ -21,8 +21,8 @@ import io.bitsquare.network.Message;
import io.bitsquare.network.MessageBroker;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.trade.listeners.IncomingMessageListener;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.HandleNewMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import java.security.PublicKey;
@ -32,11 +32,11 @@ public interface TradeMessageService extends MessageBroker {
void setExecutor(Executor executor);
void sendMessage(Peer peer, Message message, OutgoingMessageListener listener);
void sendMessage(Peer peer, Message message, SendMessageListener listener);
void addIncomingMessageListener(IncomingMessageListener listener);
void addHandleNewMessageListener(HandleNewMessageListener listener);
void removeIncomingMessageListener(IncomingMessageListener listener);
void removeHandleNewMessageListener(HandleNewMessageListener listener);
void getPeerAddress(PublicKey messagePublicKey, GetPeerAddressListener getPeerAddressListener);
}

View File

@ -17,8 +17,8 @@
package io.bitsquare.trade.listeners;
public interface OutgoingMessageListener {
void onFailed();
public interface SendMessageListener {
void handleFault();
void onResult();
void handleResult();
}

View File

@ -15,11 +15,10 @@
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.trade.listeners;
package io.bitsquare.trade.protocol.trade;
import io.bitsquare.network.Message;
import io.bitsquare.network.Peer;
public interface IncomingMessageListener {
void onMessage(Message message, Peer sender);
public interface OfferMessage extends Message {
public String getOfferId();
}

View File

@ -60,9 +60,9 @@ import static io.bitsquare.util.Validator.*;
/**
* Responsible for the correct execution of the sequence of tasks, message passing to the peer and message processing
* from the peer.
* <p>
* <p/>
* This class handles the role of the offerer as the Bitcoin buyer.
* <p>
* <p/>
* It uses sub tasks to not pollute the main class too much with all the async result/fault handling.
* Any data from incoming messages need to be validated before further processing.
*/
@ -159,7 +159,7 @@ public class BuyerAcceptsOfferProtocol {
checkNotNull(tradeId);
checkNotNull(offer);
//TODO use first for now
//TODO use default arbitrator for now
arbitratorPubKey = offer.getArbitrators().get(0).getPubKeyAsHex();
bankAccount = user.getBankAccount(trade.getOffer().getBankAccountId());
@ -173,10 +173,21 @@ public class BuyerAcceptsOfferProtocol {
}
public void start() {
log.debug("start called " + step++);
handleTakeOfferRequest();
}
// 1. HandleTakeOfferRequest
// Async
// In case of an error: Repeat once, then give up. No rollback activity needed
private void handleTakeOfferRequest() {
log.debug("handleTakeOfferRequest called " + step++);
state = State.HandleTakeOfferRequest;
HandleTakeOfferRequest.run(this::onResultHandleTakeOfferRequest, this::onFault, peer, tradeMessageService,
trade.getState(), tradeId);
HandleTakeOfferRequest.run(this::onResultHandleTakeOfferRequest, this::onHandleTakeOfferRequestFault, peer, tradeMessageService, trade.getState(),
tradeId);
}
private void onHandleTakeOfferRequestFault(Throwable throwable) {
HandleTakeOfferRequest.run(this::onResultHandleTakeOfferRequest, this::onFault, peer, tradeMessageService, trade.getState(), tradeId);
}
public void onResultHandleTakeOfferRequest(boolean takeOfferRequestAccepted) {
@ -187,6 +198,7 @@ public class BuyerAcceptsOfferProtocol {
listener.onWaitingForPeerResponse(state);
}
else {
// Don't use OFFERER_REJECTED as that trade might has been accepted to another taker.
log.info("Finish here as we have already the offer accepted.");
}
}

View File

@ -17,23 +17,23 @@
package io.bitsquare.trade.protocol.trade.offerer.messages;
import io.bitsquare.trade.protocol.trade.TradeMessage;
import io.bitsquare.trade.protocol.trade.OfferMessage;
import java.io.Serializable;
public class RespondToIsOfferAvailableMessage implements Serializable, TradeMessage {
public class IsOfferAvailableResponseMessage implements Serializable, OfferMessage {
private static final long serialVersionUID = 6177387534187739018L;
private final String tradeId;
private final String offerId;
private final boolean isOfferOpen;
public RespondToIsOfferAvailableMessage(String tradeId, boolean isOfferOpen) {
this.tradeId = tradeId;
public IsOfferAvailableResponseMessage(String offerId, boolean isOfferOpen) {
this.offerId = offerId;
this.isOfferOpen = isOfferOpen;
}
@Override
public String getTradeId() {
return tradeId;
public String getOfferId() {
return offerId;
}
public boolean isOfferOpen() {

View File

@ -17,10 +17,10 @@
package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.trade.protocol.trade.offerer.messages.RespondToTakeOfferRequestMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -32,22 +32,21 @@ public class HandleTakeOfferRequest {
public static void run(ResultHandler resultHandler, ExceptionHandler exceptionHandler, Peer peer,
TradeMessageService tradeMessageService, Trade.State tradeState, String tradeId) {
log.trace("Run task");
log.trace("Run HandleTakeOfferRequest task");
boolean isTradeIsOpen = tradeState == Trade.State.OPEN;
if (!isTradeIsOpen) {
log.warn("Received take offer request but the offer not marked as open anymore.");
}
RespondToTakeOfferRequestMessage tradeMessage =
new RespondToTakeOfferRequestMessage(tradeId, isTradeIsOpen);
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
RespondToTakeOfferRequestMessage tradeMessage = new RespondToTakeOfferRequestMessage(tradeId, isTradeIsOpen);
tradeMessageService.sendMessage(peer, tradeMessage, new SendMessageListener() {
@Override
public void onResult() {
public void handleResult() {
log.trace("RespondToTakeOfferRequestMessage successfully arrived at peer");
resultHandler.onResult(isTradeIsOpen);
resultHandler.handleResult(isTradeIsOpen);
}
@Override
public void onFailed() {
public void handleFault() {
log.error("AcceptTakeOfferRequestMessage did not arrive at peer");
exceptionHandler.handleException(new Exception("AcceptTakeOfferRequestMessage did not arrive at peer"));
}
@ -55,6 +54,6 @@ public class HandleTakeOfferRequest {
}
public interface ResultHandler {
void onResult(boolean takeOfferRequestAccepted);
void handleResult(boolean takeOfferRequestAccepted);
}
}

View File

@ -0,0 +1,51 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.trade.protocol.trade.offerer.messages.IsOfferAvailableResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IsOfferAvailableResponse {
private static final Logger log = LoggerFactory.getLogger(IsOfferAvailableResponse.class);
public static void run(Peer peer,
TradeMessageService tradeMessageService,
String offerId,
boolean isOfferOpen) {
log.trace("Run RespondToIsOfferAvailable task");
IsOfferAvailableResponseMessage message = new IsOfferAvailableResponseMessage(offerId, isOfferOpen);
tradeMessageService.sendMessage(peer, message, new SendMessageListener() {
@Override
public void handleResult() {
log.trace("RespondToIsOfferAvailableMessage successfully arrived at peer");
// Nothing to do. Taker knows now offer available state.
}
@Override
public void handleFault() {
log.error("RespondToIsOfferAvailableMessage did not arrive at peer");
// Ignore that. Taker might have gone offline
}
});
}
}

View File

@ -19,7 +19,7 @@ package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.bank.BankAccount;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.offerer.messages.RequestTakerDepositPaymentMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -44,15 +44,15 @@ public class RequestTakerDepositPayment {
log.trace("Run task");
RequestTakerDepositPaymentMessage tradeMessage = new RequestTakerDepositPaymentMessage(
tradeId, bankAccount, accountId, offererPubKey, preparedOffererDepositTxAsHex, offererTxOutIndex);
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new SendMessageListener() {
@Override
public void onResult() {
public void handleResult() {
log.trace("RequestTakerDepositPaymentMessage successfully arrived at peer");
resultHandler.handleResult();
}
@Override
public void onFailed() {
public void handleFault() {
log.error("RequestTakerDepositPaymentMessage did not arrive at peer");
exceptionHandler.handleException(new Exception("RequestTakerDepositPaymentMessage did not arrive at " +
"peer"));

View File

@ -18,7 +18,7 @@
package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.offerer.messages.DepositTxPublishedMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -39,15 +39,15 @@ public class SendDepositTxIdToTaker {
DepositTxPublishedMessage tradeMessage =
new DepositTxPublishedMessage(tradeId, Utils.HEX.encode(depositTransaction.bitcoinSerialize()));
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new SendMessageListener() {
@Override
public void onResult() {
public void handleResult() {
log.trace("DepositTxPublishedMessage successfully arrived at peer");
resultHandler.handleResult();
}
@Override
public void onFailed() {
public void handleFault() {
log.error("DepositTxPublishedMessage did not arrive at peer");
exceptionHandler.handleException(new Exception("DepositTxPublishedMessage did not arrive at peer"));
}

View File

@ -19,7 +19,7 @@ package io.bitsquare.trade.protocol.trade.offerer.tasks;
import io.bitsquare.btc.WalletService;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.offerer.messages.BankTransferInitedMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -68,15 +68,15 @@ public class SendSignedPayoutTx {
takerPaybackAmount,
offererPayoutAddress);
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new SendMessageListener() {
@Override
public void onResult() {
public void handleResult() {
log.trace("BankTransferInitedMessage successfully arrived at peer");
resultHandler.handleResult();
}
@Override
public void onFailed() {
public void handleFault() {
log.error("BankTransferInitedMessage did not arrive at peer");
exceptionHandler.handleException(new Exception("BankTransferInitedMessage did not arrive at peer"));

View File

@ -0,0 +1,116 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.trade.protocol.trade.taker;
import io.bitsquare.network.Peer;
import io.bitsquare.offer.Offer;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.protocol.trade.offerer.messages.IsOfferAvailableResponseMessage;
import io.bitsquare.trade.protocol.trade.taker.tasks.GetPeerAddress;
import io.bitsquare.trade.protocol.trade.taker.tasks.RequestIsOfferAvailable;
import java.security.PublicKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Responsible for the correct execution of the sequence of tasks, message passing to the peer and message processing
* from the peer.
* That class handles the role of the taker as the Bitcoin seller.
* It uses sub tasks to not pollute the main class too much with all the async result/fault handling.
* Any data from incoming messages as well data used to send to the peer need to be validated before further processing.
*/
public class RequestIsOfferAvailableProtocol {
private static final Logger log = LoggerFactory.getLogger(RequestIsOfferAvailableProtocol.class);
// provided data
private final Offer offer;
private final TradeMessageService tradeMessageService;
// derived
private final String offerId;
private final PublicKey offererMessagePublicKey;
// written/read by task
private Peer peer;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public RequestIsOfferAvailableProtocol(Offer offer,
TradeMessageService tradeMessageService) {
this.offer = offer;
this.tradeMessageService = tradeMessageService;
offerId = offer.getId();
offererMessagePublicKey = offer.getMessagePublicKey();
}
public void start() {
getPeerAddress();
}
// 1. GetPeerAddress
// Async
// In case of an error: Repeat once, then give up.
private void getPeerAddress() {
log.debug("getPeerAddress called");
GetPeerAddress.run(this::onResultGetPeerAddress, this::onGetPeerAddressFault, tradeMessageService, offererMessagePublicKey);
}
private void onGetPeerAddressFault(String errorMessage) {
GetPeerAddress.run(this::onResultGetPeerAddress, this::handleErrorMessage, tradeMessageService, offererMessagePublicKey);
}
// 2. RequestTakeOffer
// Async
// In case of an error: Repeat once, then give up.
public void onResultGetPeerAddress(Peer peer) {
log.debug("onResultGetPeerAddress called");
this.peer = peer;
RequestIsOfferAvailable.run(this::onRequestIsOfferAvailableFault, peer, tradeMessageService, offerId);
}
private void onRequestIsOfferAvailableFault(String errorMessage) {
RequestIsOfferAvailable.run(this::handleErrorMessage, peer, tradeMessageService, offerId);
}
// generic
private void handleErrorMessage(String errorMessage) {
offer.setState(Offer.State.OFFER_NOT_AVAILABLE);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message from peer
///////////////////////////////////////////////////////////////////////////////////////////
public void handleIsOfferAvailableResponseMessage(IsOfferAvailableResponseMessage offerMessage) {
if (offer.getState() != Offer.State.OFFER_REMOVED) {
if (offerMessage.isOfferOpen())
offer.setState(Offer.State.OFFER_AVAILABLE);
else
offer.setState(Offer.State.OFFER_NOT_AVAILABLE);
}
}
}

View File

@ -100,7 +100,7 @@ public class SellerTakesOfferProtocol {
private final Coin tradeAmount;
private final String pubKeyForThatTrade;
private final ECKey accountKey;
private final PublicKey peersMessagePublicKey;
private final PublicKey offererMessagePublicKey;
private final Coin securityDeposit;
private final String arbitratorPubKey;
@ -120,6 +120,7 @@ public class SellerTakesOfferProtocol {
private Coin offererPaybackAmount;
private Coin takerPaybackAmount;
private String offererPayoutAddress;
private int repeatCounter = 0;
// state
@ -152,7 +153,7 @@ public class SellerTakesOfferProtocol {
//TODO use 1. for now
arbitratorPubKey = trade.getOffer().getArbitrators().get(0).getPubKeyAsHex();
peersMessagePublicKey = offer.getMessagePublicKey();
offererMessagePublicKey = offer.getMessagePublicKey();
bankAccount = user.getCurrentBankAccount().get();
accountId = user.getAccountId();
@ -164,20 +165,37 @@ public class SellerTakesOfferProtocol {
state = State.Init;
}
// 1. GetPeerAddress
// Async
// In case of an error: No rollback activity needed
public void start() {
log.debug("start called " + step++);
state = State.GetPeerAddress;
GetPeerAddress.run(this::onResultGetPeerAddress, this::onFault, tradeMessageService, peersMessagePublicKey);
getPeerAddress();
}
// 1. GetPeerAddress
// Async
// In case of an error: Repeat once, then give up. No rollback activity needed
private void getPeerAddress() {
log.debug("getPeerAddress called " + step++);
state = State.GetPeerAddress;
GetPeerAddress.run(this::onResultGetPeerAddress, this::onGetPeerAddressFault, tradeMessageService, offererMessagePublicKey);
}
private void onGetPeerAddressFault(String errorMessage) {
log.debug("Run getPeerAddress again after onGetPeerAddressFault" + step);
GetPeerAddress.run(this::onResultGetPeerAddress, this::onErrorMessage, tradeMessageService, offererMessagePublicKey);
}
// 2. RequestTakeOffer
// Async
// In case of an error: Repeat once, then give up. No rollback activity needed
public void onResultGetPeerAddress(Peer peer) {
log.debug("onResultGetPeerAddress called " + step++);
this.peer = peer;
state = State.RequestTakeOffer;
RequestTakeOffer.run(this::onResultRequestTakeOffer, this::onRequestTakeOfferFault, peer, tradeMessageService, tradeId);
}
private void onRequestTakeOfferFault(Throwable throwable) {
log.debug("Run getPeerAddress again after onGetPeerAddressFault" + step);
RequestTakeOffer.run(this::onResultRequestTakeOffer, this::onFault, peer, tradeMessageService, tradeId);
}
@ -191,6 +209,9 @@ public class SellerTakesOfferProtocol {
// Incoming message from peer
///////////////////////////////////////////////////////////////////////////////////////////
// 3. PayTakeOfferFee
// Async
// In case of an error: Repeat once, then give up. No rollback activity needed
public void onRespondToTakeOfferRequestMessage(RespondToTakeOfferRequestMessage message) {
log.debug("onRespondToTakeOfferRequestMessage called " + step++);
log.debug("state " + state);
@ -266,7 +287,7 @@ public class SellerTakesOfferProtocol {
takeOfferFeeTxId,
accountId,
bankAccount,
peersMessagePublicKey,
offererMessagePublicKey,
messagePublicKey,
peersAccountId,
peersBankAccount,
@ -415,4 +436,8 @@ public class SellerTakesOfferProtocol {
listener.onFault(throwable, state);
}
private void onErrorMessage(String errorMessage) {
listener.onFault(new Exception(errorMessage), state);
}
}

View File

@ -17,22 +17,22 @@
package io.bitsquare.trade.protocol.trade.taker.messages;
import io.bitsquare.trade.protocol.trade.TradeMessage;
import io.bitsquare.trade.protocol.trade.OfferMessage;
import java.io.Serializable;
// That msg is used to ping the offerer if he is online and if the offer is still available
public class RequestIsOfferAvailableMessage implements Serializable, TradeMessage {
public class RequestIsOfferAvailableMessage implements Serializable, OfferMessage {
private static final long serialVersionUID = 4630151440192191798L;
private final String tradeId;
private final String offerId;
public RequestIsOfferAvailableMessage(String tradeId) {
this.tradeId = tradeId;
public RequestIsOfferAvailableMessage(String offerId) {
this.offerId = offerId;
}
@Override
public String getTradeId() {
return tradeId;
public String getOfferId() {
return offerId;
}

View File

@ -20,7 +20,7 @@ package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.util.handlers.ExceptionHandler;
import io.bitsquare.util.handlers.ErrorMessageHandler;
import java.security.PublicKey;
@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
public class GetPeerAddress {
private static final Logger log = LoggerFactory.getLogger(GetPeerAddress.class);
public static void run(ResultHandler resultHandler, ExceptionHandler exceptionHandler,
public static void run(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler,
TradeMessageService tradeMessageService, PublicKey messagePublicKey) {
log.trace("Run GetPeerAddress task");
tradeMessageService.getPeerAddress(messagePublicKey, new GetPeerAddressListener() {
@ -43,7 +43,7 @@ public class GetPeerAddress {
@Override
public void onFailed() {
log.error("Lookup for peer address failed.");
exceptionHandler.handleException(new Exception("Lookup for peer address failed."));
errorMessageHandler.handleErrorMessage("Lookup for peer address failed.");
}
});
}

View File

@ -0,0 +1,52 @@
/*
* This file is part of Bitsquare.
*
* Bitsquare is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bitsquare is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bitsquare. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.trade.protocol.trade.taker.messages.RequestIsOfferAvailableMessage;
import io.bitsquare.util.handlers.ErrorMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RequestIsOfferAvailable {
private static final Logger log = LoggerFactory.getLogger(RequestIsOfferAvailable.class);
public static void run(ErrorMessageHandler errorMessageHandler,
Peer peer, TradeMessageService tradeMessageService, String offerId) {
log.trace("Run RequestIsOfferAvailable task");
tradeMessageService.sendMessage(peer, new RequestIsOfferAvailableMessage(offerId),
new SendMessageListener() {
@Override
public void handleResult() {
log.trace("RequestIsOfferAvailableMessage successfully arrived at peer");
// nothing to do
}
@Override
public void handleFault() {
log.error("RequestIsOfferAvailableMessage did not arrive at peer");
errorMessageHandler.handleErrorMessage("RequestIsOfferAvailableMessage did not arrive at peer");
}
});
}
}

View File

@ -18,7 +18,7 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.taker.messages.RequestTakeOfferMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -32,20 +32,19 @@ public class RequestTakeOffer {
public static void run(ResultHandler resultHandler, ExceptionHandler exceptionHandler, Peer peer,
TradeMessageService tradeMessageService, String tradeId) {
log.trace("Run task");
log.trace("Run RequestTakeOffer task");
tradeMessageService.sendMessage(peer, new RequestTakeOfferMessage(tradeId),
new OutgoingMessageListener() {
new SendMessageListener() {
@Override
public void onResult() {
public void handleResult() {
log.trace("RequestTakeOfferMessage successfully arrived at peer");
resultHandler.handleResult();
}
@Override
public void onFailed() {
log.error("RequestTakeOfferMessage did not arrive at peer");
exceptionHandler.handleException(new Exception("RequestTakeOfferMessage did not arrive at " +
"peer"));
public void handleFault() {
log.error("RequestTakeOfferMessage did not arrive at peer");
exceptionHandler.handleException(new Exception("RequestTakeOfferMessage did not arrive at peer"));
}
});
}

View File

@ -18,7 +18,7 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.taker.messages.PayoutTxPublishedMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -34,15 +34,15 @@ public class SendPayoutTxToOfferer {
TradeMessageService tradeMessageService, String tradeId, String payoutTxAsHex) {
log.trace("Run task");
PayoutTxPublishedMessage tradeMessage = new PayoutTxPublishedMessage(tradeId, payoutTxAsHex);
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new SendMessageListener() {
@Override
public void onResult() {
public void handleResult() {
log.trace("PayoutTxPublishedMessage successfully arrived at peer");
resultHandler.handleResult();
}
@Override
public void onFailed() {
public void handleFault() {
log.error("PayoutTxPublishedMessage did not arrive at peer");
exceptionHandler.handleException(new Exception("PayoutTxPublishedMessage did not arrive at peer"));
}

View File

@ -20,7 +20,7 @@ package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.bank.BankAccount;
import io.bitsquare.btc.WalletService;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.taker.messages.RequestOffererPublishDepositTxMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -68,15 +68,15 @@ public class SendSignedTakerDepositTxAsHex {
walletService.getAddressInfoByTradeID(tradeId).getAddressString(),
takerTxOutIndex,
offererTxOutIndex);
tradeMessageService.sendMessage(peer, tradeMessage, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, tradeMessage, new SendMessageListener() {
@Override
public void onResult() {
public void handleResult() {
log.trace("RequestOffererDepositPublicationMessage successfully arrived at peer");
resultHandler.handleResult();
}
@Override
public void onFailed() {
public void handleFault() {
log.error("RequestOffererDepositPublicationMessage did not arrive at peer");
exceptionHandler.handleException(
new Exception("RequestOffererDepositPublicationMessage did not arrive at peer"));

View File

@ -18,7 +18,7 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.network.Peer;
import io.bitsquare.trade.protocol.trade.taker.messages.TakeOfferFeePayedMessage;
import io.bitsquare.util.handlers.ExceptionHandler;
@ -44,15 +44,15 @@ public class SendTakeOfferFeePayedTxId {
TakeOfferFeePayedMessage msg = new TakeOfferFeePayedMessage(tradeId, takeOfferFeeTxId, tradeAmount,
pubKeyForThatTradeAsHex);
tradeMessageService.sendMessage(peer, msg, new OutgoingMessageListener() {
tradeMessageService.sendMessage(peer, msg, new SendMessageListener() {
@Override
public void onResult() {
public void handleResult() {
log.trace("TakeOfferFeePayedMessage successfully arrived at peer");
resultHandler.handleResult();
}
@Override
public void onFailed() {
public void handleFault() {
log.error("TakeOfferFeePayedMessage did not arrive at peer");
exceptionHandler.handleException(new Exception("TakeOfferFeePayedMessage did not arrive at peer"));
}

View File

@ -23,8 +23,8 @@ import io.bitsquare.network.tomp2p.TomP2PNode;
import io.bitsquare.network.tomp2p.TomP2PPeer;
import io.bitsquare.trade.TradeMessageService;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.trade.listeners.IncomingMessageListener;
import io.bitsquare.trade.listeners.OutgoingMessageListener;
import io.bitsquare.trade.listeners.HandleNewMessageListener;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.user.User;
import java.security.PublicKey;
@ -58,7 +58,7 @@ public class TomP2PTradeMessageService implements TradeMessageService {
private final TomP2PNode tomP2PNode;
private final User user;
private final List<IncomingMessageListener> incomingMessageListeners = new ArrayList<>();
private final List<HandleNewMessageListener> handleNewMessageListeners = new ArrayList<>();
private Executor executor;
@ -70,8 +70,8 @@ public class TomP2PTradeMessageService implements TradeMessageService {
this.user = user;
this.tomP2PNode = tomP2PNode;
}
public void setExecutor(Executor executor) {
this.executor = executor;
}
@ -101,11 +101,10 @@ public class TomP2PTradeMessageService implements TradeMessageService {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Trade process
// Trade messages
///////////////////////////////////////////////////////////////////////////////////////////
public void sendMessage(Peer peer, Message message,
OutgoingMessageListener listener) {
public void sendMessage(Peer peer, Message message, SendMessageListener listener) {
if (!(peer instanceof TomP2PPeer)) {
throw new IllegalArgumentException("peer must be of type TomP2PPeer");
}
@ -114,17 +113,17 @@ public class TomP2PTradeMessageService implements TradeMessageService {
@Override
public void operationComplete(BaseFuture future) throws Exception {
if (future.isSuccess()) {
executor.execute(listener::onResult);
executor.execute(listener::handleResult);
}
else {
log.error("sendMessage failed with reason " + futureDirect.failedReason());
executor.execute(listener::onFailed);
executor.execute(listener::handleFault);
}
}
@Override
public void exceptionCaught(Throwable t) throws Exception {
executor.execute(listener::onFailed);
executor.execute(listener::handleFault);
}
});
}
@ -134,12 +133,12 @@ public class TomP2PTradeMessageService implements TradeMessageService {
// Event Listeners
///////////////////////////////////////////////////////////////////////////////////////////
public void addIncomingMessageListener(IncomingMessageListener listener) {
incomingMessageListeners.add(listener);
public void addHandleNewMessageListener(HandleNewMessageListener listener) {
handleNewMessageListeners.add(listener);
}
public void removeIncomingMessageListener(IncomingMessageListener listener) {
incomingMessageListeners.remove(listener);
public void removeHandleNewMessageListener(HandleNewMessageListener listener) {
handleNewMessageListeners.remove(listener);
}
@ -149,9 +148,9 @@ public class TomP2PTradeMessageService implements TradeMessageService {
@Override
public void handleMessage(Object message, Peer sender) {
if (message instanceof Message) {
executor.execute(() -> incomingMessageListeners.stream().forEach(e ->
e.onMessage((Message) message, sender)));
if (message instanceof Message && sender instanceof TomP2PPeer) {
executor.execute(() -> handleNewMessageListeners.stream().forEach(e ->
e.handleMessage((Message) message, sender)));
}
}
}