improve stability on tor, refactor startup and shut down

refactor startup sequence to improve message reliability
refactor shut down sequence to finish processing messages
reduce monerod requests to improve slow tor connections
refactor trade wallet polling
monero node service uses default data directory unless local
connections service checks connection by polling daemon
connections service supports getRefreshPeriodMs and shutting down
add make config: monerod-stagenet-custom
fix bugs in key image polling
force stop wallet on deletion
trade manager initializes persisted trades on data received
support hardcoding monero log level and request stack traces
remove xmrAddress from Arbitrator model
fix formatting of MoneroWalletRpcManager
This commit is contained in:
woodser 2023-04-17 08:54:10 -04:00
parent 5e364f7e7e
commit 2afa5d761d
51 changed files with 1058 additions and 644 deletions

View file

@ -52,7 +52,7 @@ import haveno.core.trade.protocol.tasks.ProcessPaymentSentMessage;
import haveno.core.trade.protocol.tasks.ProcessSignContractRequest;
import haveno.core.trade.protocol.tasks.ProcessSignContractResponse;
import haveno.core.trade.protocol.tasks.RemoveOffer;
import haveno.core.trade.protocol.tasks.ResendDisputeClosedMessageWithPayout;
import haveno.core.trade.protocol.tasks.MaybeResendDisputeClosedMessageWithPayout;
import haveno.core.trade.protocol.tasks.TradeTask;
import haveno.core.trade.protocol.tasks.VerifyPeersAccountAgeWitness;
import haveno.core.util.Validator;
@ -242,32 +242,19 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
protected void onInitialized() {
if (!trade.isCompleted()) {
processModel.getP2PService().addDecryptedDirectMessageListener(this);
}
// initialize trade
trade.initialize(processModel.getProvider());
// listen for direct messages unless completed
if (!trade.isCompleted()) processModel.getP2PService().addDecryptedDirectMessageListener(this);
// initialize trade with lock
synchronized (trade) {
trade.initialize(processModel.getProvider());
}
// process mailbox messages
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
mailboxMessageService.addDecryptedMailboxListener(this);
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
// send deposit confirmed message on startup or event
if (trade.getState().ordinal() >= Trade.State.DEPOSIT_TXS_CONFIRMED_IN_BLOCKCHAIN.ordinal()) {
new Thread(() -> sendDepositsConfirmedMessages()).start();
} else {
EasyBind.subscribe(trade.stateProperty(), state -> {
if (state == Trade.State.DEPOSIT_TXS_CONFIRMED_IN_BLOCKCHAIN) {
new Thread(() -> sendDepositsConfirmedMessages()).start();
}
});
}
// reprocess payout messages if pending
maybeReprocessPaymentReceivedMessage(true);
HavenoUtils.arbitrationManager.maybeReprocessDisputeClosedMessage(trade, true);
}
public void maybeReprocessPaymentReceivedMessage(boolean reprocessOnError) {
@ -448,7 +435,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
.setup(tasks(
ProcessDepositsConfirmedMessage.class,
VerifyPeersAccountAgeWitness.class,
ResendDisputeClosedMessageWithPayout.class)
MaybeResendDisputeClosedMessageWithPayout.class)
.using(new TradeTaskRunner(trade,
() -> {
handleTaskRunnerSuccess(sender, response);
@ -475,7 +462,8 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// the mailbox msg once wallet is ready and trade state set.
synchronized (trade) {
if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_SENT.ordinal()) {
log.warn("Ignoring PaymentSentMessage which was already processed");
log.warn("Received another PaymentSentMessage which was already processed, ACKing");
handleTaskRunnerSuccess(peer, message);
return;
}
latchTrade();
@ -518,6 +506,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
return;
}
synchronized (trade) {
if (trade.getPhase().ordinal() >= Trade.Phase.PAYMENT_RECEIVED.ordinal()) {
log.warn("Received another PaymentReceivedMessage which was already processed, ACKing");
handleTaskRunnerSuccess(peer, message);
return;
}
latchTrade();
Validator.checkTradeId(processModel.getOfferId(), message);
processModel.setTradeMessage(message);
@ -844,7 +837,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
}
}
private void sendDepositsConfirmedMessages() {
public void maybeSendDepositsConfirmedMessages() {
synchronized (trade) {
if (!trade.isInitialized()) return; // skip if shutting down
if (trade.getProcessModel().isDepositsConfirmedMessagesDelivered()) return; // skip if already delivered
@ -860,7 +853,7 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
// retry in 15 minutes
UserThread.runAfter(() -> {
sendDepositsConfirmedMessages();
maybeSendDepositsConfirmedMessages();
}, 15, TimeUnit.MINUTES);
handleTaskRunnerFault(null, null, "SendDepositsConfirmedMessages", errorMessage);
})))