remove mempool service

This commit is contained in:
woodser 2023-07-12 07:05:55 -04:00
parent 0843f27b63
commit a1829ee9f3
10 changed files with 3 additions and 597 deletions

View file

@ -34,7 +34,6 @@ import haveno.core.offer.OpenOfferManager;
import haveno.core.offer.TriggerPriceService;
import haveno.core.payment.AmazonGiftCardAccount;
import haveno.core.payment.RevolutAccount;
import haveno.core.provider.mempool.MempoolService;
import haveno.core.provider.price.PriceFeedService;
import haveno.core.support.dispute.arbitration.ArbitrationManager;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
@ -93,7 +92,6 @@ public class DomainInitialisation {
private final MarketAlerts marketAlerts;
private final User user;
private final TriggerPriceService triggerPriceService;
private final MempoolService mempoolService;
private final MailboxMessageService mailboxMessageService;
@Inject
@ -126,7 +124,6 @@ public class DomainInitialisation {
MarketAlerts marketAlerts,
User user,
TriggerPriceService triggerPriceService,
MempoolService mempoolService,
MailboxMessageService mailboxMessageService) {
this.clockWatcher = clockWatcher;
this.arbitrationManager = arbitrationManager;
@ -157,7 +154,6 @@ public class DomainInitialisation {
this.marketAlerts = marketAlerts;
this.user = user;
this.triggerPriceService = triggerPriceService;
this.mempoolService = mempoolService;
this.mailboxMessageService = mailboxMessageService;
}
@ -215,7 +211,6 @@ public class DomainInitialisation {
priceAlert.onAllServicesInitialized();
marketAlerts.onAllServicesInitialized();
triggerPriceService.onAllServicesInitialized();
mempoolService.onAllServicesInitialized();
mailboxMessageService.onAllServicesInitialized();

View file

@ -22,7 +22,6 @@ import haveno.core.locale.CurrencyUtil;
import haveno.core.monetary.CryptoMoney;
import haveno.core.monetary.Price;
import haveno.core.monetary.TraditionalMoney;
import haveno.core.provider.mempool.MempoolService;
import haveno.core.provider.price.MarketPrice;
import haveno.core.provider.price.PriceFeedService;
import haveno.network.p2p.BootstrapListener;
@ -47,18 +46,15 @@ import static haveno.common.util.MathUtils.scaleUpByPowerOf10;
public class TriggerPriceService {
private final P2PService p2PService;
private final OpenOfferManager openOfferManager;
private final MempoolService mempoolService;
private final PriceFeedService priceFeedService;
private final Map<String, Set<OpenOffer>> openOffersByCurrency = new HashMap<>();
@Inject
public TriggerPriceService(P2PService p2PService,
OpenOfferManager openOfferManager,
MempoolService mempoolService,
PriceFeedService priceFeedService) {
this.p2PService = p2PService;
this.openOfferManager = openOfferManager;
this.mempoolService = mempoolService;
this.priceFeedService = priceFeedService;
}
@ -152,19 +148,7 @@ public class TriggerPriceService {
}, errorMessage -> {
});
} else if (openOffer.getState() == OpenOffer.State.AVAILABLE) {
// check the mempool if it has not been done before
if (openOffer.getMempoolStatus() < 0 && mempoolService.canRequestBeMade(openOffer.getOffer().getOfferPayload())) {
mempoolService.validateOfferMakerTx(openOffer.getOffer().getOfferPayload(), (txValidator -> {
openOffer.setMempoolStatus(txValidator.isFail() ? 0 : 1);
}));
}
// if the mempool indicated failure then deactivate the open offer
if (openOffer.getMempoolStatus() == 0) {
log.info("Deactivating open offer {} due to mempool validation", openOffer.getOffer().getShortId());
openOfferManager.deactivateOpenOffer(openOffer, () -> {
}, errorMessage -> {
});
}
// TODO: check if open offer's reserve tx is failed or double spend seen
}
}

View file

@ -1,81 +0,0 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.provider.mempool;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import haveno.common.util.Utilities;
import haveno.core.provider.MempoolHttpClient;
import haveno.core.user.Preferences;
import haveno.network.Socks5ProxyProvider;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
public class MempoolRequest {
private static final ListeningExecutorService executorService = Utilities.getListeningExecutorService("MempoolRequest", 3, 5, 10 * 60);
private final List<String> txBroadcastServices = new ArrayList<>();
private final MempoolHttpClient mempoolHttpClient;
public MempoolRequest(Preferences preferences, Socks5ProxyProvider socks5ProxyProvider) {
this.txBroadcastServices.addAll(preferences.getDefaultTxBroadcastServices());
this.mempoolHttpClient = new MempoolHttpClient(socks5ProxyProvider);
}
public void getTxStatus(SettableFuture<String> mempoolServiceCallback, String txId) {
mempoolHttpClient.setBaseUrl(getRandomServiceAddress(txBroadcastServices));
ListenableFuture<String> future = executorService.submit(() -> {
Thread.currentThread().setName("MempoolRequest @ " + mempoolHttpClient.getBaseUrl());
log.info("Making http request for information on txId: {}", txId);
return mempoolHttpClient.getTxDetails(txId);
});
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(String mempoolData) {
log.info("Received mempoolData of [{}] from provider", mempoolData);
mempoolServiceCallback.set(mempoolData);
}
public void onFailure(@NotNull Throwable throwable) {
mempoolServiceCallback.setException(throwable);
}
}, MoreExecutors.directExecutor());
}
public boolean switchToAnotherProvider() {
txBroadcastServices.remove(mempoolHttpClient.getBaseUrl());
return txBroadcastServices.size() > 0;
}
@Nullable
private static String getRandomServiceAddress(List<String> txBroadcastServices) {
List<String> list = checkNotNull(txBroadcastServices);
return !list.isEmpty() ? list.get(new Random().nextInt(list.size())) : null;
}
}

View file

@ -1,257 +0,0 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.provider.mempool;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.core.filter.FilterManager;
import haveno.core.offer.OfferPayload;
import haveno.core.trade.Trade;
import haveno.core.user.Preferences;
import haveno.network.Socks5ProxyProvider;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.bitcoinj.core.Coin;
import javax.annotation.Nullable;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
@Slf4j
@Singleton
public class MempoolService {
private final Socks5ProxyProvider socks5ProxyProvider;
private final Config config;
private final Preferences preferences;
private final FilterManager filterManager;
private final List<String> btcFeeReceivers = new ArrayList<>();
@Getter
private int outstandingRequests = 0;
@Inject
public MempoolService(Socks5ProxyProvider socks5ProxyProvider,
Config config,
Preferences preferences,
FilterManager filterManager) {
this.socks5ProxyProvider = socks5ProxyProvider;
this.config = config;
this.preferences = preferences;
this.filterManager = filterManager;
}
public void onAllServicesInitialized() {
btcFeeReceivers.addAll(getAllBtcFeeReceivers());
}
public boolean canRequestBeMade() {
return outstandingRequests < 5; // limit max simultaneous lookups
}
public boolean canRequestBeMade(OfferPayload offerPayload) {
// when validating a new offer, wait 1 block for the tx to propagate
return canRequestBeMade();
}
public void validateOfferMakerTx(OfferPayload offerPayload, Consumer<TxValidator> resultHandler) {
validateOfferMakerTx(new TxValidator( offerPayload.getOfferFeeTxId(), Coin.valueOf(offerPayload.getAmount())), resultHandler);
}
public void validateOfferMakerTx(TxValidator txValidator, Consumer<TxValidator> resultHandler) {
if (!isServiceSupported()) {
UserThread.runAfter(() -> resultHandler.accept(txValidator.endResult("mempool request not supported, bypassing", true)), 1);
return;
}
MempoolRequest mempoolRequest = new MempoolRequest(preferences, socks5ProxyProvider);
validateOfferMakerTx(mempoolRequest, txValidator, resultHandler);
}
public void validateOfferTakerTx(Trade trade, Consumer<TxValidator> resultHandler) {
throw new RuntimeException("MempoolService.validateOfferTakerTx needs updated for XMR");
//validateOfferTakerTx(new TxValidator( trade.getTakerFeeTxId(), trade.getTradeAmount(),resultHandler));
}
public void validateOfferTakerTx(TxValidator txValidator, Consumer<TxValidator> resultHandler) {
if (!isServiceSupported()) {
UserThread.runAfter(() -> resultHandler.accept(txValidator.endResult("mempool request not supported, bypassing", true)), 1);
return;
}
MempoolRequest mempoolRequest = new MempoolRequest(preferences, socks5ProxyProvider);
validateOfferTakerTx(mempoolRequest, txValidator, resultHandler);
}
public void checkTxIsConfirmed(String txId, Consumer<TxValidator> resultHandler) {
TxValidator txValidator = new TxValidator(txId);
if (!isServiceSupported()) {
UserThread.runAfter(() -> resultHandler.accept(txValidator.endResult("mempool request not supported, bypassing", true)), 1);
return;
}
MempoolRequest mempoolRequest = new MempoolRequest(preferences, socks5ProxyProvider);
SettableFuture<String> future = SettableFuture.create();
Futures.addCallback(future, callbackForTxRequest(mempoolRequest, txValidator, resultHandler), MoreExecutors.directExecutor());
mempoolRequest.getTxStatus(future, txId);
}
// ///////////////////////////
private void validateOfferMakerTx(MempoolRequest mempoolRequest,
TxValidator txValidator,
Consumer<TxValidator> resultHandler) {
SettableFuture<String> future = SettableFuture.create();
Futures.addCallback(future, callbackForMakerTxValidation(mempoolRequest, txValidator, resultHandler), MoreExecutors.directExecutor());
mempoolRequest.getTxStatus(future, txValidator.getTxId());
}
private void validateOfferTakerTx(MempoolRequest mempoolRequest,
TxValidator txValidator,
Consumer<TxValidator> resultHandler) {
SettableFuture<String> future = SettableFuture.create();
Futures.addCallback(future, callbackForTakerTxValidation(mempoolRequest, txValidator, resultHandler), MoreExecutors.directExecutor());
mempoolRequest.getTxStatus(future, txValidator.getTxId());
}
private FutureCallback<String> callbackForMakerTxValidation(MempoolRequest theRequest,
TxValidator txValidator,
Consumer<TxValidator> resultHandler) {
outstandingRequests++;
FutureCallback<String> myCallback = new FutureCallback<>() {
@Override
public void onSuccess(@Nullable String jsonTxt) {
UserThread.execute(() -> {
outstandingRequests--;
resultHandler.accept(txValidator.endResult("onSuccess", true));
});
}
@Override
public void onFailure(Throwable throwable) {
log.warn("onFailure - {}", throwable.toString());
UserThread.execute(() -> {
outstandingRequests--;
if (theRequest.switchToAnotherProvider()) {
validateOfferMakerTx(theRequest, txValidator, resultHandler);
} else {
// exhausted all providers, let user know of failure
resultHandler.accept(txValidator.endResult("Tx not found", false));
}
});
}
};
return myCallback;
}
private FutureCallback<String> callbackForTakerTxValidation(MempoolRequest theRequest,
TxValidator txValidator,
Consumer<TxValidator> resultHandler) {
outstandingRequests++;
FutureCallback<String> myCallback = new FutureCallback<>() {
@Override
public void onSuccess(@Nullable String jsonTxt) {
UserThread.execute(() -> {
outstandingRequests--;
resultHandler.accept(txValidator.endResult("onSuccess", true));
});
}
@Override
public void onFailure(Throwable throwable) {
log.warn("onFailure - {}", throwable.toString());
UserThread.execute(() -> {
outstandingRequests--;
if (theRequest.switchToAnotherProvider()) {
validateOfferTakerTx(theRequest, txValidator, resultHandler);
} else {
// exhausted all providers, let user know of failure
resultHandler.accept(txValidator.endResult("Tx not found", false));
}
});
}
};
return myCallback;
}
private FutureCallback<String> callbackForTxRequest(MempoolRequest theRequest,
TxValidator txValidator,
Consumer<TxValidator> resultHandler) {
outstandingRequests++;
FutureCallback<String> myCallback = new FutureCallback<>() {
@Override
public void onSuccess(@Nullable String jsonTxt) {
UserThread.execute(() -> {
outstandingRequests--;
txValidator.setJsonTxt(jsonTxt);
resultHandler.accept(txValidator);
});
}
@Override
public void onFailure(Throwable throwable) {
log.warn("onFailure - {}", throwable.toString());
UserThread.execute(() -> {
outstandingRequests--;
resultHandler.accept(txValidator.endResult("Tx not found", false));
});
}
};
return myCallback;
}
// /////////////////////////////
private List<String> getAllBtcFeeReceivers() {
List<String> btcFeeReceivers = new ArrayList<>();
// fee receivers from filter ref: bisq-network/bisq/pull/4294
List<String> feeReceivers = Optional.ofNullable(filterManager.getFilter())
.flatMap(f -> Optional.ofNullable(f.getBtcFeeReceiverAddresses()))
.orElse(List.of());
feeReceivers.forEach(e -> {
try {
btcFeeReceivers.add(e.split("#")[0]); // victim's receiver address
} catch (RuntimeException ignore) {
// If input format is not as expected we ignore entry
}
});
log.info("Known BTC fee receivers: {}", btcFeeReceivers.toString());
return btcFeeReceivers;
}
private boolean isServiceSupported() {
if (filterManager.getFilter() != null && filterManager.getFilter().isDisableMempoolValidation()) {
log.info("MempoolService bypassed by filter setting disableMempoolValidation=true");
return false;
}
if (config.bypassMempoolValidation) {
log.info("MempoolService bypassed by config setting bypassMempoolValidation=true");
return false;
}
if (!Config.baseCurrencyNetwork().isMainnet()) {
log.info("MempoolService only supports mainnet");
return false;
}
return true;
}
}

View file

@ -1,76 +0,0 @@
/*
* This file is part of Haveno.
*
* Haveno is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Haveno is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Haveno. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.provider.mempool;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.bitcoinj.core.Coin;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Getter
public class TxValidator {
private final List<String> errorList;
private final String txId;
private Coin amount;
@Setter
private String jsonTxt;
public TxValidator(String txId, Coin amount) {
this.txId = txId;
this.amount = amount;
this.errorList = new ArrayList<>();
this.jsonTxt = "";
}
public TxValidator(String txId) {
this.txId = txId;
this.errorList = new ArrayList<>();
this.jsonTxt = "";
}
public TxValidator endResult(String title, boolean status) {
log.info("{} : {}", title, status ? "SUCCESS" : "FAIL");
if (!status) {
errorList.add(title);
}
return this;
}
public boolean isFail() {
return errorList.size() > 0;
}
public boolean getResult() {
return errorList.size() == 0;
}
public String errorSummary() {
return errorList.toString().substring(0, Math.min(85, errorList.toString().length()));
}
@Override
public String toString() {
return errorList.toString();
}
}