synchronize open offers to fix concurrent modification exception

This commit is contained in:
woodser 2022-12-30 13:45:21 +00:00
parent 2c3dabfbf7
commit 9d3855ad2e
2 changed files with 64 additions and 26 deletions

View File

@ -128,7 +128,7 @@ public class CoreOffersService {
} }
List<Offer> getMyOffers() { List<Offer> getMyOffers() {
List<Offer> offers = new ArrayList<>(openOfferManager.getObservableList()).stream() List<Offer> offers = openOfferManager.getOpenOffers().stream()
.map(OpenOffer::getOffer) .map(OpenOffer::getOffer)
.filter(o -> o.isMyOffer(keyRing)) .filter(o -> o.isMyOffer(keyRing))
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@ -444,8 +444,8 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
OpenOffer openOffer = new OpenOffer(offer, triggerPrice, autoSplit); OpenOffer openOffer = new OpenOffer(offer, triggerPrice, autoSplit);
// process open offer to schedule or post // process open offer to schedule or post
processUnpostedOffer(openOffer, (transaction) -> { processUnpostedOffer(getOpenOffers(), openOffer, (transaction) -> {
openOffers.add(openOffer); addOpenOffer(openOffer);
requestPersistence(); requestPersistence();
resultHandler.handleResult(transaction); resultHandler.handleResult(transaction);
}, (errMessage) -> { }, (errMessage) -> {
@ -550,12 +550,12 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
openOffer.getOffer().setState(Offer.State.REMOVED); openOffer.getOffer().setState(Offer.State.REMOVED);
openOffer.setState(OpenOffer.State.CANCELED); openOffer.setState(OpenOffer.State.CANCELED);
openOffers.remove(openOffer); removeOpenOffer(openOffer);
OpenOffer editedOpenOffer = new OpenOffer(editedOffer, triggerPrice); OpenOffer editedOpenOffer = new OpenOffer(editedOffer, triggerPrice);
editedOpenOffer.setState(originalState); editedOpenOffer.setState(originalState);
openOffers.add(editedOpenOffer); addOpenOffer(editedOpenOffer);
if (!editedOpenOffer.isDeactivated()) if (!editedOpenOffer.isDeactivated())
republishOffer(editedOpenOffer); republishOffer(editedOpenOffer);
@ -592,7 +592,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
offer.setState(Offer.State.REMOVED); offer.setState(Offer.State.REMOVED);
openOffer.setState(OpenOffer.State.CANCELED); openOffer.setState(OpenOffer.State.CANCELED);
openOffers.remove(openOffer); removeOpenOffer(openOffer);
closedTradableManager.add(openOffer); closedTradableManager.add(openOffer);
xmrWalletService.resetAddressEntriesForOpenOffer(offer.getId()); xmrWalletService.resetAddressEntriesForOpenOffer(offer.getId());
log.info("onRemoved offerId={}", offer.getId()); log.info("onRemoved offerId={}", offer.getId());
@ -602,7 +602,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
// Close openOffer after deposit published // Close openOffer after deposit published
public void closeOpenOffer(Offer offer) { public void closeOpenOffer(Offer offer) {
getOpenOfferById(offer.getId()).ifPresent(openOffer -> { getOpenOfferById(offer.getId()).ifPresent(openOffer -> {
openOffers.remove(openOffer); removeOpenOffer(openOffer);
openOffer.setState(OpenOffer.State.CLOSED); openOffer.setState(OpenOffer.State.CLOSED);
xmrWalletService.resetAddressEntriesForOpenOffer(offer.getId()); xmrWalletService.resetAddressEntriesForOpenOffer(offer.getId());
offerBookService.removeOffer(openOffer.getOffer().getOfferPayload(), offerBookService.removeOffer(openOffer.getOffer().getOfferPayload(),
@ -630,16 +630,50 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
return offer.isMyOffer(keyRing); return offer.isMyOffer(keyRing);
} }
public List<OpenOffer> getOpenOffers() {
synchronized (openOffers) {
return new ArrayList<>(getObservableList());
}
}
public List<SignedOffer> getSignedOffers() {
synchronized (signedOffers) {
return new ArrayList<>(signedOffers.getObservableList());
}
}
public ObservableList<OpenOffer> getObservableList() { public ObservableList<OpenOffer> getObservableList() {
return openOffers.getObservableList(); return openOffers.getObservableList();
} }
public Optional<OpenOffer> getOpenOfferById(String offerId) { public Optional<OpenOffer> getOpenOfferById(String offerId) {
return new ArrayList<>(openOffers.getObservableList()).stream().filter(e -> e.getId().equals(offerId)).findFirst(); synchronized (openOffers) {
return openOffers.stream().filter(e -> e.getId().equals(offerId)).findFirst();
}
} }
public Optional<SignedOffer> getSignedOfferById(String offerId) { public Optional<SignedOffer> getSignedOfferById(String offerId) {
return new ArrayList<>(signedOffers.getObservableList()).stream().filter(e -> e.getOfferId().equals(offerId)).findFirst(); synchronized (signedOffers) {
return signedOffers.stream().filter(e -> e.getOfferId().equals(offerId)).findFirst();
}
}
private void addOpenOffer(OpenOffer openOffer) {
synchronized (openOffers) {
openOffers.add(openOffer);
}
}
private void removeOpenOffer(OpenOffer openOffer) {
synchronized (openOffers) {
openOffers.remove(openOffer);
}
}
private void addSignedOffer(SignedOffer openOffer) {
synchronized (signedOffers) {
signedOffers.add(openOffer);
}
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -650,10 +684,11 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
ErrorMessageHandler errorMessageHandler) { ErrorMessageHandler errorMessageHandler) {
new Thread(() -> { new Thread(() -> {
List<String> errorMessages = new ArrayList<String>(); List<String> errorMessages = new ArrayList<String>();
for (OpenOffer scheduledOffer : new ArrayList<OpenOffer>(openOffers.getObservableList())) { List<OpenOffer> openOffers = getOpenOffers();
for (OpenOffer scheduledOffer : openOffers) {
if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue; if (scheduledOffer.getState() != OpenOffer.State.SCHEDULED) continue;
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
processUnpostedOffer(scheduledOffer, (transaction) -> { processUnpostedOffer(openOffers, scheduledOffer, (transaction) -> {
latch.countDown(); latch.countDown();
}, errorMessage -> { }, errorMessage -> {
onRemoved(scheduledOffer); onRemoved(scheduledOffer);
@ -668,7 +703,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}).start(); }).start();
} }
private void processUnpostedOffer(OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { private void processUnpostedOffer(List<OpenOffer> openOffers, OpenOffer openOffer, TransactionResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
new Thread(() -> { new Thread(() -> {
try { try {
@ -703,7 +738,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
log.info("Scheduling offer " + openOffer.getId()); log.info("Scheduling offer " + openOffer.getId());
// check for sufficient balance - scheduled offers amount // check for sufficient balance - scheduled offers amount
if (xmrWalletService.getWallet().getBalance(0).subtract(getScheduledAmount()).compareTo(offerReserveAmount) < 0) { if (xmrWalletService.getWallet().getBalance(0).subtract(getScheduledAmount(openOffers)).compareTo(offerReserveAmount) < 0) {
throw new RuntimeException("Not enough money in Haveno wallet"); throw new RuntimeException("Not enough money in Haveno wallet");
} }
@ -714,7 +749,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
List<String> scheduledTxHashes = new ArrayList<String>(); List<String> scheduledTxHashes = new ArrayList<String>();
BigInteger scheduledAmount = new BigInteger("0"); BigInteger scheduledAmount = new BigInteger("0");
for (MoneroTxWallet lockedTx : lockedTxs) { for (MoneroTxWallet lockedTx : lockedTxs) {
if (isTxScheduled(lockedTx.getHash())) continue; if (isTxScheduled(openOffers, lockedTx.getHash())) continue;
if (lockedTx.getIncomingTransfers() == null || lockedTx.getIncomingTransfers().isEmpty()) continue; if (lockedTx.getIncomingTransfers() == null || lockedTx.getIncomingTransfers().isEmpty()) continue;
scheduledTxHashes.add(lockedTx.getHash()); scheduledTxHashes.add(lockedTx.getHash());
for (MoneroIncomingTransfer transfer : lockedTx.getIncomingTransfers()) { for (MoneroIncomingTransfer transfer : lockedTx.getIncomingTransfers()) {
@ -739,9 +774,9 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}).start(); }).start();
} }
private BigInteger getScheduledAmount() { private BigInteger getScheduledAmount(List<OpenOffer> openOffers) {
BigInteger scheduledAmount = new BigInteger("0"); BigInteger scheduledAmount = new BigInteger("0");
for (OpenOffer openOffer : openOffers.getObservableList()) { for (OpenOffer openOffer : openOffers) {
if (openOffer.getState() != OpenOffer.State.SCHEDULED) continue; if (openOffer.getState() != OpenOffer.State.SCHEDULED) continue;
if (openOffer.getScheduledTxHashes() == null) continue; if (openOffer.getScheduledTxHashes() == null) continue;
List<MoneroTxWallet> fundingTxs = xmrWalletService.getWallet().getTxs(openOffer.getScheduledTxHashes()); List<MoneroTxWallet> fundingTxs = xmrWalletService.getWallet().getTxs(openOffer.getScheduledTxHashes());
@ -754,8 +789,8 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
return scheduledAmount; return scheduledAmount;
} }
private boolean isTxScheduled(String txHash) { private boolean isTxScheduled(List<OpenOffer> openOffers, String txHash) {
for (OpenOffer openOffer : openOffers.getObservableList()) { for (OpenOffer openOffer : openOffers) {
if (openOffer.getState() != OpenOffer.State.SCHEDULED) continue; if (openOffer.getState() != OpenOffer.State.SCHEDULED) continue;
if (openOffer.getScheduledTxHashes() == null) continue; if (openOffer.getScheduledTxHashes() == null) continue;
for (String scheduledTxHash : openOffer.getScheduledTxHashes()) { for (String scheduledTxHash : openOffer.getScheduledTxHashes()) {
@ -888,7 +923,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
// create record of signed offer // create record of signed offer
SignedOffer signedOffer = new SignedOffer(signedOfferPayload.getId(), request.getReserveTxHash(), request.getReserveTxHex(), signature); // TODO (woodser): no need for signature to be part of SignedOffer? SignedOffer signedOffer = new SignedOffer(signedOfferPayload.getId(), request.getReserveTxHash(), request.getReserveTxHex(), signature); // TODO (woodser): no need for signature to be part of SignedOffer?
signedOffers.add(signedOffer); addSignedOffer(signedOffer);
requestPersistence(); requestPersistence();
// send response with signature // send response with signature
@ -1133,7 +1168,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private void maybeUpdatePersistedOffers() { private void maybeUpdatePersistedOffers() {
// We need to clone to avoid ConcurrentModificationException // We need to clone to avoid ConcurrentModificationException
ArrayList<OpenOffer> openOffersClone = new ArrayList<>(openOffers.getList()); List<OpenOffer> openOffersClone = getOpenOffers();
openOffersClone.forEach(originalOpenOffer -> { openOffersClone.forEach(originalOpenOffer -> {
Offer originalOffer = originalOpenOffer.getOffer(); Offer originalOffer = originalOpenOffer.getOffer();
@ -1227,7 +1262,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
// remove old offer // remove old offer
originalOffer.setState(Offer.State.REMOVED); originalOffer.setState(Offer.State.REMOVED);
originalOpenOffer.setState(OpenOffer.State.CANCELED); originalOpenOffer.setState(OpenOffer.State.CANCELED);
openOffers.remove(originalOpenOffer); removeOpenOffer(originalOpenOffer);
// Create new Offer // Create new Offer
Offer updatedOffer = new Offer(updatedPayload); Offer updatedOffer = new Offer(updatedPayload);
@ -1236,7 +1271,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
OpenOffer updatedOpenOffer = new OpenOffer(updatedOffer, originalOpenOffer.getTriggerPrice()); OpenOffer updatedOpenOffer = new OpenOffer(updatedOffer, originalOpenOffer.getTriggerPrice());
updatedOpenOffer.setState(originalOpenOfferState); updatedOpenOffer.setState(originalOpenOfferState);
openOffers.add(updatedOpenOffer); addOpenOffer(updatedOpenOffer);
requestPersistence(); requestPersistence();
log.info("Updating offer completed. id={}", originalOffer.getId()); log.info("Updating offer completed. id={}", originalOffer.getId());
@ -1256,8 +1291,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopPeriodicRefreshOffersTimer(); stopPeriodicRefreshOffersTimer();
List<OpenOffer> openOffersList = new ArrayList<>(openOffers.getList()); processListForRepublishOffers(getOpenOffers());
processListForRepublishOffers(openOffersList);
} }
private void processListForRepublishOffers(List<OpenOffer> list) { private void processListForRepublishOffers(List<OpenOffer> list) {
@ -1266,13 +1300,17 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
OpenOffer openOffer = list.remove(0); OpenOffer openOffer = list.remove(0);
if (openOffers.contains(openOffer) && !openOffer.isDeactivated()) { boolean contained = false;
synchronized (openOffers) {
contained = openOffers.contains(openOffer);
}
if (contained && !openOffer.isDeactivated()) {
// TODO It is not clear yet if it is better for the node and the network to send out all add offer // TODO It is not clear yet if it is better for the node and the network to send out all add offer
// messages in one go or to spread it over a delay. With power users who have 100-200 offers that can have // messages in one go or to spread it over a delay. With power users who have 100-200 offers that can have
// some significant impact to user experience and the network // some significant impact to user experience and the network
republishOffer(openOffer, () -> processListForRepublishOffers(list)); republishOffer(openOffer, () -> processListForRepublishOffers(list));
/* republishOffer(openOffer, /* republishOffer(openOffer,
() -> UserThread.runAfter(() -> processListForRepublishOffers(list), () -> UserThread.runAfter(() -> processListForRepublishOffers(list),
30, TimeUnit.MILLISECONDS));*/ 30, TimeUnit.MILLISECONDS));*/
} else { } else {