Add rollBackOnFault to all tasks

This commit is contained in:
Manfred Karrer 2015-03-14 09:37:52 +01:00
parent 61a925e816
commit 8e280b99d4
42 changed files with 190 additions and 83 deletions

View File

@ -55,7 +55,7 @@ public class GetPeerAddress extends Task<CheckOfferAvailabilityModel> {
}
@Override
protected void applyErrorState() {
protected void rollBackOnFault() {
model.getOffer().setState(Offer.State.OFFERER_OFFLINE);
}
}

View File

@ -51,7 +51,7 @@ public class ProcessReportOfferAvailabilityMessage extends Task<CheckOfferAvaila
}
@Override
protected void applyErrorState() {
protected void rollBackOnFault() {
model.getOffer().setState(Offer.State.AVAILABILITY_CHECK_FAILED);
}
}

View File

@ -52,7 +52,7 @@ public class RequestIsOfferAvailable extends Task<CheckOfferAvailabilityModel> {
}
@Override
protected void applyErrorState() {
protected void rollBackOnFault() {
if (model.getOffer().getState() != Offer.State.OFFERER_OFFLINE)
model.getOffer().setState(Offer.State.AVAILABILITY_CHECK_FAILED);
}

View File

@ -41,5 +41,7 @@ public class AddOfferToRemoteOfferBook extends Task<PlaceOfferModel> {
(message, throwable) -> {
failed(throwable);
});
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -103,7 +103,7 @@ public class BroadcastCreateOfferFeeTx extends Task<PlaceOfferModel> {
}
}
protected void applyErrorState() {
protected void rollBackOnFault() {
if (!removeOfferFailed && !addOfferFailed) {
// If broadcast fails we need to remove offer from offerbook
model.getOfferBookService().removeOffer(model.getOffer(),

View File

@ -46,5 +46,7 @@ public class CreateOfferFeeTx extends Task<PlaceOfferModel> {
} catch (Throwable t) {
failed(t);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -40,5 +40,7 @@ public class ValidateOffer extends Task<PlaceOfferModel> {
} catch (Exception e) {
failed(e);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -78,31 +78,6 @@ public class BuyerAsOffererProtocol {
// Incoming message handling
///////////////////////////////////////////////////////////////////////////////////////////
private void handleMessage(Message message, Peer peer) {
log.trace("handleNewMessage: message = " + message.getClass().getSimpleName());
if (message instanceof TradeMessage) {
TradeMessage tradeMessage = (TradeMessage) message;
nonEmptyStringOf(tradeMessage.getTradeId());
if (tradeMessage instanceof RequestTakeOfferMessage) {
handleRequestTakeOfferMessage((RequestTakeOfferMessage) tradeMessage, peer);
}
else if (tradeMessage instanceof TakeOfferFeePayedMessage) {
handleTakeOfferFeePayedMessage((TakeOfferFeePayedMessage) tradeMessage);
}
else if (tradeMessage instanceof RequestOffererPublishDepositTxMessage) {
handleRequestOffererPublishDepositTxMessage((RequestOffererPublishDepositTxMessage) tradeMessage);
}
else if (tradeMessage instanceof PayoutTxPublishedMessage) {
handlePayoutTxPublishedMessage((PayoutTxPublishedMessage) tradeMessage);
}
else {
log.error("Incoming tradeMessage not supported. " + tradeMessage);
}
}
}
private void handleRequestTakeOfferMessage(RequestTakeOfferMessage tradeMessage, Peer peer) {
model.setTradeMessage(tradeMessage);
model.setPeer(peer);
@ -205,4 +180,34 @@ public class BuyerAsOffererProtocol {
taskRunner.addTasks(ProcessPayoutTxPublishedMessage.class);
taskRunner.run();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Massage dispatcher
///////////////////////////////////////////////////////////////////////////////////////////
private void handleMessage(Message message, Peer peer) {
log.trace("handleNewMessage: message = " + message.getClass().getSimpleName());
if (message instanceof TradeMessage) {
TradeMessage tradeMessage = (TradeMessage) message;
nonEmptyStringOf(tradeMessage.getTradeId());
if (tradeMessage instanceof RequestTakeOfferMessage) {
handleRequestTakeOfferMessage((RequestTakeOfferMessage) tradeMessage, peer);
}
else if (tradeMessage instanceof TakeOfferFeePayedMessage) {
handleTakeOfferFeePayedMessage((TakeOfferFeePayedMessage) tradeMessage);
}
else if (tradeMessage instanceof RequestOffererPublishDepositTxMessage) {
handleRequestOffererPublishDepositTxMessage((RequestOffererPublishDepositTxMessage) tradeMessage);
}
else if (tradeMessage instanceof PayoutTxPublishedMessage) {
handlePayoutTxPublishedMessage((PayoutTxPublishedMessage) tradeMessage);
}
else {
log.error("Incoming tradeMessage not supported. " + tradeMessage);
}
}
}
}

View File

@ -26,11 +26,11 @@ import org.jetbrains.annotations.NotNull;
public class RespondToTakeOfferRequestMessage implements Serializable, TradeMessage {
private static final long serialVersionUID = 6177387534087739018L;
private final String tradeId;
private final boolean takeOfferRequestAccepted;
private final boolean offerIsAvailable;
public RespondToTakeOfferRequestMessage(@NotNull String tradeId, boolean takeOfferRequestAccepted) {
public RespondToTakeOfferRequestMessage(@NotNull String tradeId, boolean offerIsAvailable) {
this.tradeId = tradeId;
this.takeOfferRequestAccepted = takeOfferRequestAccepted;
this.offerIsAvailable = offerIsAvailable;
}
@Override
@ -38,7 +38,7 @@ public class RespondToTakeOfferRequestMessage implements Serializable, TradeMess
return tradeId;
}
public boolean isTakeOfferRequestAccepted() {
return takeOfferRequestAccepted;
public boolean isOfferIsAvailable() {
return offerIsAvailable;
}
}

View File

@ -60,5 +60,7 @@ public class CreateDepositTx extends Task<BuyerAsOffererModel> {
} catch (InsufficientMoneyException e) {
failed(e);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -52,5 +52,7 @@ public class ProcessPayoutTxPublishedMessage extends Task<BuyerAsOffererModel> {
} catch (Throwable t) {
failed(t);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -55,5 +55,7 @@ public class ProcessRequestOffererPublishDepositTxMessage extends Task<BuyerAsOf
} catch (Throwable t) {
failed(t);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -42,5 +42,7 @@ public class ProcessRequestTakeOfferMessage extends Task<BuyerAsOffererModel> {
} catch (Throwable t) {
failed(t);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -51,5 +51,7 @@ public class ProcessTakeOfferFeePayedMessage extends Task<BuyerAsOffererModel> {
} catch (Throwable t) {
failed(t);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
public class RespondToTakeOfferRequest extends Task<BuyerAsOffererModel> {
private static final Logger log = LoggerFactory.getLogger(RespondToTakeOfferRequest.class);
private boolean offerIsAvailable;
public RespondToTakeOfferRequest(TaskRunner taskHandler, BuyerAsOffererModel model) {
super(taskHandler, model);
@ -37,27 +38,36 @@ public class RespondToTakeOfferRequest extends Task<BuyerAsOffererModel> {
@Override
protected void doRun() {
boolean takeOfferRequestAccepted = model.getOpenOffer().getState() == OpenOffer.State.OPEN;
if (!takeOfferRequestAccepted)
offerIsAvailable = model.getOpenOffer().getState() == OpenOffer.State.OPEN;
if (offerIsAvailable) {
model.getOpenOffer().setState(OpenOffer.State.OFFER_ACCEPTED);
Trade trade = new Trade(model.getOpenOffer().getOffer());
model.setTrade(trade);
}
else {
log.info("Received take offer request but the offer not marked as open anymore.");
}
Trade trade = new Trade(model.getOpenOffer().getOffer());
model.setTrade(trade);
model.getOpenOffer().setState(OpenOffer.State.OFFER_ACCEPTED);
RespondToTakeOfferRequestMessage tradeMessage = new RespondToTakeOfferRequestMessage(trade.getId(), takeOfferRequestAccepted);
RespondToTakeOfferRequestMessage tradeMessage = new RespondToTakeOfferRequestMessage(model.getOpenOffer().getId(), offerIsAvailable);
model.getTradeMessageService().sendMessage(model.getPeer(), tradeMessage, new SendMessageListener() {
@Override
public void handleResult() {
log.trace("RespondToTakeOfferRequestMessage successfully arrived at peer");
complete();
}
@Override
public void handleFault() {
failed("AcceptTakeOfferRequestMessage did not arrive at peer");
failed();
}
});
}
@Override
protected void rollBackOnFault() {
if (offerIsAvailable && model.getOpenOffer().getState() == OpenOffer.State.OFFER_ACCEPTED) {
model.getOpenOffer().setState(OpenOffer.State.OPEN);
model.setTrade(null);
}
}
}

View File

@ -55,5 +55,7 @@ public class SendBankTransferInitedMessage extends Task<BuyerAsOffererModel> {
failed("Sending BankTransferInitedMessage failed.");
}
});
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -52,5 +52,7 @@ public class SendDepositTxIdToTaker extends Task<BuyerAsOffererModel> {
failed("Sending DepositTxPublishedMessage failed.");
}
});
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -55,5 +55,7 @@ public class SendTakerDepositPaymentRequest extends Task<BuyerAsOffererModel> {
failed("RequestTakerDepositPaymentMessage did not arrive at peer");
}
});
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -54,5 +54,7 @@ public class SetupListenerForBlockChainConfirmation extends Task<BuyerAsOffererM
});
complete();
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -66,5 +66,7 @@ public class SignAndPublishDepositTx extends Task<BuyerAsOffererModel> {
} catch (Exception e) {
failed(e);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -61,6 +61,8 @@ public class SignPayoutTx extends Task<BuyerAsOffererModel> {
} catch (Exception e) {
failed(e);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -56,5 +56,7 @@ public class VerifyAndSignContract extends Task<BuyerAsOffererModel> {
trade.setTakerContractSignature(signature);
complete();
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -41,5 +41,7 @@ public class VerifyTakeOfferFeePayment extends Task<BuyerAsOffererModel> {
complete();
}
@Override
protected void rollBackOnFault() {
}
}

View File

@ -46,5 +46,7 @@ public class VerifyTakerAccount extends Task<BuyerAsOffererModel> {
else {
failed("Account registration validation for peer failed.");
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -68,6 +68,10 @@ public class SellerAsTakerProtocol {
// Called from UI
///////////////////////////////////////////////////////////////////////////////////////////
public void cleanup() {
model.getTradeMessageService().removeMessageHandler(messageHandler);
}
public void takeOffer() {
model.getTradeMessageService().addMessageHandler(messageHandler);
@ -86,39 +90,11 @@ public class SellerAsTakerProtocol {
taskRunner.run();
}
public void cleanup() {
model.getTradeMessageService().removeMessageHandler(messageHandler);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message handling
///////////////////////////////////////////////////////////////////////////////////////////
private void handleMessage(Message message, Peer sender) {
log.trace("handleNewMessage: message = " + message.getClass().getSimpleName());
if (message instanceof TradeMessage) {
TradeMessage tradeMessage = (TradeMessage) message;
nonEmptyStringOf(tradeMessage.getTradeId());
if (tradeMessage instanceof RespondToTakeOfferRequestMessage) {
handleRespondToTakeOfferRequestMessage((RespondToTakeOfferRequestMessage) tradeMessage);
}
else if (tradeMessage instanceof TakerDepositPaymentRequestMessage) {
handleTakerDepositPaymentRequestMessage((TakerDepositPaymentRequestMessage) tradeMessage);
}
else if (tradeMessage instanceof DepositTxPublishedMessage) {
handleDepositTxPublishedMessage((DepositTxPublishedMessage) tradeMessage);
}
else if (tradeMessage instanceof BankTransferStartedMessage) {
handleBankTransferInitedMessage((BankTransferStartedMessage) tradeMessage);
}
else {
log.error("Incoming message not supported. " + tradeMessage);
}
}
}
private void handleRespondToTakeOfferRequestMessage(RespondToTakeOfferRequestMessage tradeMessage) {
model.setTradeMessage(tradeMessage);
@ -214,4 +190,33 @@ public class SellerAsTakerProtocol {
);
taskRunner.run();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Massage dispatcher
///////////////////////////////////////////////////////////////////////////////////////////
private void handleMessage(Message message, Peer sender) {
log.trace("handleNewMessage: message = " + message.getClass().getSimpleName());
if (message instanceof TradeMessage) {
TradeMessage tradeMessage = (TradeMessage) message;
nonEmptyStringOf(tradeMessage.getTradeId());
if (tradeMessage instanceof RespondToTakeOfferRequestMessage) {
handleRespondToTakeOfferRequestMessage((RespondToTakeOfferRequestMessage) tradeMessage);
}
else if (tradeMessage instanceof TakerDepositPaymentRequestMessage) {
handleTakerDepositPaymentRequestMessage((TakerDepositPaymentRequestMessage) tradeMessage);
}
else if (tradeMessage instanceof DepositTxPublishedMessage) {
handleDepositTxPublishedMessage((DepositTxPublishedMessage) tradeMessage);
}
else if (tradeMessage instanceof BankTransferStartedMessage) {
handleBankTransferInitedMessage((BankTransferStartedMessage) tradeMessage);
}
else {
log.error("Incoming message not supported. " + tradeMessage);
}
}
}
}

View File

@ -55,5 +55,7 @@ public class CreateAndSignContract extends Task<SellerAsTakerModel> {
trade.setTakerContractSignature(signature);
complete();
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -18,6 +18,7 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.network.Peer;
import io.bitsquare.offer.Offer;
import io.bitsquare.trade.listeners.GetPeerAddressListener;
import io.bitsquare.trade.protocol.trade.taker.SellerAsTakerModel;
import io.bitsquare.util.taskrunner.Task;
@ -31,23 +32,32 @@ public class GetPeerAddress extends Task<SellerAsTakerModel> {
public GetPeerAddress(TaskRunner taskHandler, SellerAsTakerModel model) {
super(taskHandler, model);
errorMessage = "DHT lookup for peer address failed. Maybe the offerer was offline for too long time.";
}
@Override
protected void doRun() {
model.getTradeMessageService().getPeerAddress(model.getOffererMessagePublicKey(), new GetPeerAddressListener() {
model.getTradeMessageService().getPeerAddress(model.getOffer().getMessagePublicKey(), new GetPeerAddressListener() {
@Override
public void onResult(Peer peer) {
log.trace("Found peer: " + peer.toString());
model.setPeer(peer);
complete();
}
@Override
public void onFailed() {
failed("DHT lookup for peer address failed.");
model.getOffer().setState(Offer.State.OFFERER_OFFLINE);
failed();
}
});
}
@Override
protected void rollBackOnFault() {
if (model.getOffer().getState() != Offer.State.OFFERER_OFFLINE)
model.getOffer().setState(Offer.State.AVAILABILITY_CHECK_FAILED);
}
}

View File

@ -55,5 +55,7 @@ public class PayDeposit extends Task<SellerAsTakerModel> {
} catch (InsufficientMoneyException e) {
failed(e);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -17,6 +17,7 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.protocol.trade.taker.SellerAsTakerModel;
import io.bitsquare.util.taskrunner.Task;
import io.bitsquare.util.taskrunner.TaskRunner;
@ -58,4 +59,10 @@ public class PayTakeOfferFee extends Task<SellerAsTakerModel> {
failed(e);
}
}
@Override
protected void rollBackOnFault() {
// in error case take offer can be repeated so we reset the trade state
model.getTrade().setState(Trade.State.OPEN);
}
}

View File

@ -52,5 +52,7 @@ public class ProcessBankTransferInitedMessage extends Task<SellerAsTakerModel> {
} catch (Throwable t) {
failed(t);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -46,5 +46,7 @@ public class ProcessDepositTxPublishedMessage extends Task<SellerAsTakerModel> {
} catch (Throwable t) {
failed(t);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -40,7 +40,7 @@ public class ProcessRespondToTakeOfferRequestMessage extends Task<SellerAsTakerM
try {
checkTradeId(model.getTrade().getId(), model.getTradeMessage());
if (((RespondToTakeOfferRequestMessage) model.getTradeMessage()).isTakeOfferRequestAccepted()) {
if (((RespondToTakeOfferRequestMessage) model.getTradeMessage()).isOfferIsAvailable()) {
model.getTrade().setState(Trade.State.OFFERER_ACCEPTED);
complete();
}
@ -49,7 +49,11 @@ public class ProcessRespondToTakeOfferRequestMessage extends Task<SellerAsTakerM
failed("Requested offer rejected because it is not available anymore.");
}
} catch (Throwable t) {
failed( t);
failed(t);
}
}
@Override
protected void rollBackOnFault() {
}
}

View File

@ -50,5 +50,7 @@ public class ProcessTakerDepositPaymentRequestMessage extends Task<SellerAsTaker
} catch (Throwable t) {
failed(t);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -17,6 +17,7 @@
package io.bitsquare.trade.protocol.trade.taker.tasks;
import io.bitsquare.offer.Offer;
import io.bitsquare.trade.listeners.SendMessageListener;
import io.bitsquare.trade.protocol.trade.taker.SellerAsTakerModel;
import io.bitsquare.trade.protocol.trade.taker.messages.RequestTakeOfferMessage;
@ -39,14 +40,20 @@ public class RequestTakeOffer extends Task<SellerAsTakerModel> {
new SendMessageListener() {
@Override
public void handleResult() {
log.trace("Sending RequestTakeOfferMessage succeeded.");
complete();
}
@Override
public void handleFault() {
failed("Sending RequestTakeOfferMessage failed.");
model.getOffer().setState(Offer.State.OFFERER_OFFLINE);
failed();
}
});
}
@Override
protected void rollBackOnFault() {
if (model.getOffer().getState() != Offer.State.OFFERER_OFFLINE)
model.getOffer().setState(Offer.State.AVAILABILITY_CHECK_FAILED);
}
}

View File

@ -48,5 +48,7 @@ public class SendPayoutTxToOfferer extends Task<SellerAsTakerModel> {
failed("Sending PayoutTxPublishedMessage failed.");
}
});
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -67,5 +67,7 @@ public class SendSignedTakerDepositTxAsHex extends Task<SellerAsTakerModel> {
}
});
}
@Override
protected void rollBackOnFault() {
}
}

View File

@ -51,8 +51,12 @@ public class SendTakeOfferFeePayedMessage extends Task<SellerAsTakerModel> {
@Override
public void handleFault() {
failed("Sending TakeOfferFeePayedMessage failed.");
failed();
}
});
}
@Override
protected void rollBackOnFault() {
}
}

View File

@ -71,5 +71,7 @@ public class SignAndPublishPayoutTx extends Task<SellerAsTakerModel> {
} catch (AddressFormatException e) {
failed(e);
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -42,5 +42,7 @@ public class TakerCommitDepositTx extends Task<SellerAsTakerModel> {
model.getTrade().setState(Trade.State.DEPOSIT_PUBLISHED);
complete();
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -39,5 +39,7 @@ public class VerifyOfferFeePayment extends Task<SellerAsTakerModel> {
resultHandler.handleResult();
}*/
complete();
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -44,5 +44,7 @@ public class VerifyOffererAccount extends Task<SellerAsTakerModel> {
else {
failed("Account registration validation for peer faultHandler.onFault.");
}
} @Override
protected void rollBackOnFault() {
}
}

View File

@ -47,8 +47,7 @@ public abstract class Task<T extends SharedModel> {
abstract protected void doRun();
protected void applyErrorState() {
}
abstract protected void rollBackOnFault();
private void interceptBeforeRun() {
if (getClass() == taskToInterceptBeforeRun)
@ -89,7 +88,7 @@ public abstract class Task<T extends SharedModel> {
}
protected void failed() {
applyErrorState();
rollBackOnFault();
taskHandler.handleErrorMessage(errorMessage);
}