Remove offer creation recovery from CreateOfferCoordinator

Recovery was never fully implemented, and removing it dramatically
simplifies things. We can return to this with a proper analysis of
finite state machine libraries when the time comes.
This commit is contained in:
Chris Beams 2014-11-05 15:37:07 +01:00
parent 72423dce8a
commit aeaef72834
No known key found for this signature in database
GPG key ID: 3D214F8F5BC5ED73
2 changed files with 23 additions and 131 deletions

View file

@ -84,7 +84,6 @@ public class TradeManager {
//TODO store TakerAsSellerProtocol in trade //TODO store TakerAsSellerProtocol in trade
private final Map<String, SellerTakesOfferProtocol> takerAsSellerProtocolMap = new HashMap<>(); private final Map<String, SellerTakesOfferProtocol> takerAsSellerProtocolMap = new HashMap<>();
private final Map<String, BuyerAcceptsOfferProtocol> offererAsBuyerProtocolMap = new HashMap<>(); private final Map<String, BuyerAcceptsOfferProtocol> offererAsBuyerProtocolMap = new HashMap<>();
private final Map<String, CreateOfferCoordinator> createOfferCoordinatorMap = new HashMap<>();
private final ObservableMap<String, Offer> offers = FXCollections.observableHashMap(); private final ObservableMap<String, Offer> offers = FXCollections.observableHashMap();
private final ObservableMap<String, Trade> pendingTrades = FXCollections.observableHashMap(); private final ObservableMap<String, Trade> pendingTrades = FXCollections.observableHashMap();
@ -167,36 +166,23 @@ public class TradeManager {
settings.getAcceptedCountries(), settings.getAcceptedCountries(),
settings.getAcceptedLanguageLocales()); settings.getAcceptedLanguageLocales());
if (createOfferCoordinatorMap.containsKey(offer.getId())) { CreateOfferCoordinator createOfferCoordinator = new CreateOfferCoordinator(
errorMessageHandler.handleErrorMessage("A createOfferCoordinator for the offer with the id " + offer offer,
.getId() + " " + walletFacade,
"already exists."); (transactionId) -> {
} try {
else { offer.setOfferFeePaymentTxID(transactionId.getHashAsString());
CreateOfferCoordinator createOfferCoordinator = new CreateOfferCoordinator(persistence, addOffer(offer);
offer, resultHandler.onResult(transactionId);
walletFacade, } catch (Exception e) {
(transactionId) -> { errorMessageHandler.handleErrorMessage("Could not save offer. Reason: " +
try { (e.getCause() != null ? e.getCause().getMessage() : e.toString()));
offer.setOfferFeePaymentTxID(transactionId.getHashAsString()); }
addOffer(offer); },
createOfferCoordinatorMap.remove(offer.getId()); (message, throwable) -> errorMessageHandler.handleErrorMessage(message),
offerRepository);
resultHandler.onResult(transactionId); createOfferCoordinator.start();
} catch (Exception e) {
//TODO retry policy
errorMessageHandler.handleErrorMessage("Could not save offer. Reason: " +
(e.getCause() != null ? e.getCause().getMessage() : e.toString()));
createOfferCoordinatorMap.remove(offer.getId());
}
},
(message, throwable) -> {
errorMessageHandler.handleErrorMessage(message);
createOfferCoordinatorMap.remove(offer.getId());
}, offerRepository);
createOfferCoordinatorMap.put(offer.getId(), createOfferCoordinator);
createOfferCoordinator.start();
}
} }
private void addOffer(Offer offer) { private void addOffer(Offer offer) {

View file

@ -20,7 +20,6 @@ package io.bitsquare.trade.protocol.createoffer;
import io.bitsquare.btc.WalletFacade; import io.bitsquare.btc.WalletFacade;
import io.bitsquare.offer.Offer; import io.bitsquare.offer.Offer;
import io.bitsquare.offer.OfferRepository; import io.bitsquare.offer.OfferRepository;
import io.bitsquare.persistence.Persistence;
import io.bitsquare.trade.handlers.TransactionResultHandler; import io.bitsquare.trade.handlers.TransactionResultHandler;
import io.bitsquare.util.task.FaultHandler; import io.bitsquare.util.task.FaultHandler;
@ -29,57 +28,13 @@ import org.bitcoinj.core.Transaction;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import java.io.Serializable;
import javax.annotation.concurrent.Immutable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* Responsible for coordinating tasks involved in the create offer process. * Responsible for coordinating tasks involved in the create offer process.
* It holds the model.state of the current process and support recovery if possible.
*/ */
//TODO recover policy, timer
@Immutable
public class CreateOfferCoordinator { public class CreateOfferCoordinator {
public enum State {
INITED,
STARTED,
VALIDATED,
OFFER_FEE_TX_CREATED,
OFFER_FEE_BROAD_CASTED,
OFFER_PUBLISHED_TO_DHT
}
/**
* The model is not immutable but only exposed to the CreateOfferCoordinator
*/
static class Model implements Serializable {
private static final long serialVersionUID = 3027720554200858916L;
private final Persistence persistence;
private State state;
//TODO use tx id
Transaction transaction;
Model(Persistence persistence) {
this.persistence = persistence;
}
public State getState() {
return state;
}
public void setState(State state) {
this.state = state;
//TODO will have performance issues, but could be handled inside the persistence solution (queue up save
// requests and exec. them on dedicated thread)
persistence.write(this, "state", state);
}
}
private static final Logger log = LoggerFactory.getLogger(CreateOfferCoordinator.class); private static final Logger log = LoggerFactory.getLogger(CreateOfferCoordinator.class);
@ -87,43 +42,30 @@ public class CreateOfferCoordinator {
private final WalletFacade walletFacade; private final WalletFacade walletFacade;
private final TransactionResultHandler resultHandler; private final TransactionResultHandler resultHandler;
private final FaultHandler faultHandler; private final FaultHandler faultHandler;
private final Model model;
private final OfferRepository offerRepository; private final OfferRepository offerRepository;
public CreateOfferCoordinator(Persistence persistence, Offer offer, WalletFacade walletFacade,
TransactionResultHandler resultHandler, FaultHandler faultHandler,
OfferRepository offerRepository) {
this(offer, walletFacade, resultHandler, faultHandler, new Model(persistence), offerRepository);
}
// for recovery from model
public CreateOfferCoordinator(Offer offer, WalletFacade walletFacade, TransactionResultHandler resultHandler, public CreateOfferCoordinator(Offer offer, WalletFacade walletFacade, TransactionResultHandler resultHandler,
FaultHandler faultHandler, Model model, OfferRepository offerRepository) { FaultHandler faultHandler, OfferRepository offerRepository) {
this.offer = offer; this.offer = offer;
this.walletFacade = walletFacade; this.walletFacade = walletFacade;
this.resultHandler = resultHandler; this.resultHandler = resultHandler;
this.faultHandler = faultHandler; this.faultHandler = faultHandler;
this.model = model;
this.offerRepository = offerRepository; this.offerRepository = offerRepository;
model.setState(State.INITED);
} }
public void start() { public void start() {
model.setState(State.STARTED);
try { try {
offer.validate(); offer.validate();
model.setState(State.VALIDATED);
} catch (Exception ex) { } catch (Exception ex) {
faultHandler.handleFault("Offer validation failed", ex); faultHandler.handleFault("Offer validation failed", ex);
return; return;
} }
Transaction transaction;
try { try {
model.transaction = walletFacade.createOfferFeeTx(offer.getId()); transaction = walletFacade.createOfferFeeTx(offer.getId());
model.setState(State.OFFER_FEE_TX_CREATED); offer.setOfferFeePaymentTxID(transaction.getHashAsString());
offer.setOfferFeePaymentTxID(model.transaction.getHashAsString());
} catch (InsufficientMoneyException ex) { } catch (InsufficientMoneyException ex) {
faultHandler.handleFault( faultHandler.handleFault(
"Offer fee payment failed because there is insufficient money in the trade wallet", ex); "Offer fee payment failed because there is insufficient money in the trade wallet", ex);
@ -134,7 +76,7 @@ public class CreateOfferCoordinator {
} }
try { try {
walletFacade.broadcastCreateOfferFeeTx(model.transaction, new FutureCallback<Transaction>() { walletFacade.broadcastCreateOfferFeeTx(transaction, new FutureCallback<Transaction>() {
@Override @Override
public void onSuccess(Transaction transaction) { public void onSuccess(Transaction transaction) {
log.info("sendResult onSuccess:" + transaction); log.info("sendResult onSuccess:" + transaction);
@ -145,8 +87,7 @@ public class CreateOfferCoordinator {
} }
try { try {
model.setState(State.OFFER_FEE_BROAD_CASTED); offerRepository.addOffer(offer, () -> resultHandler.onResult(transaction), faultHandler);
offerRepository.addOffer(offer, CreateOfferCoordinator.this::addOfferResultHandler, faultHandler);
} catch (Exception e) { } catch (Exception e) {
faultHandler.handleFault("Offer fee payment failed.", e); faultHandler.handleFault("Offer fee payment failed.", e);
} }
@ -162,39 +103,4 @@ public class CreateOfferCoordinator {
return; return;
} }
} }
private void addOfferResultHandler() {
model.setState(State.OFFER_PUBLISHED_TO_DHT);
resultHandler.onResult(model.transaction);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Recovery
///////////////////////////////////////////////////////////////////////////////////////////
public void recover() {
switch (model.getState()) {
case INITED:
case STARTED:
case VALIDATED:
case OFFER_FEE_TX_CREATED:
// we start over again, no critical and expensive work done yet
start();
break;
case OFFER_FEE_BROAD_CASTED:
// actually the only replay case here, tx publish was successful but storage to dht failed.
// Republish the offer to DHT
offerRepository.addOffer(offer, this::addOfferResultHandler, faultHandler);
break;
case OFFER_PUBLISHED_TO_DHT:
// should be impossible
log.warn("That case must not happen.");
break;
default:
log.error("Illegal state passes. That must not happen");
break;
}
}
} }