message handlers return synchronously

This commit is contained in:
woodser 2022-11-26 17:24:55 +00:00
parent a266fab6ec
commit 7062fa9e79
2 changed files with 199 additions and 209 deletions

View File

@ -227,19 +227,21 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
@Override @Override
public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) { public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) {
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope(); NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
if (networkEnvelope instanceof InitTradeRequest) { new Thread(() -> {
handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer); if (networkEnvelope instanceof InitTradeRequest) {
} else if (networkEnvelope instanceof InitMultisigRequest) { handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer);
handleInitMultisigRequest((InitMultisigRequest) networkEnvelope, peer); } else if (networkEnvelope instanceof InitMultisigRequest) {
} else if (networkEnvelope instanceof SignContractRequest) { handleInitMultisigRequest((InitMultisigRequest) networkEnvelope, peer);
handleSignContractRequest((SignContractRequest) networkEnvelope, peer); } else if (networkEnvelope instanceof SignContractRequest) {
} else if (networkEnvelope instanceof SignContractResponse) { handleSignContractRequest((SignContractRequest) networkEnvelope, peer);
handleSignContractResponse((SignContractResponse) networkEnvelope, peer); } else if (networkEnvelope instanceof SignContractResponse) {
} else if (networkEnvelope instanceof DepositRequest) { handleSignContractResponse((SignContractResponse) networkEnvelope, peer);
handleDepositRequest((DepositRequest) networkEnvelope, peer); } else if (networkEnvelope instanceof DepositRequest) {
} else if (networkEnvelope instanceof DepositResponse) { handleDepositRequest((DepositRequest) networkEnvelope, peer);
handleDepositResponse((DepositResponse) networkEnvelope, peer); } else if (networkEnvelope instanceof DepositResponse) {
} handleDepositResponse((DepositResponse) networkEnvelope, peer);
}
}).start();
} }

View File

@ -104,24 +104,24 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
protected void onTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) { protected void onTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as TradeMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid()); log.info("Received {} as TradeMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
if (message instanceof DepositsConfirmedMessage) { handle(message, peerNodeAddress);
handle((DepositsConfirmedMessage) message, peerNodeAddress);
} else if (message instanceof PaymentSentMessage) {
handle((PaymentSentMessage) message, peerNodeAddress);
} else if (message instanceof PaymentReceivedMessage) {
handle((PaymentReceivedMessage) message, peerNodeAddress);
}
} }
protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) { protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid()); log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
if (message instanceof DepositsConfirmedMessage) { handle(message, peerNodeAddress);
handle((DepositsConfirmedMessage) message, peerNodeAddress); }
} else if (message instanceof PaymentSentMessage) {
handle((PaymentSentMessage) message, peerNodeAddress); private void handle(TradeMessage message, NodeAddress peerNodeAddress) {
} else if (message instanceof PaymentReceivedMessage) { new Thread(() -> {
handle((PaymentReceivedMessage) message, peerNodeAddress); if (message instanceof DepositsConfirmedMessage) {
} handle((DepositsConfirmedMessage) message, peerNodeAddress);
} else if (message instanceof PaymentSentMessage) {
handle((PaymentSentMessage) message, peerNodeAddress);
} else if (message instanceof PaymentReceivedMessage) {
handle((PaymentReceivedMessage) message, peerNodeAddress);
}
}).start();
} }
@Override @Override
@ -242,7 +242,9 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// handle trade events // handle trade events
EasyBind.subscribe(trade.stateProperty(), state -> { EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.DEPOSIT_TXS_CONFIRMED_IN_BLOCKCHAIN) sendDepositsConfirmedMessage(); if (state == Trade.State.DEPOSIT_TXS_CONFIRMED_IN_BLOCKCHAIN) {
new Thread(() -> sendDepositsConfirmedMessage()).start();
}
}); });
// initialize trade // initialize trade
@ -256,155 +258,145 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) { public void handleInitMultisigRequest(InitMultisigRequest request, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleInitMultisigRequest()"); System.out.println(getClass().getSimpleName() + ".handleInitMultisigRequest()");
new Thread(() -> { synchronized (trade) {
synchronized (trade) { latchTrade();
latchTrade(); Validator.checkTradeId(processModel.getOfferId(), request);
Validator.checkTradeId(processModel.getOfferId(), request); processModel.setTradeMessage(request);
processModel.setTradeMessage(request); expect(anyPhase(Trade.Phase.INIT)
expect(anyPhase(Trade.Phase.INIT) .with(request)
.with(request) .from(sender))
.from(sender)) .setup(tasks(
.setup(tasks( ProcessInitMultisigRequest.class,
ProcessInitMultisigRequest.class, MaybeSendSignContractRequest.class)
MaybeSendSignContractRequest.class) .using(new TradeTaskRunner(trade,
.using(new TradeTaskRunner(trade, () -> {
() -> { startTimeout(TRADE_TIMEOUT);
startTimeout(TRADE_TIMEOUT); handleTaskRunnerSuccess(sender, request);
handleTaskRunnerSuccess(sender, request); },
}, errorMessage -> {
errorMessage -> { handleTaskRunnerFault(sender, request, errorMessage);
handleTaskRunnerFault(sender, request, errorMessage); }))
})) .withTimeout(TRADE_TIMEOUT))
.withTimeout(TRADE_TIMEOUT)) .executeTasks(true);
.executeTasks(true); awaitTradeLatch();
awaitTradeLatch(); }
}
}).start();
} }
public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) { public void handleSignContractRequest(SignContractRequest message, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleSignContractRequest() " + trade.getId()); System.out.println(getClass().getSimpleName() + ".handleSignContractRequest() " + trade.getId());
new Thread(() -> { synchronized (trade) {
synchronized (trade) { Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message); Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == Trade.State.MULTISIG_COMPLETED || trade.getState() == Trade.State.CONTRACT_SIGNATURE_REQUESTED) { processModel.setTradeMessage(message);
latchTrade(); expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED)
Validator.checkTradeId(processModel.getOfferId(), message); .with(message)
processModel.setTradeMessage(message); .from(sender))
expect(anyState(Trade.State.MULTISIG_COMPLETED, Trade.State.CONTRACT_SIGNATURE_REQUESTED) .setup(tasks(
.with(message) // TODO (woodser): validate request
.from(sender)) ProcessSignContractRequest.class)
.setup(tasks( .using(new TradeTaskRunner(trade,
// TODO (woodser): validate request () -> {
ProcessSignContractRequest.class) startTimeout(TRADE_TIMEOUT);
.using(new TradeTaskRunner(trade, handleTaskRunnerSuccess(sender, message);
() -> { },
startTimeout(TRADE_TIMEOUT); errorMessage -> {
handleTaskRunnerSuccess(sender, message); handleTaskRunnerFault(sender, message, errorMessage);
}, }))
errorMessage -> { .withTimeout(TRADE_TIMEOUT)) // extend timeout
handleTaskRunnerFault(sender, message, errorMessage); .executeTasks(true);
})) awaitTradeLatch();
.withTimeout(TRADE_TIMEOUT)) // extend timeout } else {
.executeTasks(true); // process sign contract request after multisig created
awaitTradeLatch(); EasyBind.subscribe(trade.stateProperty(), state -> {
} else { if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock
// process sign contract request after multisig created });
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.MULTISIG_COMPLETED) new Thread(() -> handleSignContractRequest(message, sender)).start(); // process notification without trade lock
});
}
} }
}).start(); }
} }
public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) { public void handleSignContractResponse(SignContractResponse message, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleSignContractResponse() " + trade.getId()); System.out.println(getClass().getSimpleName() + ".handleSignContractResponse() " + trade.getId());
new Thread(() -> { synchronized (trade) {
synchronized (trade) { Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == Trade.State.CONTRACT_SIGNED) {
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message); Validator.checkTradeId(processModel.getOfferId(), message);
if (trade.getState() == Trade.State.CONTRACT_SIGNED) { processModel.setTradeMessage(message);
latchTrade(); expect(state(Trade.State.CONTRACT_SIGNED)
Validator.checkTradeId(processModel.getOfferId(), message); .with(message)
processModel.setTradeMessage(message); .from(sender))
expect(state(Trade.State.CONTRACT_SIGNED) .setup(tasks(
.with(message) // TODO (woodser): validate request
.from(sender)) ProcessSignContractResponse.class,
.setup(tasks( RemoveOffer.class)
// TODO (woodser): validate request .using(new TradeTaskRunner(trade,
ProcessSignContractResponse.class, () -> {
RemoveOffer.class) startTimeout(TRADE_TIMEOUT);
.using(new TradeTaskRunner(trade, handleTaskRunnerSuccess(sender, message);
() -> { },
startTimeout(TRADE_TIMEOUT); errorMessage -> {
handleTaskRunnerSuccess(sender, message); handleTaskRunnerFault(sender, message, errorMessage);
}, }))
errorMessage -> { .withTimeout(TRADE_TIMEOUT)) // extend timeout
handleTaskRunnerFault(sender, message, errorMessage); .executeTasks(true);
})) awaitTradeLatch();
.withTimeout(TRADE_TIMEOUT)) // extend timeout } else {
.executeTasks(true); // process sign contract response after contract signed
awaitTradeLatch(); EasyBind.subscribe(trade.stateProperty(), state -> {
} else { if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock
// process sign contract response after contract signed });
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.CONTRACT_SIGNED) new Thread(() -> handleSignContractResponse(message, sender)).start(); // process notification without trade lock
});
}
} }
}).start(); }
} }
public void handleDepositResponse(DepositResponse response, NodeAddress sender) { public void handleDepositResponse(DepositResponse response, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handleDepositResponse()"); System.out.println(getClass().getSimpleName() + ".handleDepositResponse()");
new Thread(() -> { synchronized (trade) {
synchronized (trade) { latchTrade();
latchTrade(); Validator.checkTradeId(processModel.getOfferId(), response);
Validator.checkTradeId(processModel.getOfferId(), response); processModel.setTradeMessage(response);
processModel.setTradeMessage(response); expect(anyState(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK)
expect(anyState(Trade.State.SAW_ARRIVED_PUBLISH_DEPOSIT_TX_REQUEST, Trade.State.ARBITRATOR_PUBLISHED_DEPOSIT_TXS, Trade.State.DEPOSIT_TXS_SEEN_IN_NETWORK) .with(response)
.with(response) .from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress()
.from(sender)) // TODO (woodser): ensure this asserts sender == response.getSenderNodeAddress() .setup(tasks(
.setup(tasks( // TODO (woodser): validate request
// TODO (woodser): validate request ProcessDepositResponse.class)
ProcessDepositResponse.class) .using(new TradeTaskRunner(trade,
.using(new TradeTaskRunner(trade, () -> {
() -> { stopTimeout();
stopTimeout(); this.errorMessageHandler = null;
this.errorMessageHandler = null; handleTaskRunnerSuccess(sender, response);
handleTaskRunnerSuccess(sender, response); if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized
if (tradeResultHandler != null) tradeResultHandler.handleResult(trade); // trade is initialized },
}, errorMessage -> {
errorMessage -> { handleTaskRunnerFault(sender, response, errorMessage);
handleTaskRunnerFault(sender, response, errorMessage); }))
})) .withTimeout(TRADE_TIMEOUT))
.withTimeout(TRADE_TIMEOUT)) .executeTasks(true);
.executeTasks(true); awaitTradeLatch();
awaitTradeLatch(); }
}
}).start();
} }
public void handle(DepositsConfirmedMessage response, NodeAddress sender) { public void handle(DepositsConfirmedMessage response, NodeAddress sender) {
System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)"); System.out.println(getClass().getSimpleName() + ".handle(DepositsConfirmedMessage)");
new Thread(() -> { synchronized (trade) {
synchronized (trade) { latchTrade();
latchTrade(); expect(new Condition(trade)
expect(new Condition(trade) .with(response)
.with(response) .from(sender))
.from(sender)) .setup(tasks(ProcessDepositsConfirmedMessage.class)
.setup(tasks(ProcessDepositsConfirmedMessage.class) .using(new TradeTaskRunner(trade,
.using(new TradeTaskRunner(trade, () -> {
() -> { handleTaskRunnerSuccess(sender, response);
handleTaskRunnerSuccess(sender, response); },
}, errorMessage -> {
errorMessage -> { handleTaskRunnerFault(sender, response, errorMessage);
handleTaskRunnerFault(sender, response, errorMessage); })))
}))) .executeTasks();
.executeTasks(); awaitTradeLatch();
awaitTradeLatch(); }
}
}).start();
} }
// received by seller and arbitrator // received by seller and arbitrator
@ -414,43 +406,41 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
log.warn("Ignoring PaymentSentMessage since not seller or arbitrator"); log.warn("Ignoring PaymentSentMessage since not seller or arbitrator");
return; return;
} }
new Thread(() -> { // We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case
// We are more tolerant with expected phase and allow also DEPOSITS_PUBLISHED as it can be the case // that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received
// that the wallet is still syncing and so the DEPOSITS_CONFIRMED state to yet triggered when we received // a mailbox message with PaymentSentMessage.
// a mailbox message with PaymentSentMessage. // TODO A better fix would be to add a listener for the wallet sync state and process
// TODO A better fix would be to add a listener for the wallet sync state and process // the mailbox msg once wallet is ready and trade state set.
// the mailbox msg once wallet is ready and trade state set. synchronized (trade) {
synchronized (trade) { if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) {
if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) { log.warn("Ignoring PaymentSentMessage which was already processed");
log.warn("Ignoring PaymentSentMessage which was already processed"); return;
return;
}
latchTrade();
expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
}))
.setup(tasks(
ApplyFilter.class,
ProcessPaymentSentMessage.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
(errorMessage) -> {
handleTaskRunnerFault(peer, message, errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();
} }
}).start(); latchTrade();
expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
}))
.setup(tasks(
ApplyFilter.class,
ProcessPaymentSentMessage.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(peer, message);
},
(errorMessage) -> {
handleTaskRunnerFault(peer, message, errorMessage);
})))
.executeTasks(true);
awaitTradeLatch();
}
} }
// received by buyer and arbitrator // received by buyer and arbitrator
@ -761,21 +751,19 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
} }
private void sendDepositsConfirmedMessage() { private void sendDepositsConfirmedMessage() {
new Thread(() -> { synchronized (trade) {
synchronized (trade) { latchTrade();
latchTrade(); expect(new Condition(trade))
expect(new Condition(trade)) .setup(tasks(getDepositsConfirmedTasks())
.setup(tasks(getDepositsConfirmedTasks()) .using(new TradeTaskRunner(trade,
.using(new TradeTaskRunner(trade, () -> {
() -> { handleTaskRunnerSuccess(null, null, "SendDepositsConfirmedMessages");
handleTaskRunnerSuccess(null, null, "SendDepositsConfirmedMessages"); },
}, (errorMessage) -> {
(errorMessage) -> { handleTaskRunnerFault(null, null, "SendDepositsConfirmedMessages", errorMessage);
handleTaskRunnerFault(null, null, "SendDepositsConfirmedMessages", errorMessage); })))
}))) .executeTasks(true);
.executeTasks(true); awaitTradeLatch();
awaitTradeLatch(); }
}
}).start();
} }
} }