fix concurrency issues by synchronizing on base persistable list

This commit is contained in:
woodser 2025-04-06 17:43:23 -04:00 committed by woodser
parent 3c6914ac7e
commit 9027ce6634
15 changed files with 262 additions and 220 deletions

View file

@ -35,6 +35,8 @@
package haveno.core.offer; package haveno.core.offer;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject; import com.google.inject.Inject;
import haveno.common.ThreadUtils; import haveno.common.ThreadUtils;
import haveno.common.Timer; import haveno.common.Timer;
@ -261,7 +263,10 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
private void cleanUpAddressEntries() { private void cleanUpAddressEntries() {
Set<String> openOffersIdSet = openOffers.getList().stream().map(OpenOffer::getId).collect(Collectors.toSet()); Set<String> openOffersIdSet;
synchronized (openOffers.getList()) {
openOffersIdSet = openOffers.getList().stream().map(OpenOffer::getId).collect(Collectors.toSet());
}
xmrWalletService.getAddressEntriesForOpenOffer().stream() xmrWalletService.getAddressEntriesForOpenOffer().stream()
.filter(e -> !openOffersIdSet.contains(e.getOfferId())) .filter(e -> !openOffersIdSet.contains(e.getOfferId()))
.forEach(e -> { .forEach(e -> {
@ -292,7 +297,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
ThreadUtils.execute(() -> { ThreadUtils.execute(() -> {
// remove offers from offer book // remove offers from offer book
synchronized (openOffers) { synchronized (openOffers.getList()) {
openOffers.forEach(openOffer -> { openOffers.forEach(openOffer -> {
if (openOffer.getState() == OpenOffer.State.AVAILABLE) { if (openOffer.getState() == OpenOffer.State.AVAILABLE) {
offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()); offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload());
@ -334,15 +339,17 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
private void removeOpenOffers(List<OpenOffer> openOffers, @Nullable Runnable completeHandler) { private void removeOpenOffers(List<OpenOffer> openOffers, @Nullable Runnable completeHandler) {
int size = openOffers.size(); synchronized (openOffers) {
// Copy list as we remove in the loop int size = openOffers.size();
List<OpenOffer> openOffersList = new ArrayList<>(openOffers); // Copy list as we remove in the loop
openOffersList.forEach(openOffer -> cancelOpenOffer(openOffer, () -> { List<OpenOffer> openOffersList = new ArrayList<>(openOffers);
}, errorMessage -> { openOffersList.forEach(openOffer -> cancelOpenOffer(openOffer, () -> {
log.warn("Error removing open offer: " + errorMessage); }, errorMessage -> {
})); log.warn("Error removing open offer: " + errorMessage);
if (completeHandler != null) }));
UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS); if (completeHandler != null)
UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS);
}
} }
@ -450,13 +457,17 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
}); });
// poll spent status of open offer key images // poll spent status of open offer key images
for (OpenOffer openOffer : getOpenOffers()) { synchronized (openOffers.getList()) {
xmrConnectionService.getKeyImagePoller().addKeyImages(openOffer.getOffer().getOfferPayload().getReserveTxKeyImages(), OPEN_OFFER_GROUP_KEY_IMAGE_ID); for (OpenOffer openOffer : openOffers.getList()) {
xmrConnectionService.getKeyImagePoller().addKeyImages(openOffer.getOffer().getOfferPayload().getReserveTxKeyImages(), OPEN_OFFER_GROUP_KEY_IMAGE_ID);
}
} }
// poll spent status of signed offer key images // poll spent status of signed offer key images
for (SignedOffer signedOffer : signedOffers.getList()) { synchronized (signedOffers.getList()) {
xmrConnectionService.getKeyImagePoller().addKeyImages(signedOffer.getReserveTxKeyImages(), SIGNED_OFFER_KEY_IMAGE_GROUP_ID); for (SignedOffer signedOffer : signedOffers.getList()) {
xmrConnectionService.getKeyImagePoller().addKeyImages(signedOffer.getReserveTxKeyImages(), SIGNED_OFFER_KEY_IMAGE_GROUP_ID);
}
} }
}, THREAD_ID); }, THREAD_ID);
}); });
@ -858,29 +869,25 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
public boolean hasAvailableOpenOffers() { public boolean hasAvailableOpenOffers() {
synchronized (openOffers) { for (OpenOffer openOffer : getOpenOffers()) {
for (OpenOffer openOffer : getOpenOffers()) { if (openOffer.getState() == OpenOffer.State.AVAILABLE) {
if (openOffer.getState() == OpenOffer.State.AVAILABLE) { return true;
return true;
}
} }
return false;
} }
return false;
} }
public List<OpenOffer> getOpenOffers() { public List<OpenOffer> getOpenOffers() {
synchronized (openOffers) { synchronized (openOffers.getList()) {
return new ArrayList<>(getObservableList()); return ImmutableList.copyOf(getObservableList());
} }
} }
public List<OpenOffer> getOpenOfferGroup(String groupId) { public List<OpenOffer> getOpenOfferGroup(String groupId) {
if (groupId == null) throw new IllegalArgumentException("groupId cannot be null"); if (groupId == null) throw new IllegalArgumentException("groupId cannot be null");
synchronized (openOffers) { return getOpenOffers().stream()
return getOpenOffers().stream() .filter(openOffer -> groupId.equals(openOffer.getGroupId()))
.filter(openOffer -> groupId.equals(openOffer.getGroupId())) .collect(Collectors.toList());
.collect(Collectors.toList());
}
} }
public boolean hasClonedOffer(String offerId) { public boolean hasClonedOffer(String offerId) {
@ -890,24 +897,22 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
public boolean hasClonedOffers() { public boolean hasClonedOffers() {
synchronized (openOffers) { for (OpenOffer openOffer : getOpenOffers()) {
for (OpenOffer openOffer : getOpenOffers()) { if (getOpenOfferGroup(openOffer.getGroupId()).size() > 1) {
if (getOpenOfferGroup(openOffer.getGroupId()).size() > 1) { return true;
return true;
}
} }
return false;
} }
return false;
} }
public List<SignedOffer> getSignedOffers() { public List<SignedOffer> getSignedOffers() {
synchronized (signedOffers) { synchronized (signedOffers.getList()) {
return new ArrayList<>(signedOffers.getObservableList()); return ImmutableList.copyOf(signedOffers.getObservableList());
} }
} }
public ObservableList<SignedOffer> getObservableSignedOffersList() { public ObservableList<SignedOffer> getObservableSignedOffersList() {
synchronized (signedOffers) { synchronized (signedOffers.getList()) {
return signedOffers.getObservableList(); return signedOffers.getObservableList();
} }
} }
@ -917,9 +922,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
public Optional<OpenOffer> getOpenOffer(String offerId) { public Optional<OpenOffer> getOpenOffer(String offerId) {
synchronized (openOffers) { return getOpenOffers().stream().filter(e -> e.getId().equals(offerId)).findFirst();
return openOffers.stream().filter(e -> e.getId().equals(offerId)).findFirst();
}
} }
public boolean hasOpenOffer(String offerId) { public boolean hasOpenOffer(String offerId) {
@ -927,14 +930,12 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
public Optional<SignedOffer> getSignedOfferById(String offerId) { public Optional<SignedOffer> getSignedOfferById(String offerId) {
synchronized (signedOffers) { return getSignedOffers().stream().filter(e -> e.getOfferId().equals(offerId)).findFirst();
return signedOffers.stream().filter(e -> e.getOfferId().equals(offerId)).findFirst();
}
} }
private void addOpenOffer(OpenOffer openOffer) { private void addOpenOffer(OpenOffer openOffer) {
log.info("Adding open offer {}", openOffer.getId()); log.info("Adding open offer {}", openOffer.getId());
synchronized (openOffers) { synchronized (openOffers.getList()) {
openOffers.add(openOffer); openOffers.add(openOffer);
} }
if (openOffer.getOffer().getOfferPayload().getReserveTxKeyImages() != null) { if (openOffer.getOffer().getOfferPayload().getReserveTxKeyImages() != null) {
@ -944,7 +945,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private void removeOpenOffer(OpenOffer openOffer) { private void removeOpenOffer(OpenOffer openOffer) {
log.info("Removing open offer {}", openOffer.getId()); log.info("Removing open offer {}", openOffer.getId());
synchronized (openOffers) { synchronized (openOffers.getList()) {
openOffers.remove(openOffer); openOffers.remove(openOffer);
} }
synchronized (placeOfferProtocols) { synchronized (placeOfferProtocols) {
@ -957,17 +958,19 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
private void cancelOpenOffersOnSpent(String keyImage) { private void cancelOpenOffersOnSpent(String keyImage) {
for (OpenOffer openOffer : getOpenOffers()) { synchronized (openOffers.getList()) {
if (openOffer.getOffer().getOfferPayload().getReserveTxKeyImages() != null && openOffer.getOffer().getOfferPayload().getReserveTxKeyImages().contains(keyImage)) { for (OpenOffer openOffer : openOffers.getList()) {
log.warn("Canceling open offer because reserved funds have been spent, offerId={}, state={}", openOffer.getId(), openOffer.getState()); if (openOffer.getOffer().getOfferPayload().getReserveTxKeyImages() != null && openOffer.getOffer().getOfferPayload().getReserveTxKeyImages().contains(keyImage)) {
cancelOpenOffer(openOffer, null, null); log.warn("Canceling open offer because reserved funds have been spent, offerId={}, state={}", openOffer.getId(), openOffer.getState());
cancelOpenOffer(openOffer, null, null);
}
} }
} }
} }
private void addSignedOffer(SignedOffer signedOffer) { private void addSignedOffer(SignedOffer signedOffer) {
log.info("Adding SignedOffer for offer {}", signedOffer.getOfferId()); log.info("Adding SignedOffer for offer {}", signedOffer.getOfferId());
synchronized (signedOffers) { synchronized (signedOffers.getList()) {
// remove signed offers with common key images // remove signed offers with common key images
for (String keyImage : signedOffer.getReserveTxKeyImages()) { for (String keyImage : signedOffer.getReserveTxKeyImages()) {
@ -982,16 +985,18 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private void removeSignedOffer(SignedOffer signedOffer) { private void removeSignedOffer(SignedOffer signedOffer) {
log.info("Removing SignedOffer for offer {}", signedOffer.getOfferId()); log.info("Removing SignedOffer for offer {}", signedOffer.getOfferId());
synchronized (signedOffers) { synchronized (signedOffers.getList()) {
signedOffers.remove(signedOffer); signedOffers.remove(signedOffer);
xmrConnectionService.getKeyImagePoller().removeKeyImages(signedOffer.getReserveTxKeyImages(), SIGNED_OFFER_KEY_IMAGE_GROUP_ID);
} }
xmrConnectionService.getKeyImagePoller().removeKeyImages(signedOffer.getReserveTxKeyImages(), SIGNED_OFFER_KEY_IMAGE_GROUP_ID);
} }
private void removeSignedOffers(String keyImage) { private void removeSignedOffers(String keyImage) {
for (SignedOffer signedOffer : new ArrayList<SignedOffer>(signedOffers.getList())) { synchronized (signedOffers.getList()) {
if (signedOffer.getReserveTxKeyImages().contains(keyImage)) { for (SignedOffer signedOffer : getSignedOffers()) {
removeSignedOffer(signedOffer); if (signedOffer.getReserveTxKeyImages().contains(keyImage)) {
removeSignedOffer(signedOffer);
}
} }
} }
} }
@ -2070,7 +2075,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
OpenOffer openOffer = list.remove(0); OpenOffer openOffer = list.remove(0);
boolean contained = false; boolean contained = false;
synchronized (openOffers) { synchronized (openOffers.getList()) {
contained = openOffers.contains(openOffer); contained = openOffers.contains(openOffer);
} }
if (contained) { if (contained) {
@ -2171,7 +2176,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
if (periodicRefreshOffersTimer == null) if (periodicRefreshOffersTimer == null)
periodicRefreshOffersTimer = UserThread.runPeriodically(() -> { periodicRefreshOffersTimer = UserThread.runPeriodically(() -> {
if (!stopped) { if (!stopped) {
synchronized (openOffers) { synchronized (openOffers.getList()) {
int size = openOffers.size(); int size = openOffers.size();
//we clone our list as openOffers might change during our delayed call //we clone our list as openOffers might change during our delayed call
final ArrayList<OpenOffer> openOffersList = new ArrayList<>(openOffers.getList()); final ArrayList<OpenOffer> openOffersList = new ArrayList<>(openOffers.getList());
@ -2186,7 +2191,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
UserThread.runAfterRandomDelay(() -> { UserThread.runAfterRandomDelay(() -> {
// we need to check if in the meantime the offer has been removed // we need to check if in the meantime the offer has been removed
boolean contained = false; boolean contained = false;
synchronized (openOffers) { synchronized (openOffers.getList()) {
contained = openOffers.contains(openOffer); contained = openOffers.contains(openOffer);
} }
if (contained) maybeRefreshOffer(openOffer, 0, 1); if (contained) maybeRefreshOffer(openOffer, 0, 1);

View file

@ -47,10 +47,12 @@ public final class SignedOfferList extends PersistableListAsObservable<SignedOff
@Override @Override
public Message toProtoMessage() { public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder() synchronized (getList()) {
.setSignedOfferList(protobuf.SignedOfferList.newBuilder() return protobuf.PersistableEnvelope.newBuilder()
.addAllSignedOffer(ProtoUtil.collectionToProto(getList(), protobuf.SignedOffer.class))) .setSignedOfferList(protobuf.SignedOfferList.newBuilder()
.build(); .addAllSignedOffer(ProtoUtil.collectionToProto(getList(), protobuf.SignedOffer.class)))
.build();
}
} }
public static SignedOfferList fromProto(protobuf.SignedOfferList proto) { public static SignedOfferList fromProto(protobuf.SignedOfferList proto) {

View file

@ -36,10 +36,12 @@ public class PaymentAccountList extends PersistableList<PaymentAccount> {
@Override @Override
public Message toProtoMessage() { public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder() synchronized (getList()) {
.setPaymentAccountList(protobuf.PaymentAccountList.newBuilder() return protobuf.PersistableEnvelope.newBuilder()
.addAllPaymentAccount(getList().stream().map(PaymentAccount::toProtoMessage).collect(Collectors.toList()))) .setPaymentAccountList(protobuf.PaymentAccountList.newBuilder()
.build(); .addAllPaymentAccount(getList().stream().map(PaymentAccount::toProtoMessage).collect(Collectors.toList())))
.build();
}
} }
public static PaymentAccountList fromProto(protobuf.PaymentAccountList proto, CoreProtoResolver coreProtoResolver) { public static PaymentAccountList fromProto(protobuf.PaymentAccountList proto, CoreProtoResolver coreProtoResolver) {

View file

@ -74,10 +74,12 @@ public abstract class DisputeListService<T extends DisputeList<Dispute>> impleme
@Override @Override
public void readPersisted(Runnable completeHandler) { public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(getFileName(), persisted -> { persistenceManager.readPersisted(getFileName(), persisted -> {
disputeList.setAll(persisted.getList()); synchronized (persisted.getList()) {
completeHandler.run(); disputeList.setAll(persisted.getList());
}, }
completeHandler); completeHandler.run();
},
completeHandler);
} }
protected String getFileName() { protected String getFileName() {
@ -145,26 +147,30 @@ public abstract class DisputeListService<T extends DisputeList<Dispute>> impleme
private void onDisputesChangeListener(List<? extends Dispute> addedList, private void onDisputesChangeListener(List<? extends Dispute> addedList,
@Nullable List<? extends Dispute> removedList) { @Nullable List<? extends Dispute> removedList) {
if (removedList != null) { if (removedList != null) {
removedList.forEach(dispute -> { synchronized (removedList) {
disputedTradeIds.remove(dispute.getTradeId()); removedList.forEach(dispute -> {
disputedTradeIds.remove(dispute.getTradeId());
});
}
}
synchronized (addedList) {
addedList.forEach(dispute -> {
// for each dispute added, keep track of its "BadgeCountProperty"
EasyBind.subscribe(dispute.getBadgeCountProperty(),
isAlerting -> {
// We get the event before the list gets updated, so we execute on next frame
UserThread.execute(() -> {
synchronized (disputeList.getObservableList()) {
int numAlerts = (int) disputeList.getList().stream()
.mapToLong(x -> x.getBadgeCountProperty().getValue())
.sum();
numOpenDisputes.set(numAlerts);
}
});
});
disputedTradeIds.add(dispute.getTradeId());
}); });
} }
addedList.forEach(dispute -> {
// for each dispute added, keep track of its "BadgeCountProperty"
EasyBind.subscribe(dispute.getBadgeCountProperty(),
isAlerting -> {
// We get the event before the list gets updated, so we execute on next frame
UserThread.execute(() -> {
synchronized (disputeList.getObservableList()) {
int numAlerts = (int) disputeList.getList().stream()
.mapToLong(x -> x.getBadgeCountProperty().getValue())
.sum();
numOpenDisputes.set(numAlerts);
}
});
});
disputedTradeIds.add(dispute.getTradeId());
});
} }
public void requestPersistence() { public void requestPersistence() {

View file

@ -288,14 +288,16 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
cleanupDisputes(); cleanupDisputes();
List<Dispute> disputes = getDisputeList().getList(); List<Dispute> disputes = getDisputeList().getList();
disputes.forEach(dispute -> { synchronized (disputes) {
try { disputes.forEach(dispute -> {
DisputeValidation.validateNodeAddresses(dispute, config); try {
} catch (DisputeValidation.ValidationException e) { DisputeValidation.validateNodeAddresses(dispute, config);
log.error(e.toString()); } catch (DisputeValidation.ValidationException e) {
validationExceptions.add(e); log.error(e.toString());
} validationExceptions.add(e);
}); }
});
}
maybeClearSensitiveData(); maybeClearSensitiveData();
} }
@ -318,11 +320,13 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
public void maybeClearSensitiveData() { public void maybeClearSensitiveData() {
log.info("{} checking closed disputes eligibility for having sensitive data cleared", super.getClass().getSimpleName()); log.info("{} checking closed disputes eligibility for having sensitive data cleared", super.getClass().getSimpleName());
Instant safeDate = closedTradableManager.getSafeDateForSensitiveDataClearing(); Instant safeDate = closedTradableManager.getSafeDateForSensitiveDataClearing();
getDisputeList().getList().stream() synchronized (getDisputeList().getList()) {
.filter(e -> e.isClosed()) getDisputeList().getList().stream()
.filter(e -> e.getOpeningDate().toInstant().isBefore(safeDate)) .filter(e -> e.isClosed())
.forEach(Dispute::maybeClearSensitiveData); .filter(e -> e.getOpeningDate().toInstant().isBefore(safeDate))
requestPersistence(); .forEach(Dispute::maybeClearSensitiveData);
requestPersistence();
}
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -176,18 +176,20 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
// remove disputes opened by arbitrator, which is not allowed // remove disputes opened by arbitrator, which is not allowed
Set<Dispute> toRemoves = new HashSet<>(); Set<Dispute> toRemoves = new HashSet<>();
List<Dispute> disputes = getDisputeList().getList(); List<Dispute> disputes = getDisputeList().getList();
for (Dispute dispute : disputes) { synchronized (disputes) {
for (Dispute dispute : disputes) {
// get dispute's trade // get dispute's trade
final Trade trade = tradeManager.getTrade(dispute.getTradeId()); final Trade trade = tradeManager.getTrade(dispute.getTradeId());
if (trade == null) { if (trade == null) {
log.warn("Dispute trade {} does not exist", dispute.getTradeId()); log.warn("Dispute trade {} does not exist", dispute.getTradeId());
return; return;
} }
// collect dispute if owned by arbitrator // collect dispute if owned by arbitrator
if (dispute.getTraderPubKeyRing().equals(trade.getArbitrator().getPubKeyRing())) { if (dispute.getTraderPubKeyRing().equals(trade.getArbitrator().getPubKeyRing())) {
toRemoves.add(dispute); toRemoves.add(dispute);
}
} }
} }
for (Dispute toRemove : toRemoves) { for (Dispute toRemove : toRemoves) {

View file

@ -55,8 +55,10 @@ public final class MediationDisputeList extends DisputeList<Dispute> {
@Override @Override
public Message toProtoMessage() { public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder().setMediationDisputeList(protobuf.MediationDisputeList.newBuilder() synchronized (getList()) {
.addAllDispute(ProtoUtil.collectionToProto(getList(), protobuf.Dispute.class))).build(); return protobuf.PersistableEnvelope.newBuilder().setMediationDisputeList(protobuf.MediationDisputeList.newBuilder()
.addAllDispute(ProtoUtil.collectionToProto(getList(), protobuf.Dispute.class))).build();
}
} }
public static MediationDisputeList fromProto(protobuf.MediationDisputeList proto, public static MediationDisputeList fromProto(protobuf.MediationDisputeList proto,

View file

@ -58,9 +58,10 @@ public final class RefundDisputeList extends DisputeList<Dispute> {
@Override @Override
public Message toProtoMessage() { public Message toProtoMessage() {
forEach(dispute -> checkArgument(dispute.getSupportType().equals(SupportType.REFUND), "Support type has to be REFUND")); forEach(dispute -> checkArgument(dispute.getSupportType().equals(SupportType.REFUND), "Support type has to be REFUND"));
synchronized (getList()) {
return protobuf.PersistableEnvelope.newBuilder().setRefundDisputeList(protobuf.RefundDisputeList.newBuilder() return protobuf.PersistableEnvelope.newBuilder().setRefundDisputeList(protobuf.RefundDisputeList.newBuilder()
.addAllDispute(ProtoUtil.collectionToProto(getList(), protobuf.Dispute.class))).build(); .addAllDispute(ProtoUtil.collectionToProto(getList(), protobuf.Dispute.class))).build();
}
} }
public static RefundDisputeList fromProto(protobuf.RefundDisputeList proto, public static RefundDisputeList fromProto(protobuf.RefundDisputeList proto,

View file

@ -55,21 +55,23 @@ public class CleanupMailboxMessages {
} }
public void handleTrades(List<Trade> trades) { public void handleTrades(List<Trade> trades) {
// We wrap in a try catch as in failed trades we cannot be sure if expected data is set, so we could get synchronized (trades) {
// a NullPointer and do not want that this escalate to the user. // We wrap in a try catch as in failed trades we cannot be sure if expected data is set, so we could get
try { // a NullPointer and do not want that this escalate to the user.
if (p2PService.isBootstrapped()) { try {
cleanupMailboxMessages(trades); if (p2PService.isBootstrapped()) {
} else { cleanupMailboxMessages(trades);
p2PService.addP2PServiceListener(new BootstrapListener() { } else {
@Override p2PService.addP2PServiceListener(new BootstrapListener() {
public void onDataReceived() { @Override
cleanupMailboxMessages(trades); public void onDataReceived() {
} cleanupMailboxMessages(trades);
}); }
});
}
} catch (Throwable t) {
log.error("Cleanup mailbox messages failed. {}", t.toString());
} }
} catch (Throwable t) {
log.error("Cleanup mailbox messages failed. {}", t.toString());
} }
} }

View file

@ -81,13 +81,15 @@ public class ClosedTradableManager implements PersistedDataHost {
@Override @Override
public void readPersisted(Runnable completeHandler) { public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> { persistenceManager.readPersisted(persisted -> {
closedTradables.setAll(persisted.getList()); synchronized (persisted.getList()) {
closedTradables.stream() closedTradables.setAll(persisted.getList());
.filter(tradable -> tradable.getOffer() != null) closedTradables.stream()
.forEach(tradable -> tradable.getOffer().setPriceFeedService(priceFeedService)); .filter(tradable -> tradable.getOffer() != null)
completeHandler.run(); .forEach(tradable -> tradable.getOffer().setPriceFeedService(priceFeedService));
}, }
completeHandler); completeHandler.run();
},
completeHandler);
} }
public void onAllServicesInitialized() { public void onAllServicesInitialized() {
@ -96,7 +98,7 @@ public class ClosedTradableManager implements PersistedDataHost {
} }
public void add(Tradable tradable) { public void add(Tradable tradable) {
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
if (closedTradables.add(tradable)) { if (closedTradables.add(tradable)) {
maybeClearSensitiveData(); maybeClearSensitiveData();
requestPersistence(); requestPersistence();
@ -105,7 +107,7 @@ public class ClosedTradableManager implements PersistedDataHost {
} }
public void remove(Tradable tradable) { public void remove(Tradable tradable) {
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
if (closedTradables.remove(tradable)) { if (closedTradables.remove(tradable)) {
requestPersistence(); requestPersistence();
} }
@ -117,17 +119,17 @@ public class ClosedTradableManager implements PersistedDataHost {
} }
public ObservableList<Tradable> getObservableList() { public ObservableList<Tradable> getObservableList() {
synchronized (closedTradables) { return closedTradables.getObservableList();
return closedTradables.getObservableList();
}
} }
public List<Tradable> getTradableList() { public List<Tradable> getTradableList() {
return ImmutableList.copyOf(new ArrayList<>(getObservableList())); synchronized (closedTradables.getList()) {
return ImmutableList.copyOf(new ArrayList<>(getObservableList()));
}
} }
public List<Trade> getClosedTrades() { public List<Trade> getClosedTrades() {
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
return ImmutableList.copyOf(getObservableList().stream() return ImmutableList.copyOf(getObservableList().stream()
.filter(e -> e instanceof Trade) .filter(e -> e instanceof Trade)
.map(e -> (Trade) e) .map(e -> (Trade) e)
@ -136,7 +138,7 @@ public class ClosedTradableManager implements PersistedDataHost {
} }
public List<OpenOffer> getCanceledOpenOffers() { public List<OpenOffer> getCanceledOpenOffers() {
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
return ImmutableList.copyOf(getObservableList().stream() return ImmutableList.copyOf(getObservableList().stream()
.filter(e -> (e instanceof OpenOffer) && ((OpenOffer) e).getState().equals(CANCELED)) .filter(e -> (e instanceof OpenOffer) && ((OpenOffer) e).getState().equals(CANCELED))
.map(e -> (OpenOffer) e) .map(e -> (OpenOffer) e)
@ -145,19 +147,19 @@ public class ClosedTradableManager implements PersistedDataHost {
} }
public Optional<Tradable> getTradableById(String id) { public Optional<Tradable> getTradableById(String id) {
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
return closedTradables.stream().filter(e -> e.getId().equals(id)).findFirst(); return closedTradables.stream().filter(e -> e.getId().equals(id)).findFirst();
} }
} }
public Optional<Trade> getTradeById(String id) { public Optional<Trade> getTradeById(String id) {
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
return getClosedTrades().stream().filter(e -> e.getId().equals(id)).findFirst(); return getClosedTrades().stream().filter(e -> e.getId().equals(id)).findFirst();
} }
} }
public void maybeClearSensitiveData() { public void maybeClearSensitiveData() {
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
log.info("checking closed trades eligibility for having sensitive data cleared"); log.info("checking closed trades eligibility for having sensitive data cleared");
closedTradables.stream() closedTradables.stream()
.filter(e -> e instanceof Trade) .filter(e -> e instanceof Trade)
@ -170,11 +172,11 @@ public class ClosedTradableManager implements PersistedDataHost {
public boolean canTradeHaveSensitiveDataCleared(String tradeId) { public boolean canTradeHaveSensitiveDataCleared(String tradeId) {
Instant safeDate = getSafeDateForSensitiveDataClearing(); Instant safeDate = getSafeDateForSensitiveDataClearing();
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
return closedTradables.stream() return closedTradables.stream()
.filter(e -> e.getId().equals(tradeId)) .filter(e -> e.getId().equals(tradeId))
.filter(e -> e.getDate().toInstant().isBefore(safeDate)) .filter(e -> e.getDate().toInstant().isBefore(safeDate))
.count() > 0; .count() > 0;
} }
} }
@ -205,9 +207,11 @@ public class ClosedTradableManager implements PersistedDataHost {
} }
public BigInteger getTotalTradeFee(List<Tradable> tradableList) { public BigInteger getTotalTradeFee(List<Tradable> tradableList) {
return BigInteger.valueOf(tradableList.stream() synchronized (tradableList) {
.mapToLong(tradable -> getTradeFee(tradable).longValueExact()) return BigInteger.valueOf(tradableList.stream()
.sum()); .mapToLong(tradable -> getTradeFee(tradable).longValueExact())
.sum());
}
} }
private BigInteger getTradeFee(Tradable tradable) { private BigInteger getTradeFee(Tradable tradable) {
@ -229,7 +233,7 @@ public class ClosedTradableManager implements PersistedDataHost {
} }
public void removeTrade(Trade trade) { public void removeTrade(Trade trade) {
synchronized (closedTradables) { synchronized (closedTradables.getList()) {
if (closedTradables.remove(trade)) { if (closedTradables.remove(trade)) {
requestPersistence(); requestPersistence();
} }

View file

@ -269,13 +269,15 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
@Override @Override
public void readPersisted(Runnable completeHandler) { public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> { persistenceManager.readPersisted(persisted -> {
tradableList.setAll(persisted.getList()); synchronized (persisted.getList()) {
tradableList.stream() tradableList.setAll(persisted.getList());
.filter(trade -> trade.getOffer() != null) tradableList.stream()
.forEach(trade -> trade.getOffer().setPriceFeedService(priceFeedService)); .filter(trade -> trade.getOffer() != null)
completeHandler.run(); .forEach(trade -> trade.getOffer().setPriceFeedService(priceFeedService));
}, }
completeHandler); completeHandler.run();
},
completeHandler);
} }
@ -992,7 +994,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
log.info("TradeManager.removeTrade() " + trade.getId()); log.info("TradeManager.removeTrade() " + trade.getId());
// remove trade // remove trade
synchronized (tradableList) { synchronized (tradableList.getList()) {
if (!tradableList.remove(trade)) return; if (!tradableList.remove(trade)) return;
} }
@ -1036,18 +1038,20 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
private void updateTradePeriodState() { private void updateTradePeriodState() {
if (isShutDownStarted) return; if (isShutDownStarted) return;
for (Trade trade : new ArrayList<Trade>(tradableList.getList())) { synchronized (tradableList.getList()) {
if (!trade.isPayoutPublished()) { for (Trade trade : tradableList.getList()) {
Date maxTradePeriodDate = trade.getMaxTradePeriodDate(); if (!trade.isPayoutPublished()) {
Date halfTradePeriodDate = trade.getHalfTradePeriodDate(); Date maxTradePeriodDate = trade.getMaxTradePeriodDate();
if (maxTradePeriodDate != null && halfTradePeriodDate != null) { Date halfTradePeriodDate = trade.getHalfTradePeriodDate();
Date now = new Date(); if (maxTradePeriodDate != null && halfTradePeriodDate != null) {
if (now.after(maxTradePeriodDate)) { Date now = new Date();
trade.setPeriodState(Trade.TradePeriodState.TRADE_PERIOD_OVER); if (now.after(maxTradePeriodDate)) {
requestPersistence(); trade.setPeriodState(Trade.TradePeriodState.TRADE_PERIOD_OVER);
} else if (now.after(halfTradePeriodDate)) { requestPersistence();
trade.setPeriodState(Trade.TradePeriodState.SECOND_HALF); } else if (now.after(halfTradePeriodDate)) {
requestPersistence(); trade.setPeriodState(Trade.TradePeriodState.SECOND_HALF);
requestPersistence();
}
} }
} }
} }
@ -1093,7 +1097,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} }
public Stream<Trade> getTradesStreamWithFundsLockedIn() { public Stream<Trade> getTradesStreamWithFundsLockedIn() {
synchronized (tradableList) { synchronized (tradableList.getList()) {
return getObservableList().stream().filter(Trade::isFundsLockedIn); return getObservableList().stream().filter(Trade::isFundsLockedIn);
} }
} }
@ -1108,7 +1112,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
public Set<String> getSetOfFailedOrClosedTradeIdsFromLockedInFunds() throws TradeTxException { public Set<String> getSetOfFailedOrClosedTradeIdsFromLockedInFunds() throws TradeTxException {
AtomicReference<TradeTxException> tradeTxException = new AtomicReference<>(); AtomicReference<TradeTxException> tradeTxException = new AtomicReference<>();
synchronized (tradableList) { synchronized (tradableList.getList()) {
Set<String> tradesIdSet = getTradesStreamWithFundsLockedIn() Set<String> tradesIdSet = getTradesStreamWithFundsLockedIn()
.filter(Trade::hasFailed) .filter(Trade::hasFailed)
.map(Trade::getId) .map(Trade::getId)
@ -1170,7 +1174,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
initPersistedTrade(trade); initPersistedTrade(trade);
UserThread.execute(() -> { UserThread.execute(() -> {
synchronized (tradableList) { synchronized (tradableList.getList()) {
if (!tradableList.contains(trade)) { if (!tradableList.contains(trade)) {
tradableList.add(trade); tradableList.add(trade);
} }
@ -1241,7 +1245,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} }
public ObservableList<Trade> getObservableList() { public ObservableList<Trade> getObservableList() {
synchronized (tradableList) { synchronized (tradableList.getList()) {
return tradableList.getObservableList(); return tradableList.getObservableList();
} }
} }
@ -1274,33 +1278,33 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} }
public Optional<Trade> getOpenTrade(String tradeId) { public Optional<Trade> getOpenTrade(String tradeId) {
synchronized (tradableList) { synchronized (tradableList.getList()) {
return tradableList.stream().filter(e -> e.getId().equals(tradeId)).findFirst(); return tradableList.stream().filter(e -> e.getId().equals(tradeId)).findFirst();
} }
} }
public boolean hasOpenTrade(Trade trade) { public boolean hasOpenTrade(Trade trade) {
synchronized (tradableList) { synchronized (tradableList.getList()) {
return tradableList.contains(trade); return tradableList.contains(trade);
} }
} }
public boolean hasFailedScheduledTrade(String offerId) { public boolean hasFailedScheduledTrade(String offerId) {
synchronized (failedTradesManager) { return failedTradesManager.getTradeById(offerId).isPresent() && failedTradesManager.getTradeById(offerId).get().isProtocolErrorHandlingScheduled();
return failedTradesManager.getTradeById(offerId).isPresent() && failedTradesManager.getTradeById(offerId).get().isProtocolErrorHandlingScheduled();
}
} }
public Optional<Trade> getOpenTradeByUid(String tradeUid) { public Optional<Trade> getOpenTradeByUid(String tradeUid) {
synchronized (tradableList) { synchronized (tradableList.getList()) {
return tradableList.stream().filter(e -> e.getUid().equals(tradeUid)).findFirst(); return tradableList.stream().filter(e -> e.getUid().equals(tradeUid)).findFirst();
} }
} }
public List<Trade> getAllTrades() { public List<Trade> getAllTrades() {
synchronized (tradableList) { synchronized (tradableList.getList()) {
List<Trade> trades = new ArrayList<Trade>(); List<Trade> trades = new ArrayList<Trade>();
trades.addAll(tradableList.getList()); synchronized (tradableList.getList()) {
trades.addAll(tradableList.getList());
}
trades.addAll(closedTradableManager.getClosedTrades()); trades.addAll(closedTradableManager.getClosedTrades());
trades.addAll(failedTradesManager.getObservableList()); trades.addAll(failedTradesManager.getObservableList());
return trades; return trades;
@ -1308,7 +1312,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} }
public List<Trade> getOpenTrades() { public List<Trade> getOpenTrades() {
synchronized (tradableList) { synchronized (tradableList.getList()) {
return ImmutableList.copyOf(getObservableList().stream() return ImmutableList.copyOf(getObservableList().stream()
.filter(e -> e instanceof Trade) .filter(e -> e instanceof Trade)
.map(e -> e) .map(e -> e)
@ -1329,7 +1333,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
} }
private void addTrade(Trade trade) { private void addTrade(Trade trade) {
synchronized (tradableList) { synchronized (tradableList.getList()) {
if (tradableList.add(trade)) { if (tradableList.add(trade)) {
requestPersistence(); requestPersistence();
} }

View file

@ -70,13 +70,15 @@ public class FailedTradesManager implements PersistedDataHost {
@Override @Override
public void readPersisted(Runnable completeHandler) { public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> { persistenceManager.readPersisted(persisted -> {
failedTrades.setAll(persisted.getList()); synchronized (persisted.getList()) {
failedTrades.stream() failedTrades.setAll(persisted.getList());
.filter(trade -> trade.getOffer() != null) failedTrades.stream()
.forEach(trade -> trade.getOffer().setPriceFeedService(priceFeedService)); .filter(trade -> trade.getOffer() != null)
completeHandler.run(); .forEach(trade -> trade.getOffer().setPriceFeedService(priceFeedService));
}, }
completeHandler); completeHandler.run();
},
completeHandler);
} }
public void onAllServicesInitialized() { public void onAllServicesInitialized() {
@ -84,7 +86,7 @@ public class FailedTradesManager implements PersistedDataHost {
} }
public void add(Trade trade) { public void add(Trade trade) {
synchronized (failedTrades) { synchronized (failedTrades.getList()) {
if (failedTrades.add(trade)) { if (failedTrades.add(trade)) {
requestPersistence(); requestPersistence();
} }
@ -92,7 +94,7 @@ public class FailedTradesManager implements PersistedDataHost {
} }
public void removeTrade(Trade trade) { public void removeTrade(Trade trade) {
synchronized (failedTrades) { synchronized (failedTrades.getList()) {
if (failedTrades.remove(trade)) { if (failedTrades.remove(trade)) {
requestPersistence(); requestPersistence();
} }
@ -104,26 +106,26 @@ public class FailedTradesManager implements PersistedDataHost {
} }
public ObservableList<Trade> getObservableList() { public ObservableList<Trade> getObservableList() {
synchronized (failedTrades) { synchronized (failedTrades.getList()) {
return failedTrades.getObservableList(); return failedTrades.getObservableList();
} }
} }
public Optional<Trade> getTradeById(String id) { public Optional<Trade> getTradeById(String id) {
synchronized (failedTrades) { synchronized (failedTrades.getList()) {
return failedTrades.stream().filter(e -> e.getId().equals(id)).findFirst(); return failedTrades.stream().filter(e -> e.getId().equals(id)).findFirst();
} }
} }
public Stream<Trade> getTradesStreamWithFundsLockedIn() { public Stream<Trade> getTradesStreamWithFundsLockedIn() {
synchronized (failedTrades) { synchronized (failedTrades.getList()) {
return failedTrades.stream() return failedTrades.stream()
.filter(Trade::isFundsLockedIn); .filter(Trade::isFundsLockedIn);
} }
} }
public void unFailTrade(Trade trade) { public void unFailTrade(Trade trade) {
synchronized (failedTrades) { synchronized (failedTrades.getList()) {
if (unFailTradeCallback == null) if (unFailTradeCallback == null)
return; return;

View file

@ -62,7 +62,9 @@ class SignedOffersDataModel extends ActivatableDataModel {
private void applyList() { private void applyList() {
list.clear(); list.clear();
list.addAll(openOfferManager.getObservableSignedOffersList().stream().map(SignedOfferListItem::new).collect(Collectors.toList())); synchronized (openOfferManager.getObservableSignedOffersList()) {
list.addAll(openOfferManager.getObservableSignedOffersList().stream().map(SignedOfferListItem::new).collect(Collectors.toList()));
}
// we sort by date, the earliest first // we sort by date, the earliest first
list.sort((o1, o2) -> new Date(o2.getSignedOffer().getTimeStamp()).compareTo(new Date(o1.getSignedOffer().getTimeStamp()))); list.sort((o1, o2) -> new Date(o2.getSignedOffer().getTimeStamp()).compareTo(new Date(o1.getSignedOffer().getTimeStamp())));

View file

@ -206,15 +206,17 @@ public class GUIUtil {
persistenceManager.readPersisted(fileName, persisted -> { persistenceManager.readPersisted(fileName, persisted -> {
StringBuilder msg = new StringBuilder(); StringBuilder msg = new StringBuilder();
HashSet<PaymentAccount> paymentAccounts = new HashSet<>(); HashSet<PaymentAccount> paymentAccounts = new HashSet<>();
persisted.getList().forEach(paymentAccount -> { synchronized (persisted.getList()) {
String id = paymentAccount.getId(); persisted.getList().forEach(paymentAccount -> {
if (user.getPaymentAccount(id) == null) { String id = paymentAccount.getId();
paymentAccounts.add(paymentAccount); if (user.getPaymentAccount(id) == null) {
msg.append(Res.get("guiUtil.accountExport.tradingAccount", id)); paymentAccounts.add(paymentAccount);
} else { msg.append(Res.get("guiUtil.accountExport.tradingAccount", id));
msg.append(Res.get("guiUtil.accountImport.noImport", id)); } else {
} msg.append(Res.get("guiUtil.accountImport.noImport", id));
}); }
});
}
user.addImportedPaymentAccounts(paymentAccounts); user.addImportedPaymentAccounts(paymentAccounts);
new Popup().feedback(Res.get("guiUtil.accountImport.imported", path, msg)).show(); new Popup().feedback(Res.get("guiUtil.accountImport.imported", path, msg)).show();
}, },

View file

@ -48,12 +48,14 @@ public class MailboxMessageList extends PersistableList<MailboxItem> {
@Override @Override
public Message toProtoMessage() { public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder() synchronized (getList()) {
.setMailboxMessageList(protobuf.MailboxMessageList.newBuilder() return protobuf.PersistableEnvelope.newBuilder()
.addAllMailboxItem(getList().stream() .setMailboxMessageList(protobuf.MailboxMessageList.newBuilder()
.map(MailboxItem::toProtoMessage) .addAllMailboxItem(getList().stream()
.collect(Collectors.toList()))) .map(MailboxItem::toProtoMessage)
.build(); .collect(Collectors.toList())))
.build();
}
} }
public static MailboxMessageList fromProto(protobuf.MailboxMessageList proto, public static MailboxMessageList fromProto(protobuf.MailboxMessageList proto,