republish offers on dedicated thread for open offer manager

This commit is contained in:
woodser 2023-12-21 07:38:19 -05:00
parent a1f8f942fc
commit 0d33959eeb

View File

@ -1571,9 +1571,9 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopPeriodicRefreshOffersTimer(); stopPeriodicRefreshOffersTimer();
new Thread(() -> { HavenoUtils.submitToThread(() -> {
processListForRepublishOffers(getOpenOffers()); processListForRepublishOffers(getOpenOffers());
}).start(); }, THREAD_ID);
} }
private void processListForRepublishOffers(List<OpenOffer> list) { private void processListForRepublishOffers(List<OpenOffer> list) {
@ -1607,53 +1607,53 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
private void republishOffer(OpenOffer openOffer, @Nullable Runnable completeHandler) { private void republishOffer(OpenOffer openOffer, @Nullable Runnable completeHandler) {
HavenoUtils.submitToThread(() -> {
// determine if offer is valid // determine if offer is valid
boolean isValid = true; boolean isValid = true;
Arbitrator arbitrator = user.getAcceptedArbitratorByAddress(openOffer.getOffer().getOfferPayload().getArbitratorSigner()); Arbitrator arbitrator = user.getAcceptedArbitratorByAddress(openOffer.getOffer().getOfferPayload().getArbitratorSigner());
if (arbitrator == null || !HavenoUtils.isArbitratorSignatureValid(openOffer.getOffer().getOfferPayload(), arbitrator)) { if (arbitrator == null || !HavenoUtils.isArbitratorSignatureValid(openOffer.getOffer().getOfferPayload(), arbitrator)) {
log.warn("Offer {} has invalid arbitrator signature, reposting", openOffer.getId()); log.warn("Offer {} has invalid arbitrator signature, reposting", openOffer.getId());
isValid = false; isValid = false;
} }
if (openOffer.getOffer().getOfferPayload().getReserveTxKeyImages() != null && (openOffer.getReserveTxHash() == null || openOffer.getReserveTxHash().isEmpty())) { if (openOffer.getOffer().getOfferPayload().getReserveTxKeyImages() != null && (openOffer.getReserveTxHash() == null || openOffer.getReserveTxHash().isEmpty())) {
log.warn("Offer {} is missing reserve tx hash but has reserved key images, reposting", openOffer.getId()); log.warn("Offer {} is missing reserve tx hash but has reserved key images, reposting", openOffer.getId());
isValid = false; isValid = false;
} }
// if valid, re-add offer to book // if valid, re-add offer to book
if (isValid) { if (isValid) {
offerBookService.addOffer(openOffer.getOffer(), offerBookService.addOffer(openOffer.getOffer(),
() -> { () -> {
if (!stopped) { if (!stopped) {
// refresh means we send only the data needed to refresh the TTL (hash, signature and sequence no.) // refresh means we send only the data needed to refresh the TTL (hash, signature and sequence no.)
if (periodicRefreshOffersTimer == null) { if (periodicRefreshOffersTimer == null) {
startPeriodicRefreshOffersTimer(); startPeriodicRefreshOffersTimer();
}
if (completeHandler != null) {
completeHandler.run();
}
} }
if (completeHandler != null) { },
completeHandler.run(); errorMessage -> {
if (!stopped) {
log.error("Adding offer to P2P network failed. " + errorMessage);
stopRetryRepublishOffersTimer();
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
RETRY_REPUBLISH_DELAY_SEC);
if (completeHandler != null) completeHandler.run();
} }
} });
}, } else {
errorMessage -> {
if (!stopped) {
log.error("Adding offer to P2P network failed. " + errorMessage);
stopRetryRepublishOffersTimer();
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
RETRY_REPUBLISH_DELAY_SEC);
if (completeHandler != null) completeHandler.run();
}
});
} else {
// cancel and recreate offer // cancel and recreate offer
onCancelled(openOffer); onCancelled(openOffer);
Offer updatedOffer = new Offer(openOffer.getOffer().getOfferPayload()); Offer updatedOffer = new Offer(openOffer.getOffer().getOfferPayload());
updatedOffer.setPriceFeedService(priceFeedService); updatedOffer.setPriceFeedService(priceFeedService);
OpenOffer updatedOpenOffer = new OpenOffer(updatedOffer, openOffer.getTriggerPrice()); OpenOffer updatedOpenOffer = new OpenOffer(updatedOffer, openOffer.getTriggerPrice());
// repost offer // repost offer
HavenoUtils.submitToThread(() -> {
synchronized (processOffersLock) { synchronized (processOffersLock) {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
processUnpostedOffer(getOpenOffers(), updatedOpenOffer, (transaction) -> { processUnpostedOffer(getOpenOffers(), updatedOpenOffer, (transaction) -> {
@ -1670,8 +1670,8 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}); });
HavenoUtils.awaitLatch(latch); HavenoUtils.awaitLatch(latch);
} }
}, THREAD_ID); }
} }, THREAD_ID);
} }
private void startPeriodicRepublishOffersTimer() { private void startPeriodicRepublishOffersTimer() {