This commit is contained in:
woodser 2021-05-04 20:20:01 -04:00
commit 8a38081c04
2800 changed files with 344130 additions and 0 deletions

View file

@ -0,0 +1,23 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.app;
import bisq.core.app.BisqHeadlessApp;
public class BisqDaemon extends BisqHeadlessApp {
}

View file

@ -0,0 +1,115 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.app;
import bisq.core.app.BisqHeadlessAppMain;
import bisq.core.app.BisqSetup;
import bisq.core.app.CoreModule;
import bisq.common.UserThread;
import bisq.common.app.AppModule;
import bisq.common.handlers.ResultHandler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import lombok.extern.slf4j.Slf4j;
import bisq.daemon.grpc.GrpcServer;
@Slf4j
public class BisqDaemonMain extends BisqHeadlessAppMain implements BisqSetup.BisqSetupListener {
private GrpcServer grpcServer;
public static void main(String[] args) {
new BisqDaemonMain().execute(args);
}
/////////////////////////////////////////////////////////////////////////////////////
// First synchronous execution tasks
/////////////////////////////////////////////////////////////////////////////////////
@Override
protected void configUserThread() {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(this.getClass().getSimpleName())
.setDaemon(true)
.build();
UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory));
}
@Override
protected void launchApplication() {
headlessApp = new BisqDaemon();
UserThread.execute(this::onApplicationLaunched);
}
@Override
protected void onApplicationLaunched() {
super.onApplicationLaunched();
headlessApp.setGracefulShutDownHandler(this);
}
/////////////////////////////////////////////////////////////////////////////////////
// We continue with a series of synchronous execution tasks
/////////////////////////////////////////////////////////////////////////////////////
@Override
protected AppModule getModule() {
return new CoreModule(config);
}
@Override
protected void applyInjector() {
super.applyInjector();
headlessApp.setInjector(injector);
}
@Override
protected void startApplication() {
// We need to be in user thread! We mapped at launchApplication already...
headlessApp.startApplication();
// In headless mode we don't have an async behaviour so we trigger the setup by
// calling onApplicationStarted.
onApplicationStarted();
}
@Override
protected void onApplicationStarted() {
super.onApplicationStarted();
grpcServer = injector.getInstance(GrpcServer.class);
grpcServer.start();
}
@Override
public void gracefulShutDown(ResultHandler resultHandler) {
super.gracefulShutDown(resultHandler);
grpcServer.shutdown();
}
}

View file

@ -0,0 +1,69 @@
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.proto.grpc.RegisterDisputeAgentReply;
import bisq.proto.grpc.RegisterDisputeAgentRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.DisputeAgentsGrpc.DisputeAgentsImplBase;
import static bisq.proto.grpc.DisputeAgentsGrpc.getRegisterDisputeAgentMethod;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcDisputeAgentsService extends DisputeAgentsImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcDisputeAgentsService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void registerDisputeAgent(RegisterDisputeAgentRequest req,
StreamObserver<RegisterDisputeAgentReply> responseObserver) {
try {
coreApi.registerDisputeAgent(req.getDisputeAgentType(), req.getRegistrationKey());
var reply = RegisterDisputeAgentReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
// Do not limit devs' ability to test agent registration
// and call validation in regtest arbitration daemons.
put(getRegisterDisputeAgentMethod().getFullMethodName(), new GrpcCallRateMeter(10, SECONDS));
}}
)));
}
}

View file

@ -0,0 +1,143 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.common.handlers.ErrorMessageHandler;
import bisq.proto.grpc.AvailabilityResultWithDescription;
import bisq.proto.grpc.TakeOfferReply;
import protobuf.AvailabilityResult;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import lombok.Getter;
import static bisq.proto.grpc.TradesGrpc.getTakeOfferMethod;
import static java.lang.String.format;
import static java.util.Arrays.stream;
/**
* An implementation of bisq.common.handlers.ErrorMessageHandler that avoids
* an exception loop with the UI's bisq.common.taskrunner framework.
*
* The legacy ErrorMessageHandler is for reporting error messages only to the UI, but
* some core api tasks (takeoffer) require one. This implementation works around
* the problem of Task ErrorMessageHandlers not throwing exceptions to the gRPC client.
*
* Extra care is needed because exceptions thrown by an ErrorMessageHandler inside
* a Task may be thrown back to the GrpcService object, and if a gRPC ErrorMessageHandler
* responded by throwing another exception, the loop may only stop after the gRPC
* stream is closed.
*
* A unique instance should be used for a single gRPC call.
*/
public class GrpcErrorMessageHandler implements ErrorMessageHandler {
@Getter
private boolean isErrorHandled = false;
private final String fullMethodName;
private final StreamObserver<?> responseObserver;
private final GrpcExceptionHandler exceptionHandler;
private final Logger log;
public GrpcErrorMessageHandler(String fullMethodName,
StreamObserver<?> responseObserver,
GrpcExceptionHandler exceptionHandler,
Logger log) {
this.fullMethodName = fullMethodName;
this.exceptionHandler = exceptionHandler;
this.responseObserver = responseObserver;
this.log = log;
}
@Override
public void handleErrorMessage(String errorMessage) {
// A task runner may call handleErrorMessage(String) more than once.
// Throw only one exception if that happens, to avoid looping until the
// grpc stream is closed
if (!isErrorHandled) {
this.isErrorHandled = true;
log.error(errorMessage);
if (takeOfferWasCalled()) {
handleTakeOfferError(errorMessage);
} else {
exceptionHandler.handleErrorMessage(log,
errorMessage,
responseObserver);
}
}
}
private void handleTakeOfferError(String errorMessage) {
// If the errorMessage originated from a UI purposed TaskRunner, it should
// contain an AvailabilityResult enum name. If it does, derive the
// AvailabilityResult enum from the errorMessage, wrap it in a new
// AvailabilityResultWithDescription enum, then send the
// AvailabilityResultWithDescription to the client instead of throwing
// an exception. The client should use the grpc reply object's
// AvailabilityResultWithDescription field if reply.hasTrade = false, and the
// client can decide to throw an exception with the client friendly error
// description, or take some other action based on the AvailabilityResult enum.
// (Some offer availability problems are not fatal, and retries are appropriate.)
try {
var failureReason = getAvailabilityResultWithDescription(errorMessage);
var reply = TakeOfferReply.newBuilder()
.setFailureReason(failureReason)
.build();
@SuppressWarnings("unchecked")
var takeOfferResponseObserver = (StreamObserver<TakeOfferReply>) responseObserver;
takeOfferResponseObserver.onNext(reply);
takeOfferResponseObserver.onCompleted();
} catch (IllegalArgumentException ex) {
log.error("", ex);
exceptionHandler.handleErrorMessage(log,
errorMessage,
responseObserver);
}
}
private AvailabilityResultWithDescription getAvailabilityResultWithDescription(String errorMessage) {
AvailabilityResult proto = getAvailabilityResult(errorMessage);
String description = getAvailabilityResultDescription(proto);
return AvailabilityResultWithDescription.newBuilder()
.setAvailabilityResult(proto)
.setDescription(description)
.build();
}
private AvailabilityResult getAvailabilityResult(String errorMessage) {
return stream(AvailabilityResult.values())
.filter((e) -> errorMessage.toUpperCase().contains(e.name()))
.findFirst().orElseThrow(() ->
new IllegalArgumentException(
format("Could not find an AvailabilityResult in error message:%n%s", errorMessage)));
}
private String getAvailabilityResultDescription(AvailabilityResult proto) {
return bisq.core.offer.AvailabilityResult.fromProto(proto).description();
}
private boolean takeOfferWasCalled() {
return fullMethodName.equals(getTakeOfferMethod().getFullMethodName());
}
}

View file

@ -0,0 +1,124 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import static io.grpc.Status.INVALID_ARGUMENT;
import static io.grpc.Status.UNKNOWN;
/**
* The singleton instance of this class handles any expected core api Throwable by
* wrapping its message in a gRPC StatusRuntimeException and sending it to the client.
* An unexpected Throwable's message will be replaced with an 'unexpected' error message.
*/
@Singleton
class GrpcExceptionHandler {
private final Predicate<Throwable> isExpectedException = (t) ->
t instanceof IllegalStateException || t instanceof IllegalArgumentException;
@Inject
public GrpcExceptionHandler() {
}
public void handleException(Logger log,
Throwable t,
StreamObserver<?> responseObserver) {
// Log the core api error (this is last chance to do that), wrap it in a new
// gRPC StatusRuntimeException, then send it to the client in the gRPC response.
log.error("", t);
var grpcStatusRuntimeException = wrapException(t);
responseObserver.onError(grpcStatusRuntimeException);
throw grpcStatusRuntimeException;
}
public void handleExceptionAsWarning(Logger log,
String calledMethod,
Throwable t,
StreamObserver<?> responseObserver) {
// Just log a warning instead of an error with full stack trace.
log.warn(calledMethod + " -> " + t.getMessage());
var grpcStatusRuntimeException = wrapException(t);
responseObserver.onError(grpcStatusRuntimeException);
throw grpcStatusRuntimeException;
}
public void handleErrorMessage(Logger log,
String errorMessage,
StreamObserver<?> responseObserver) {
// This is used to wrap Task errors from the ErrorMessageHandler
// interface, an interface that is not allowed to throw exceptions.
log.error(errorMessage);
var grpcStatusRuntimeException = new StatusRuntimeException(
UNKNOWN.withDescription(cliStyleErrorMessage.apply(errorMessage)));
responseObserver.onError(grpcStatusRuntimeException);
throw grpcStatusRuntimeException;
}
private StatusRuntimeException wrapException(Throwable t) {
// We want to be careful about what kinds of exception messages we send to the
// client. Expected core exceptions should be wrapped in an IllegalStateException
// or IllegalArgumentException, with a consistently styled and worded error
// message. But only a small number of the expected error types are currently
// handled this way; there is much work to do to handle the variety of errors
// that can occur in the api. In the meantime, we take care to not pass full,
// unexpected error messages to the client. If the exception type is unexpected,
// we omit details from the gRPC exception sent to the client.
if (isExpectedException.test(t)) {
if (t.getCause() != null)
return new StatusRuntimeException(mapGrpcErrorStatus(t.getCause(), t.getCause().getMessage()));
else
return new StatusRuntimeException(mapGrpcErrorStatus(t, t.getMessage()));
} else {
return new StatusRuntimeException(mapGrpcErrorStatus(t, "unexpected error on server"));
}
}
private final Function<String, String> cliStyleErrorMessage = (e) -> {
String[] line = e.split("\\r?\\n");
int lastLine = line.length;
return line[lastLine - 1].toLowerCase();
};
private Status mapGrpcErrorStatus(Throwable t, String description) {
// We default to the UNKNOWN status, except were the mapping of a core api
// exception to a gRPC Status is obvious. If we ever use a gRPC reverse-proxy
// to support RESTful clients, we will need to have more specific mappings
// to support correct HTTP 1.1. status codes.
//noinspection SwitchStatementWithTooFewBranches
switch (t.getClass().getSimpleName()) {
// We go ahead and use a switch statement instead of if, in anticipation
// of more, specific exception mappings.
case "IllegalArgumentException":
return INVALID_ARGUMENT.withDescription(description);
default:
return UNKNOWN.withDescription(description);
}
}
}

View file

@ -0,0 +1,72 @@
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.core.trade.statistics.TradeStatistics3;
import bisq.proto.grpc.GetTradeStatisticsReply;
import bisq.proto.grpc.GetTradeStatisticsRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.GetTradeStatisticsGrpc.GetTradeStatisticsImplBase;
import static bisq.proto.grpc.GetTradeStatisticsGrpc.getGetTradeStatisticsMethod;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcGetTradeStatisticsService extends GetTradeStatisticsImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcGetTradeStatisticsService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void getTradeStatistics(GetTradeStatisticsRequest req,
StreamObserver<GetTradeStatisticsReply> responseObserver) {
try {
var tradeStatistics = coreApi.getTradeStatistics().stream()
.map(TradeStatistics3::toProtoTradeStatistics3)
.collect(Collectors.toList());
var reply = GetTradeStatisticsReply.newBuilder().addAllTradeStatistics(tradeStatistics).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getGetTradeStatisticsMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
}}
)));
}
}

View file

@ -0,0 +1,84 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.proto.grpc.GetMethodHelpReply;
import bisq.proto.grpc.GetMethodHelpRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.HelpGrpc.HelpImplBase;
import static bisq.proto.grpc.HelpGrpc.getGetMethodHelpMethod;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcHelpService extends HelpImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcHelpService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void getMethodHelp(GetMethodHelpRequest req,
StreamObserver<GetMethodHelpReply> responseObserver) {
try {
String helpText = coreApi.getMethodHelp(req.getMethodName());
var reply = GetMethodHelpReply.newBuilder().setMethodHelp(helpText).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getGetMethodHelpMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
}}
)));
}
}

View file

@ -0,0 +1,205 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.core.api.model.OfferInfo;
import bisq.core.offer.Offer;
import bisq.core.offer.OpenOffer;
import bisq.proto.grpc.CancelOfferReply;
import bisq.proto.grpc.CancelOfferRequest;
import bisq.proto.grpc.CreateOfferReply;
import bisq.proto.grpc.CreateOfferRequest;
import bisq.proto.grpc.GetMyOfferReply;
import bisq.proto.grpc.GetMyOfferRequest;
import bisq.proto.grpc.GetMyOffersReply;
import bisq.proto.grpc.GetMyOffersRequest;
import bisq.proto.grpc.GetOfferReply;
import bisq.proto.grpc.GetOfferRequest;
import bisq.proto.grpc.GetOffersReply;
import bisq.proto.grpc.GetOffersRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import static bisq.core.api.model.OfferInfo.toOfferInfo;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.OffersGrpc.*;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcOffersService extends OffersImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcOffersService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void getOffer(GetOfferRequest req,
StreamObserver<GetOfferReply> responseObserver) {
try {
Offer offer = coreApi.getOffer(req.getId());
var reply = GetOfferReply.newBuilder()
.setOffer(toOfferInfo(offer).toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getMyOffer(GetMyOfferRequest req,
StreamObserver<GetMyOfferReply> responseObserver) {
try {
Offer offer = coreApi.getMyOffer(req.getId());
OpenOffer openOffer = coreApi.getMyOpenOffer(req.getId());
var reply = GetMyOfferReply.newBuilder()
.setOffer(toOfferInfo(offer, openOffer.getTriggerPrice()).toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getOffers(GetOffersRequest req,
StreamObserver<GetOffersReply> responseObserver) {
try {
List<OfferInfo> result = coreApi.getOffers(req.getDirection(), req.getCurrencyCode())
.stream().map(OfferInfo::toOfferInfo)
.collect(Collectors.toList());
var reply = GetOffersReply.newBuilder()
.addAllOffers(result.stream()
.map(OfferInfo::toProtoMessage)
.collect(Collectors.toList()))
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getMyOffers(GetMyOffersRequest req,
StreamObserver<GetMyOffersReply> responseObserver) {
try {
List<OfferInfo> result = coreApi.getMyOffers(req.getDirection(), req.getCurrencyCode())
.stream().map(OfferInfo::toOfferInfo)
.collect(Collectors.toList());
var reply = GetMyOffersReply.newBuilder()
.addAllOffers(result.stream()
.map(OfferInfo::toProtoMessage)
.collect(Collectors.toList()))
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void createOffer(CreateOfferRequest req,
StreamObserver<CreateOfferReply> responseObserver) {
try {
coreApi.createAnPlaceOffer(
req.getCurrencyCode(),
req.getDirection(),
req.getPrice(),
req.getUseMarketBasedPrice(),
req.getMarketPriceMargin(),
req.getAmount(),
req.getMinAmount(),
req.getBuyerSecurityDeposit(),
req.getTriggerPrice(),
req.getPaymentAccountId(),
req.getMakerFeeCurrencyCode(),
offer -> {
// This result handling consumer's accept operation will return
// the new offer to the gRPC client after async placement is done.
OfferInfo offerInfo = toOfferInfo(offer);
CreateOfferReply reply = CreateOfferReply.newBuilder()
.setOffer(offerInfo.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void cancelOffer(CancelOfferRequest req,
StreamObserver<CancelOfferReply> responseObserver) {
try {
coreApi.cancelOffer(req.getId());
var reply = CancelOfferReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getGetOfferMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetMyOfferMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetOffersMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetMyOffersMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getCreateOfferMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getCancelOfferMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
}}
)));
}
}

View file

@ -0,0 +1,184 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.core.payment.PaymentAccount;
import bisq.core.payment.payload.PaymentMethod;
import bisq.proto.grpc.CreateCryptoCurrencyPaymentAccountReply;
import bisq.proto.grpc.CreateCryptoCurrencyPaymentAccountRequest;
import bisq.proto.grpc.CreatePaymentAccountReply;
import bisq.proto.grpc.CreatePaymentAccountRequest;
import bisq.proto.grpc.GetCryptoCurrencyPaymentMethodsReply;
import bisq.proto.grpc.GetCryptoCurrencyPaymentMethodsRequest;
import bisq.proto.grpc.GetPaymentAccountFormReply;
import bisq.proto.grpc.GetPaymentAccountFormRequest;
import bisq.proto.grpc.GetPaymentAccountsReply;
import bisq.proto.grpc.GetPaymentAccountsRequest;
import bisq.proto.grpc.GetPaymentMethodsReply;
import bisq.proto.grpc.GetPaymentMethodsRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.PaymentAccountsGrpc.*;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcPaymentAccountsService extends PaymentAccountsImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcPaymentAccountsService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void createPaymentAccount(CreatePaymentAccountRequest req,
StreamObserver<CreatePaymentAccountReply> responseObserver) {
try {
PaymentAccount paymentAccount = coreApi.createPaymentAccount(req.getPaymentAccountForm());
var reply = CreatePaymentAccountReply.newBuilder()
.setPaymentAccount(paymentAccount.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getPaymentAccounts(GetPaymentAccountsRequest req,
StreamObserver<GetPaymentAccountsReply> responseObserver) {
try {
var paymentAccounts = coreApi.getPaymentAccounts().stream()
.map(PaymentAccount::toProtoMessage)
.collect(Collectors.toList());
var reply = GetPaymentAccountsReply.newBuilder()
.addAllPaymentAccounts(paymentAccounts).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getPaymentMethods(GetPaymentMethodsRequest req,
StreamObserver<GetPaymentMethodsReply> responseObserver) {
try {
var paymentMethods = coreApi.getFiatPaymentMethods().stream()
.map(PaymentMethod::toProtoMessage)
.collect(Collectors.toList());
var reply = GetPaymentMethodsReply.newBuilder()
.addAllPaymentMethods(paymentMethods).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getPaymentAccountForm(GetPaymentAccountFormRequest req,
StreamObserver<GetPaymentAccountFormReply> responseObserver) {
try {
var paymentAccountFormJson = coreApi.getPaymentAccountForm(req.getPaymentMethodId());
var reply = GetPaymentAccountFormReply.newBuilder()
.setPaymentAccountFormJson(paymentAccountFormJson)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void createCryptoCurrencyPaymentAccount(CreateCryptoCurrencyPaymentAccountRequest req,
StreamObserver<CreateCryptoCurrencyPaymentAccountReply> responseObserver) {
try {
PaymentAccount paymentAccount = coreApi.createCryptoCurrencyPaymentAccount(req.getAccountName(),
req.getCurrencyCode(),
req.getAddress(),
req.getTradeInstant());
var reply = CreateCryptoCurrencyPaymentAccountReply.newBuilder()
.setPaymentAccount(paymentAccount.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getCryptoCurrencyPaymentMethods(GetCryptoCurrencyPaymentMethodsRequest req,
StreamObserver<GetCryptoCurrencyPaymentMethodsReply> responseObserver) {
try {
var paymentMethods = coreApi.getCryptoCurrencyPaymentMethods().stream()
.map(PaymentMethod::toProtoMessage)
.collect(Collectors.toList());
var reply = GetCryptoCurrencyPaymentMethodsReply.newBuilder()
.addAllPaymentMethods(paymentMethods).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getCreatePaymentAccountMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getCreateCryptoCurrencyPaymentAccountMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getGetPaymentAccountsMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetPaymentMethodsMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetPaymentAccountFormMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
}}
)));
}
}

View file

@ -0,0 +1,86 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.proto.grpc.MarketPriceReply;
import bisq.proto.grpc.MarketPriceRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.PriceGrpc.PriceImplBase;
import static bisq.proto.grpc.PriceGrpc.getGetMarketPriceMethod;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcPriceService extends PriceImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcPriceService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void getMarketPrice(MarketPriceRequest req,
StreamObserver<MarketPriceReply> responseObserver) {
try {
coreApi.getMarketPrice(req.getCurrencyCode(),
price -> {
var reply = MarketPriceReply.newBuilder().setPrice(price).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getGetMarketPriceMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
}}
)));
}
}

View file

@ -0,0 +1,93 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.api.CoreContext;
import bisq.common.UserThread;
import bisq.common.config.Config;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.UncheckedIOException;
import lombok.extern.slf4j.Slf4j;
import static io.grpc.ServerInterceptors.interceptForward;
import bisq.daemon.grpc.interceptor.PasswordAuthInterceptor;
@Singleton
@Slf4j
public class GrpcServer {
private final Server server;
@Inject
public GrpcServer(CoreContext coreContext,
Config config,
PasswordAuthInterceptor passwordAuthInterceptor,
GrpcDisputeAgentsService disputeAgentsService,
GrpcHelpService helpService,
GrpcOffersService offersService,
GrpcPaymentAccountsService paymentAccountsService,
GrpcPriceService priceService,
GrpcShutdownService shutdownService,
GrpcVersionService versionService,
GrpcGetTradeStatisticsService tradeStatisticsService,
GrpcTradesService tradesService,
GrpcWalletsService walletsService) {
this.server = ServerBuilder.forPort(config.apiPort)
.executor(UserThread.getExecutor())
.addService(interceptForward(disputeAgentsService, disputeAgentsService.interceptors()))
.addService(interceptForward(helpService, helpService.interceptors()))
.addService(interceptForward(offersService, offersService.interceptors()))
.addService(interceptForward(paymentAccountsService, paymentAccountsService.interceptors()))
.addService(interceptForward(priceService, priceService.interceptors()))
.addService(shutdownService)
.addService(interceptForward(tradeStatisticsService, tradeStatisticsService.interceptors()))
.addService(interceptForward(tradesService, tradesService.interceptors()))
.addService(interceptForward(versionService, versionService.interceptors()))
.addService(interceptForward(walletsService, walletsService.interceptors()))
.intercept(passwordAuthInterceptor)
.build();
coreContext.setApiUser(true);
}
public void start() {
try {
server.start();
log.info("listening on port {}", server.getPort());
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
public void shutdown() {
log.info("Server shutdown started");
server.shutdown();
log.info("Server shutdown complete");
}
}

View file

@ -0,0 +1,59 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.app.BisqHeadlessApp;
import bisq.common.UserThread;
import bisq.proto.grpc.ShutdownServerGrpc;
import bisq.proto.grpc.StopReply;
import bisq.proto.grpc.StopRequest;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@Slf4j
class GrpcShutdownService extends ShutdownServerGrpc.ShutdownServerImplBase {
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcShutdownService(GrpcExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
@Override
public void stop(StopRequest req,
StreamObserver<StopReply> responseObserver) {
try {
log.info("Shutdown request received.");
var reply = StopReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
UserThread.runAfter(BisqHeadlessApp.getShutDownHandler(), 500, MILLISECONDS);
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
}

View file

@ -0,0 +1,186 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.core.api.model.TradeInfo;
import bisq.core.trade.Trade;
import bisq.proto.grpc.ConfirmPaymentReceivedReply;
import bisq.proto.grpc.ConfirmPaymentReceivedRequest;
import bisq.proto.grpc.ConfirmPaymentStartedReply;
import bisq.proto.grpc.ConfirmPaymentStartedRequest;
import bisq.proto.grpc.GetTradeReply;
import bisq.proto.grpc.GetTradeRequest;
import bisq.proto.grpc.KeepFundsReply;
import bisq.proto.grpc.KeepFundsRequest;
import bisq.proto.grpc.TakeOfferReply;
import bisq.proto.grpc.TakeOfferRequest;
import bisq.proto.grpc.WithdrawFundsReply;
import bisq.proto.grpc.WithdrawFundsRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import static bisq.core.api.model.TradeInfo.toTradeInfo;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.TradesGrpc.*;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcTradesService extends TradesImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcTradesService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void getTrade(GetTradeRequest req,
StreamObserver<GetTradeReply> responseObserver) {
try {
Trade trade = coreApi.getTrade(req.getTradeId());
String role = coreApi.getTradeRole(req.getTradeId());
var reply = GetTradeReply.newBuilder()
.setTrade(toTradeInfo(trade, role).toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (IllegalArgumentException cause) {
// Offer makers may call 'gettrade' many times before a trade exists.
// Log a 'trade not found' warning instead of a full stack trace.
exceptionHandler.handleExceptionAsWarning(log, "getTrade", cause, responseObserver);
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void takeOffer(TakeOfferRequest req,
StreamObserver<TakeOfferReply> responseObserver) {
GrpcErrorMessageHandler errorMessageHandler =
new GrpcErrorMessageHandler(getTakeOfferMethod().getFullMethodName(),
responseObserver,
exceptionHandler,
log);
coreApi.takeOffer(req.getOfferId(),
req.getPaymentAccountId(),
req.getTakerFeeCurrencyCode(),
trade -> {
TradeInfo tradeInfo = toTradeInfo(trade);
var reply = TakeOfferReply.newBuilder()
.setTrade(tradeInfo.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
},
errorMessage -> {
if (!errorMessageHandler.isErrorHandled())
errorMessageHandler.handleErrorMessage(errorMessage);
});
}
@Override
public void confirmPaymentStarted(ConfirmPaymentStartedRequest req,
StreamObserver<ConfirmPaymentStartedReply> responseObserver) {
try {
coreApi.confirmPaymentStarted(req.getTradeId());
var reply = ConfirmPaymentStartedReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void confirmPaymentReceived(ConfirmPaymentReceivedRequest req,
StreamObserver<ConfirmPaymentReceivedReply> responseObserver) {
try {
coreApi.confirmPaymentReceived(req.getTradeId());
var reply = ConfirmPaymentReceivedReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void keepFunds(KeepFundsRequest req,
StreamObserver<KeepFundsReply> responseObserver) {
try {
coreApi.keepFunds(req.getTradeId());
var reply = KeepFundsReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void withdrawFunds(WithdrawFundsRequest req,
StreamObserver<WithdrawFundsReply> responseObserver) {
try {
coreApi.withdrawFunds(req.getTradeId(), req.getAddress(), req.getMemo());
var reply = WithdrawFundsReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getGetTradeMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getTakeOfferMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getConfirmPaymentStartedMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getConfirmPaymentReceivedMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getKeepFundsMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getWithdrawFundsMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
}}
)));
}
}

View file

@ -0,0 +1,85 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.proto.grpc.GetVersionReply;
import bisq.proto.grpc.GetVersionRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.GetVersionGrpc.GetVersionImplBase;
import static bisq.proto.grpc.GetVersionGrpc.getGetVersionMethod;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@VisibleForTesting
@Slf4j
public class GrpcVersionService extends GetVersionImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcVersionService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void getVersion(GetVersionRequest req, StreamObserver<GetVersionReply> responseObserver) {
try {
var reply = GetVersionReply.newBuilder().setVersion(coreApi.getVersion()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getGetVersionMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
}}
)));
}
}

View file

@ -0,0 +1,393 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.core.api.model.AddressBalanceInfo;
import bisq.core.api.model.TxFeeRateInfo;
import bisq.core.btc.exceptions.TxBroadcastException;
import bisq.core.btc.wallet.TxBroadcaster;
import bisq.proto.grpc.GetAddressBalanceReply;
import bisq.proto.grpc.GetAddressBalanceRequest;
import bisq.proto.grpc.GetBalancesReply;
import bisq.proto.grpc.GetBalancesRequest;
import bisq.proto.grpc.GetFundingAddressesReply;
import bisq.proto.grpc.GetFundingAddressesRequest;
import bisq.proto.grpc.GetTransactionReply;
import bisq.proto.grpc.GetTransactionRequest;
import bisq.proto.grpc.GetTxFeeRateReply;
import bisq.proto.grpc.GetTxFeeRateRequest;
import bisq.proto.grpc.GetUnusedBsqAddressReply;
import bisq.proto.grpc.GetUnusedBsqAddressRequest;
import bisq.proto.grpc.LockWalletReply;
import bisq.proto.grpc.LockWalletRequest;
import bisq.proto.grpc.RemoveWalletPasswordReply;
import bisq.proto.grpc.RemoveWalletPasswordRequest;
import bisq.proto.grpc.SendBsqReply;
import bisq.proto.grpc.SendBsqRequest;
import bisq.proto.grpc.SendBtcReply;
import bisq.proto.grpc.SendBtcRequest;
import bisq.proto.grpc.SetTxFeeRatePreferenceReply;
import bisq.proto.grpc.SetTxFeeRatePreferenceRequest;
import bisq.proto.grpc.SetWalletPasswordReply;
import bisq.proto.grpc.SetWalletPasswordRequest;
import bisq.proto.grpc.UnlockWalletReply;
import bisq.proto.grpc.UnlockWalletRequest;
import bisq.proto.grpc.UnsetTxFeeRatePreferenceReply;
import bisq.proto.grpc.UnsetTxFeeRatePreferenceRequest;
import bisq.proto.grpc.VerifyBsqSentToAddressReply;
import bisq.proto.grpc.VerifyBsqSentToAddressRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import org.bitcoinj.core.Transaction;
import javax.inject.Inject;
import com.google.common.util.concurrent.FutureCallback;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import static bisq.core.api.model.TxInfo.toTxInfo;
import static bisq.daemon.grpc.interceptor.GrpcServiceRateMeteringConfig.getCustomRateMeteringInterceptor;
import static bisq.proto.grpc.WalletsGrpc.*;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import bisq.daemon.grpc.interceptor.CallRateMeteringInterceptor;
import bisq.daemon.grpc.interceptor.GrpcCallRateMeter;
@Slf4j
class GrpcWalletsService extends WalletsImplBase {
private final CoreApi coreApi;
private final GrpcExceptionHandler exceptionHandler;
@Inject
public GrpcWalletsService(CoreApi coreApi, GrpcExceptionHandler exceptionHandler) {
this.coreApi = coreApi;
this.exceptionHandler = exceptionHandler;
}
@Override
public void getBalances(GetBalancesRequest req, StreamObserver<GetBalancesReply> responseObserver) {
try {
var balances = coreApi.getBalances(req.getCurrencyCode());
var reply = GetBalancesReply.newBuilder()
.setBalances(balances.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getAddressBalance(GetAddressBalanceRequest req,
StreamObserver<GetAddressBalanceReply> responseObserver) {
try {
AddressBalanceInfo balanceInfo = coreApi.getAddressBalanceInfo(req.getAddress());
var reply = GetAddressBalanceReply.newBuilder()
.setAddressBalanceInfo(balanceInfo.toProtoMessage()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getFundingAddresses(GetFundingAddressesRequest req,
StreamObserver<GetFundingAddressesReply> responseObserver) {
try {
List<AddressBalanceInfo> balanceInfo = coreApi.getFundingAddresses();
var reply = GetFundingAddressesReply.newBuilder()
.addAllAddressBalanceInfo(
balanceInfo.stream()
.map(AddressBalanceInfo::toProtoMessage)
.collect(Collectors.toList()))
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getUnusedBsqAddress(GetUnusedBsqAddressRequest req,
StreamObserver<GetUnusedBsqAddressReply> responseObserver) {
try {
String address = coreApi.getUnusedBsqAddress();
var reply = GetUnusedBsqAddressReply.newBuilder()
.setAddress(address)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void sendBsq(SendBsqRequest req,
StreamObserver<SendBsqReply> responseObserver) {
try {
coreApi.sendBsq(req.getAddress(),
req.getAmount(),
req.getTxFeeRate(),
new TxBroadcaster.Callback() {
@Override
public void onSuccess(Transaction tx) {
log.info("Successfully published BSQ tx: id {}, output sum {} sats, fee {} sats, size {} bytes",
tx.getTxId().toString(),
tx.getOutputSum(),
tx.getFee(),
tx.getMessageSize());
var reply = SendBsqReply.newBuilder()
.setTxInfo(toTxInfo(tx).toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onFailure(TxBroadcastException ex) {
throw new IllegalStateException(ex);
}
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void sendBtc(SendBtcRequest req,
StreamObserver<SendBtcReply> responseObserver) {
try {
coreApi.sendBtc(req.getAddress(),
req.getAmount(),
req.getTxFeeRate(),
req.getMemo(),
new FutureCallback<>() {
@Override
public void onSuccess(Transaction tx) {
if (tx != null) {
log.info("Successfully published BTC tx: id {}, output sum {} sats, fee {} sats, size {} bytes",
tx.getTxId().toString(),
tx.getOutputSum(),
tx.getFee(),
tx.getMessageSize());
var reply = SendBtcReply.newBuilder()
.setTxInfo(toTxInfo(tx).toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} else {
throw new IllegalStateException("btc transaction is null");
}
}
@Override
public void onFailure(@NotNull Throwable t) {
log.error("", t);
throw new IllegalStateException(t);
}
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void verifyBsqSentToAddress(VerifyBsqSentToAddressRequest req,
StreamObserver<VerifyBsqSentToAddressReply> responseObserver) {
try {
boolean isAmountReceived = coreApi.verifyBsqSentToAddress(req.getAddress(), req.getAmount());
var reply = VerifyBsqSentToAddressReply.newBuilder()
.setIsAmountReceived(isAmountReceived)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getTxFeeRate(GetTxFeeRateRequest req,
StreamObserver<GetTxFeeRateReply> responseObserver) {
try {
coreApi.getTxFeeRate(() -> {
TxFeeRateInfo txFeeRateInfo = coreApi.getMostRecentTxFeeRateInfo();
var reply = GetTxFeeRateReply.newBuilder()
.setTxFeeRateInfo(txFeeRateInfo.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void setTxFeeRatePreference(SetTxFeeRatePreferenceRequest req,
StreamObserver<SetTxFeeRatePreferenceReply> responseObserver) {
try {
coreApi.setTxFeeRatePreference(req.getTxFeeRatePreference(), () -> {
TxFeeRateInfo txFeeRateInfo = coreApi.getMostRecentTxFeeRateInfo();
var reply = SetTxFeeRatePreferenceReply.newBuilder()
.setTxFeeRateInfo(txFeeRateInfo.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void unsetTxFeeRatePreference(UnsetTxFeeRatePreferenceRequest req,
StreamObserver<UnsetTxFeeRatePreferenceReply> responseObserver) {
try {
coreApi.unsetTxFeeRatePreference(() -> {
TxFeeRateInfo txFeeRateInfo = coreApi.getMostRecentTxFeeRateInfo();
var reply = UnsetTxFeeRatePreferenceReply.newBuilder()
.setTxFeeRateInfo(txFeeRateInfo.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void getTransaction(GetTransactionRequest req,
StreamObserver<GetTransactionReply> responseObserver) {
try {
Transaction tx = coreApi.getTransaction(req.getTxId());
var reply = GetTransactionReply.newBuilder()
.setTxInfo(toTxInfo(tx).toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void setWalletPassword(SetWalletPasswordRequest req,
StreamObserver<SetWalletPasswordReply> responseObserver) {
try {
coreApi.setWalletPassword(req.getPassword(), req.getNewPassword());
var reply = SetWalletPasswordReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void removeWalletPassword(RemoveWalletPasswordRequest req,
StreamObserver<RemoveWalletPasswordReply> responseObserver) {
try {
coreApi.removeWalletPassword(req.getPassword());
var reply = RemoveWalletPasswordReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void lockWallet(LockWalletRequest req,
StreamObserver<LockWalletReply> responseObserver) {
try {
coreApi.lockWallet();
var reply = LockWalletReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override
public void unlockWallet(UnlockWalletRequest req,
StreamObserver<UnlockWalletReply> responseObserver) {
try {
coreApi.unlockWallet(req.getPassword(), req.getTimeout());
var reply = UnlockWalletReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
final ServerInterceptor[] interceptors() {
Optional<ServerInterceptor> rateMeteringInterceptor = rateMeteringInterceptor();
return rateMeteringInterceptor.map(serverInterceptor ->
new ServerInterceptor[]{serverInterceptor}).orElseGet(() -> new ServerInterceptor[0]);
}
final Optional<ServerInterceptor> rateMeteringInterceptor() {
return getCustomRateMeteringInterceptor(coreApi.getConfig().appDataDir, this.getClass())
.or(() -> Optional.of(CallRateMeteringInterceptor.valueOf(
new HashMap<>() {{
put(getGetBalancesMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetAddressBalanceMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetFundingAddressesMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetUnusedBsqAddressMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getSendBsqMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getSendBtcMethod().getFullMethodName(), new GrpcCallRateMeter(1, MINUTES));
put(getGetTxFeeRateMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getSetTxFeeRatePreferenceMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getUnsetTxFeeRatePreferenceMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getGetTransactionMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
// Trying to set or remove a wallet password several times before the 1st attempt has time to
// persist the change to disk may corrupt the wallet, so allow only 1 attempt per 5 seconds.
put(getSetWalletPasswordMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS, 5));
put(getRemoveWalletPasswordMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS, 5));
put(getLockWalletMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
put(getUnlockWalletMethod().getFullMethodName(), new GrpcCallRateMeter(1, SECONDS));
}}
)));
}
}

View file

@ -0,0 +1,136 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc.interceptor;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.StatusRuntimeException;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import static io.grpc.Status.PERMISSION_DENIED;
import static java.lang.String.format;
import static java.util.stream.Collectors.joining;
@Slf4j
public final class CallRateMeteringInterceptor implements ServerInterceptor {
// Maps the gRPC server method names to rate meters. This allows one interceptor
// instance to handle rate metering for any or all the methods in a Grpc*Service.
protected final Map<String, GrpcCallRateMeter> serviceCallRateMeters;
public CallRateMeteringInterceptor(Map<String, GrpcCallRateMeter> serviceCallRateMeters) {
this.serviceCallRateMeters = serviceCallRateMeters;
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata headers,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
Optional<Map.Entry<String, GrpcCallRateMeter>> rateMeterKV = getRateMeterKV(serverCall);
rateMeterKV.ifPresentOrElse(
(kv) -> checkRateMeterAndMaybeCloseCall(kv, serverCall),
() -> handleMissingRateMeterConfiguration(serverCall));
// We leave it to the gRPC framework to clean up if the server call was closed
// above. But we still have to invoke startCall here because the method must
// return a ServerCall.Listener<RequestT>.
return serverCallHandler.startCall(serverCall, headers);
}
private void checkRateMeterAndMaybeCloseCall(Map.Entry<String, GrpcCallRateMeter> rateMeterKV,
ServerCall<?, ?> serverCall) {
String methodName = rateMeterKV.getKey();
GrpcCallRateMeter rateMeter = rateMeterKV.getValue();
if (!rateMeter.checkAndIncrement())
handlePermissionDeniedWarningAndCloseCall(methodName, rateMeter, serverCall);
else
log.info(rateMeter.getCallsCountProgress(methodName));
}
private void handleMissingRateMeterConfiguration(ServerCall<?, ?> serverCall)
throws StatusRuntimeException {
log.debug("The gRPC service's call rate metering interceptor does not"
+ " meter the {} method.",
getRateMeterKey(serverCall));
}
private void handlePermissionDeniedWarningAndCloseCall(String methodName,
GrpcCallRateMeter rateMeter,
ServerCall<?, ?> serverCall)
throws StatusRuntimeException {
String msg = getDefaultRateExceededError(methodName, rateMeter);
log.warn(msg + ".");
serverCall.close(PERMISSION_DENIED.withDescription(msg.toLowerCase()), new Metadata());
}
private String getDefaultRateExceededError(String methodName,
GrpcCallRateMeter rateMeter) {
// The derived method name may not be an exact match to CLI's method name.
String timeUnitName = StringUtils.chop(rateMeter.getTimeUnit().name().toLowerCase());
// Just print 'getversion', not the grpc method descriptor's
// full-method-name: 'io.bisq.protobuffer.getversion/getversion'.
String loggedMethodName = methodName.split("/")[1];
return format("The maximum allowed number of %s calls (%d/%s) has been exceeded",
loggedMethodName,
rateMeter.getAllowedCallsPerTimeWindow(),
timeUnitName);
}
private Optional<Map.Entry<String, GrpcCallRateMeter>> getRateMeterKV(ServerCall<?, ?> serverCall) {
String rateMeterKey = getRateMeterKey(serverCall);
return serviceCallRateMeters.entrySet().stream()
.filter((e) -> e.getKey().equals(rateMeterKey)).findFirst();
}
private String getRateMeterKey(ServerCall<?, ?> serverCall) {
// Get the rate meter map key from the server call method descriptor. The
// returned String (e.g., 'io.bisq.protobuffer.Offers/CreateOffer') will match
// a map entry key in the 'serviceCallRateMeters' constructor argument, if it
// was defined in the Grpc*Service class' rateMeteringInterceptor method.
return serverCall.getMethodDescriptor().getFullMethodName();
}
@Override
public String toString() {
String rateMetersString =
serviceCallRateMeters.entrySet()
.stream()
.map(Object::toString)
.collect(joining("\n\t\t"));
return "CallRateMeteringInterceptor {" + "\n\t" +
"serviceCallRateMeters {" + "\n\t\t" +
rateMetersString + "\n\t" + "}" + "\n"
+ "}";
}
public static CallRateMeteringInterceptor valueOf(Map<String, GrpcCallRateMeter> rateMeters) {
return new CallRateMeteringInterceptor(new HashMap<>() {{
putAll(rateMeters);
}});
}
}

View file

@ -0,0 +1,95 @@
package bisq.daemon.grpc.interceptor;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
@Slf4j
public class GrpcCallRateMeter {
@Getter
private final int allowedCallsPerTimeWindow;
@Getter
private final TimeUnit timeUnit;
@Getter
private final int numTimeUnits;
@Getter
private transient final long timeUnitIntervalInMilliseconds;
private transient final ArrayDeque<Long> callTimestamps;
public GrpcCallRateMeter(int allowedCallsPerTimeWindow, TimeUnit timeUnit) {
this(allowedCallsPerTimeWindow, timeUnit, 1);
}
public GrpcCallRateMeter(int allowedCallsPerTimeWindow, TimeUnit timeUnit, int numTimeUnits) {
this.allowedCallsPerTimeWindow = allowedCallsPerTimeWindow;
this.timeUnit = timeUnit;
this.numTimeUnits = numTimeUnits;
this.timeUnitIntervalInMilliseconds = timeUnit.toMillis(1) * numTimeUnits;
this.callTimestamps = new ArrayDeque<>();
}
public boolean checkAndIncrement() {
if (getCallsCount() < allowedCallsPerTimeWindow) {
incrementCallsCount();
return true;
} else {
return false;
}
}
public int getCallsCount() {
removeStaleCallTimestamps();
return callTimestamps.size();
}
public String getCallsCountProgress(String calledMethodName) {
String shortTimeUnitName = StringUtils.chop(timeUnit.name().toLowerCase());
// Just print 'GetVersion has been called N times...',
// not 'io.bisq.protobuffer.GetVersion/GetVersion has been called N times...'
String loggedMethodName = calledMethodName.split("/")[1];
return format("%s has been called %d time%s in the last %s, rate limit is %d/%s",
loggedMethodName,
callTimestamps.size(),
callTimestamps.size() == 1 ? "" : "s",
shortTimeUnitName,
allowedCallsPerTimeWindow,
shortTimeUnitName);
}
private void incrementCallsCount() {
callTimestamps.add(currentTimeMillis());
}
private void removeStaleCallTimestamps() {
while (!callTimestamps.isEmpty() && isStale.test(callTimestamps.peek())) {
callTimestamps.remove();
}
}
private final Predicate<Long> isStale = (t) -> {
long stale = currentTimeMillis() - this.getTimeUnitIntervalInMilliseconds();
// Is the given timestamp before the current time minus 1 timeUnit in millis?
return t < stale;
};
@Override
public String toString() {
return "GrpcCallRateMeter{" +
"allowedCallsPerTimeWindow=" + allowedCallsPerTimeWindow +
", timeUnit=" + timeUnit.name() +
", timeUnitIntervalInMilliseconds=" + timeUnitIntervalInMilliseconds +
", callsCount=" + callTimestamps.size() +
'}';
}
}

View file

@ -0,0 +1,288 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc.interceptor;
import io.grpc.ServerInterceptor;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.common.annotations.VisibleForTesting;
import java.nio.file.Paths;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import static bisq.common.file.FileUtil.deleteFileIfExists;
import static bisq.common.file.FileUtil.renameFile;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.lang.System.getProperty;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.readAllBytes;
@VisibleForTesting
@Slf4j
public class GrpcServiceRateMeteringConfig {
public static final String RATE_METERS_CONFIG_FILENAME = "ratemeters.json";
private static final String KEY_GRPC_SERVICE_CLASS_NAME = "grpcServiceClassName";
private static final String KEY_METHOD_RATE_METERS = "methodRateMeters";
private static final String KEY_ALLOWED_CALL_PER_TIME_WINDOW = "allowedCallsPerTimeWindow";
private static final String KEY_TIME_UNIT = "timeUnit";
private static final String KEY_NUM_TIME_UNITS = "numTimeUnits";
private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
private final List<Map<String, GrpcCallRateMeter>> methodRateMeters;
private final String grpcServiceClassName;
public GrpcServiceRateMeteringConfig(String grpcServiceClassName) {
this(grpcServiceClassName, new ArrayList<>());
}
public GrpcServiceRateMeteringConfig(String grpcServiceClassName,
List<Map<String, GrpcCallRateMeter>> methodRateMeters) {
this.grpcServiceClassName = grpcServiceClassName;
this.methodRateMeters = methodRateMeters;
}
@SuppressWarnings("unused")
public GrpcServiceRateMeteringConfig addMethodCallRateMeter(String methodName,
int maxCalls,
TimeUnit timeUnit) {
return addMethodCallRateMeter(methodName, maxCalls, timeUnit, 1);
}
public GrpcServiceRateMeteringConfig addMethodCallRateMeter(String methodName,
int maxCalls,
TimeUnit timeUnit,
int numTimeUnits) {
methodRateMeters.add(new LinkedHashMap<>() {{
put(methodName, new GrpcCallRateMeter(maxCalls, timeUnit, numTimeUnits));
}});
return this;
}
public boolean isConfigForGrpcService(Class<?> clazz) {
return isConfigForGrpcService(clazz.getSimpleName());
}
public boolean isConfigForGrpcService(String grpcServiceClassSimpleName) {
return this.grpcServiceClassName.equals(grpcServiceClassSimpleName);
}
@Override
public String toString() {
return "GrpcServiceRateMeteringConfig{" + "\n" +
" grpcServiceClassName='" + grpcServiceClassName + '\'' + "\n" +
", methodRateMeters=" + methodRateMeters + "\n" +
'}';
}
public static Optional<ServerInterceptor> getCustomRateMeteringInterceptor(File installationDir,
Class<?> grpcServiceClass) {
File configFile = new File(installationDir, RATE_METERS_CONFIG_FILENAME);
return configFile.exists()
? toServerInterceptor(configFile, grpcServiceClass)
: Optional.empty();
}
public static Optional<ServerInterceptor> toServerInterceptor(File configFile, Class<?> grpcServiceClass) {
// From a global rate metering config file, create a specific gRPC service
// interceptor configuration in the form of an interceptor constructor argument,
// a map<method-name, rate-meter>.
// Transforming json into the List<Map<String, GrpcCallRateMeter>> is a bit
// convoluted due to Gson's loss of generic type information during deserialization.
Optional<GrpcServiceRateMeteringConfig> grpcServiceConfig = getAllDeserializedConfigs(configFile)
.stream().filter(x -> x.isConfigForGrpcService(grpcServiceClass)).findFirst();
if (grpcServiceConfig.isPresent()) {
Map<String, GrpcCallRateMeter> serviceCallRateMeters = new HashMap<>();
for (Map<String, GrpcCallRateMeter> methodToRateMeterMap : grpcServiceConfig.get().methodRateMeters) {
Map.Entry<String, GrpcCallRateMeter> entry = methodToRateMeterMap.entrySet().stream().findFirst().orElseThrow(()
-> new IllegalStateException("Gson deserialized a method rate meter configuration into an empty map."));
serviceCallRateMeters.put(entry.getKey(), entry.getValue());
}
return Optional.of(new CallRateMeteringInterceptor(serviceCallRateMeters));
} else {
return Optional.empty();
}
}
@SuppressWarnings("unchecked")
private static List<Map<String, GrpcCallRateMeter>> getMethodRateMetersMap(Map<String, Object> gsonMap) {
List<Map<String, GrpcCallRateMeter>> rateMeters = new ArrayList<>();
// Each gsonMap is a Map<String, Object> with a single entry:
// {getVersion={allowedCallsPerTimeUnit=8.0, timeUnit=SECONDS, callsCount=0.0, isRunning=false}}
// Convert it to a multiple entry Map<String, GrpcCallRateMeter>, where the key
// is a method name.
for (Map<String, Object> singleEntryRateMeterMap : (List<Map<String, Object>>) gsonMap.get(KEY_METHOD_RATE_METERS)) {
log.debug("Gson's single entry {} {}<String, Object> = {}",
gsonMap.get(KEY_GRPC_SERVICE_CLASS_NAME),
singleEntryRateMeterMap.getClass().getSimpleName(),
singleEntryRateMeterMap);
Map.Entry<String, Object> entry = singleEntryRateMeterMap.entrySet().stream().findFirst().orElseThrow(()
-> new IllegalStateException("Gson deserialized a method rate meter configuration into an empty map."));
String methodName = entry.getKey();
GrpcCallRateMeter rateMeter = getGrpcCallRateMeter(entry);
rateMeters.add(new LinkedHashMap<>() {{
put(methodName, rateMeter);
}});
}
return rateMeters;
}
@SuppressWarnings({"rawtypes", "unchecked"})
public static List<GrpcServiceRateMeteringConfig> deserialize(File configFile) {
verifyConfigFile(configFile);
List<GrpcServiceRateMeteringConfig> serviceMethodConfigurations = new ArrayList<>();
// Gson cannot deserialize a json string to List<GrpcServiceRateMeteringConfig>
// so easily for us, so we do it here before returning the list of configurations.
List rawConfigList = gson.fromJson(toJson(configFile), ArrayList.class);
// Gson gave us a list of maps with keys grpcServiceClassName, methodRateMeters:
// String grpcServiceClassName
// List<Map> methodRateMeters
for (Object rawConfig : rawConfigList) {
Map<String, Object> gsonMap = (Map<String, Object>) rawConfig;
String grpcServiceClassName = (String) gsonMap.get(KEY_GRPC_SERVICE_CLASS_NAME);
List<Map<String, GrpcCallRateMeter>> rateMeters = getMethodRateMetersMap(gsonMap);
serviceMethodConfigurations.add(new GrpcServiceRateMeteringConfig(grpcServiceClassName, rateMeters));
}
return serviceMethodConfigurations;
}
@SuppressWarnings("unchecked")
private static GrpcCallRateMeter getGrpcCallRateMeter(Map.Entry<String, Object> gsonEntry) {
Map<String, Object> valueMap = (Map<String, Object>) gsonEntry.getValue();
int allowedCallsPerTimeWindow = ((Number) valueMap.get(KEY_ALLOWED_CALL_PER_TIME_WINDOW)).intValue();
TimeUnit timeUnit = TimeUnit.valueOf((String) valueMap.get(KEY_TIME_UNIT));
int numTimeUnits = ((Number) valueMap.get(KEY_NUM_TIME_UNITS)).intValue();
return new GrpcCallRateMeter(allowedCallsPerTimeWindow, timeUnit, numTimeUnits);
}
private static void verifyConfigFile(File configFile) {
if (configFile == null)
throw new IllegalStateException("Cannot read null json config file.");
if (!configFile.exists())
throw new IllegalStateException(format("cannot find json config file %s", configFile.getAbsolutePath()));
}
private static String toJson(File configFile) {
try {
return new String(readAllBytes(Paths.get(configFile.getAbsolutePath())));
} catch (IOException ex) {
throw new IllegalStateException(format("Cannot read json string from file %s.",
configFile.getAbsolutePath()));
}
}
private static List<GrpcServiceRateMeteringConfig> allDeserializedConfigs;
private static List<GrpcServiceRateMeteringConfig> getAllDeserializedConfigs(File configFile) {
// We deserialize once, not for each gRPC service wanting an interceptor.
if (allDeserializedConfigs == null)
allDeserializedConfigs = deserialize(configFile);
return allDeserializedConfigs;
}
@VisibleForTesting
public static class Builder {
private final List<GrpcServiceRateMeteringConfig> rateMeterConfigs = new ArrayList<>();
public void addCallRateMeter(String grpcServiceClassName,
String methodName,
int maxCalls,
TimeUnit timeUnit) {
addCallRateMeter(grpcServiceClassName,
methodName,
maxCalls,
timeUnit,
1);
}
public void addCallRateMeter(String grpcServiceClassName,
String methodName,
int maxCalls,
TimeUnit timeUnit,
int numTimeUnits) {
log.info("Adding call rate metering definition {}.{} ({}/{}ms).",
grpcServiceClassName,
methodName,
maxCalls,
timeUnit.toMillis(1) * numTimeUnits);
rateMeterConfigs.stream().filter(c -> c.isConfigForGrpcService(grpcServiceClassName))
.findFirst().ifPresentOrElse(
(config) -> config.addMethodCallRateMeter(methodName, maxCalls, timeUnit, numTimeUnits),
() -> rateMeterConfigs.add(new GrpcServiceRateMeteringConfig(grpcServiceClassName)
.addMethodCallRateMeter(methodName, maxCalls, timeUnit, numTimeUnits)));
}
public File build() {
File tmpFile = serializeRateMeterDefinitions();
File configFile = Paths.get(getProperty("java.io.tmpdir"), "ratemeters.json").toFile();
try {
deleteFileIfExists(configFile);
renameFile(tmpFile, configFile);
} catch (IOException ex) {
throw new IllegalStateException(format("Could not create config file %s.",
configFile.getAbsolutePath()), ex);
}
return configFile;
}
private File serializeRateMeterDefinitions() {
String json = gson.toJson(rateMeterConfigs);
File file = createTmpFile();
try (OutputStreamWriter outputStreamWriter =
new OutputStreamWriter(new FileOutputStream(checkNotNull(file), false), UTF_8)) {
outputStreamWriter.write(json);
} catch (Exception ex) {
throw new IllegalStateException(format("Cannot write file for json string %s.", json), ex);
}
return file;
}
private File createTmpFile() {
File file;
try {
file = File.createTempFile("ratemeters_",
".tmp",
Paths.get(getProperty("java.io.tmpdir")).toFile());
} catch (IOException ex) {
throw new IllegalStateException("Cannot create tmp ratemeters json file.", ex);
}
return file;
}
}
}

View file

@ -0,0 +1,68 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.daemon.grpc.interceptor;
import bisq.common.config.Config;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.StatusRuntimeException;
import javax.inject.Inject;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static io.grpc.Metadata.Key;
import static io.grpc.Status.UNAUTHENTICATED;
import static java.lang.String.format;
/**
* Authorizes rpc server calls by comparing the value of the caller's
* {@value PASSWORD_KEY} header to an expected value set at server startup time.
*
* @see bisq.common.config.Config#apiPassword
*/
public class PasswordAuthInterceptor implements ServerInterceptor {
private static final String PASSWORD_KEY = "password";
private final String expectedPasswordValue;
@Inject
public PasswordAuthInterceptor(Config config) {
this.expectedPasswordValue = config.apiPassword;
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata headers,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
var actualPasswordValue = headers.get(Key.of(PASSWORD_KEY, ASCII_STRING_MARSHALLER));
if (actualPasswordValue == null)
throw new StatusRuntimeException(UNAUTHENTICATED.withDescription(
format("missing '%s' rpc header value", PASSWORD_KEY)));
if (!actualPasswordValue.equals(expectedPasswordValue))
throw new StatusRuntimeException(UNAUTHENTICATED.withDescription(
format("incorrect '%s' rpc header value", PASSWORD_KEY)));
return serverCallHandler.startCall(serverCall, headers);
}
}

View file

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{30}: %msg %xEx%n)</pattern>
</encoder>
</appender>
<root level="TRACE">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>
<logger name="io.grpc.netty" level="WARN"/>
</configuration>