support multithreading in api and protocols

close trade wallets while unused for scalability
verify txs do not use unlock height
increase trade init timeout to 60s
This commit is contained in:
woodser 2022-03-31 08:17:58 -04:00
parent fdddc87477
commit bb95b4b1d6
82 changed files with 2786 additions and 2338 deletions

View file

@ -70,7 +70,7 @@ public class GrpcErrorMessageHandler implements ErrorMessageHandler {
}
@Override
public void handleErrorMessage(String errorMessage) {
public synchronized void handleErrorMessage(String errorMessage) {
// A task runner may call handleErrorMessage(String) more than once.
// Throw only one exception if that happens, to avoid looping until the
// grpc stream is closed

View file

@ -47,7 +47,7 @@ class GrpcExceptionHandler {
public GrpcExceptionHandler() {
}
public void handleException(Logger log,
public synchronized void handleException(Logger log,
Throwable t,
StreamObserver<?> responseObserver) {
// Log the core api error (this is last chance to do that), wrap it in a new
@ -58,7 +58,7 @@ class GrpcExceptionHandler {
throw grpcStatusRuntimeException;
}
public void handleExceptionAsWarning(Logger log,
public synchronized void handleExceptionAsWarning(Logger log,
String calledMethod,
Throwable t,
StreamObserver<?> responseObserver) {

View file

@ -8,7 +8,7 @@ import bisq.proto.grpc.NotificationsGrpc.NotificationsImplBase;
import bisq.proto.grpc.RegisterNotificationListenerRequest;
import bisq.proto.grpc.SendNotificationReply;
import bisq.proto.grpc.SendNotificationRequest;
import io.grpc.Context;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
@ -46,24 +46,30 @@ class GrpcNotificationsService extends NotificationsImplBase {
@Override
public void registerNotificationListener(RegisterNotificationListenerRequest request,
StreamObserver<NotificationMessage> responseObserver) {
try {
coreApi.addNotificationListener(new GrpcNotificationListener(responseObserver));
// No onNext / onCompleted, as the response observer should be kept open
} catch (Throwable t) {
exceptionHandler.handleException(log, t, responseObserver);
}
Context ctx = Context.current().fork(); // context is independent for long-lived request
ctx.run(() -> {
try {
coreApi.addNotificationListener(new GrpcNotificationListener(responseObserver));
// No onNext / onCompleted, as the response observer should be kept open
} catch (Throwable t) {
exceptionHandler.handleException(log, t, responseObserver);
}
});
}
@Override
public void sendNotification(SendNotificationRequest request,
StreamObserver<SendNotificationReply> responseObserver) {
try {
coreApi.sendNotification(request.getNotification());
responseObserver.onNext(SendNotificationReply.newBuilder().build());
responseObserver.onCompleted();
} catch (Throwable t) {
exceptionHandler.handleException(log, t, responseObserver);
}
Context ctx = Context.current().fork(); // context is independent from notification delivery
ctx.run(() -> {
try {
coreApi.sendNotification(request.getNotification());
responseObserver.onNext(SendNotificationReply.newBuilder().build());
responseObserver.onCompleted();
} catch (Throwable t) {
exceptionHandler.handleException(log, t, responseObserver);
}
});
}
@Value

View file

@ -141,6 +141,11 @@ class GrpcOffersService extends OffersImplBase {
@Override
public void createOffer(CreateOfferRequest req,
StreamObserver<CreateOfferReply> responseObserver) {
GrpcErrorMessageHandler errorMessageHandler =
new GrpcErrorMessageHandler(getCreateOfferMethod().getFullMethodName(),
responseObserver,
exceptionHandler,
log);
try {
coreApi.createAnPlaceOffer(
req.getCurrencyCode(),
@ -162,6 +167,10 @@ class GrpcOffersService extends OffersImplBase {
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
},
errorMessage -> {
if (!errorMessageHandler.isErrorHandled())
errorMessageHandler.handleErrorMessage(errorMessage);
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);

View file

@ -65,7 +65,6 @@ public class GrpcServer {
GrpcMoneroConnectionsService moneroConnectionsService,
GrpcMoneroNodeService moneroNodeService) {
this.server = ServerBuilder.forPort(config.apiPort)
.executor(UserThread.getExecutor())
.addService(interceptForward(accountService, accountService.interceptors()))
.addService(interceptForward(disputeAgentsService, disputeAgentsService.interceptors()))
.addService(interceptForward(disputesService, disputesService.interceptors()))

View file

@ -19,7 +19,6 @@ package bisq.daemon.grpc;
import bisq.core.api.CoreApi;
import bisq.core.api.model.TradeInfo;
import bisq.core.support.messages.ChatMessage;
import bisq.core.trade.Trade;
import bisq.proto.grpc.ConfirmPaymentReceivedReply;
@ -40,7 +39,6 @@ import bisq.proto.grpc.TakeOfferReply;
import bisq.proto.grpc.TakeOfferRequest;
import bisq.proto.grpc.WithdrawFundsReply;
import bisq.proto.grpc.WithdrawFundsRequest;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
@ -122,20 +120,24 @@ class GrpcTradesService extends TradesImplBase {
responseObserver,
exceptionHandler,
log);
coreApi.takeOffer(req.getOfferId(),
req.getPaymentAccountId(),
trade -> {
TradeInfo tradeInfo = toTradeInfo(trade);
var reply = TakeOfferReply.newBuilder()
.setTrade(tradeInfo.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
},
errorMessage -> {
if (!errorMessageHandler.isErrorHandled())
errorMessageHandler.handleErrorMessage(errorMessage);
});
try {
coreApi.takeOffer(req.getOfferId(),
req.getPaymentAccountId(),
trade -> {
TradeInfo tradeInfo = toTradeInfo(trade);
var reply = TakeOfferReply.newBuilder()
.setTrade(tradeInfo.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
},
errorMessage -> {
if (!errorMessageHandler.isErrorHandled())
errorMessageHandler.handleErrorMessage(errorMessage);
});
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
}
@Override

View file

@ -17,6 +17,7 @@
package bisq.daemon.grpc;
import bisq.common.UserThread;
import bisq.core.api.CoreApi;
import bisq.core.api.model.AddressBalanceInfo;
import bisq.core.api.model.TxFeeRateInfo;
@ -102,16 +103,18 @@ class GrpcWalletsService extends WalletsImplBase {
@Override
public void getBalances(GetBalancesRequest req, StreamObserver<GetBalancesReply> responseObserver) {
try {
var balances = coreApi.getBalances(req.getCurrencyCode());
var reply = GetBalancesReply.newBuilder()
.setBalances(balances.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
UserThread.execute(() -> { // TODO (woodser): Balances.updateBalances() runs on UserThread for JFX components, so call from user thread, else the properties may not be updated. remove JFX properties or push delay into CoreWalletsService.getXmrBalances()?
try {
var balances = coreApi.getBalances(req.getCurrencyCode());
var reply = GetBalancesReply.newBuilder()
.setBalances(balances.toProtoMessage())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (Throwable cause) {
exceptionHandler.handleException(log, cause, responseObserver);
}
});
}
@Override

View file

@ -40,31 +40,37 @@ public class GrpcCallRateMeter {
}
public boolean checkAndIncrement() {
if (getCallsCount() < allowedCallsPerTimeWindow) {
incrementCallsCount();
return true;
} else {
return false;
synchronized (callTimestamps) {
if (getCallsCount() < allowedCallsPerTimeWindow) {
incrementCallsCount();
return true;
} else {
return false;
}
}
}
public int getCallsCount() {
removeStaleCallTimestamps();
return callTimestamps.size();
synchronized (callTimestamps) {
removeStaleCallTimestamps();
return callTimestamps.size();
}
}
public String getCallsCountProgress(String calledMethodName) {
String shortTimeUnitName = StringUtils.chop(timeUnit.name().toLowerCase());
// Just print 'GetVersion has been called N times...',
// not 'io.bisq.protobuffer.GetVersion/GetVersion has been called N times...'
String loggedMethodName = calledMethodName.split("/")[1];
return format("%s has been called %d time%s in the last %s, rate limit is %d/%s",
loggedMethodName,
callTimestamps.size(),
callTimestamps.size() == 1 ? "" : "s",
shortTimeUnitName,
allowedCallsPerTimeWindow,
shortTimeUnitName);
synchronized (callTimestamps) {
String shortTimeUnitName = StringUtils.chop(timeUnit.name().toLowerCase());
// Just print 'GetVersion has been called N times...',
// not 'io.bisq.protobuffer.GetVersion/GetVersion has been called N times...'
String loggedMethodName = calledMethodName.split("/")[1];
return format("%s has been called %d time%s in the last %s, rate limit is %d/%s",
loggedMethodName,
callTimestamps.size(),
callTimestamps.size() == 1 ? "" : "s",
shortTimeUnitName,
allowedCallsPerTimeWindow,
shortTimeUnitName);
}
}
private void incrementCallsCount() {
@ -85,11 +91,13 @@ public class GrpcCallRateMeter {
@Override
public String toString() {
return "GrpcCallRateMeter{" +
"allowedCallsPerTimeWindow=" + allowedCallsPerTimeWindow +
", timeUnit=" + timeUnit.name() +
", timeUnitIntervalInMilliseconds=" + timeUnitIntervalInMilliseconds +
", callsCount=" + callTimestamps.size() +
'}';
synchronized (callTimestamps) {
return "GrpcCallRateMeter{" +
"allowedCallsPerTimeWindow=" + allowedCallsPerTimeWindow +
", timeUnit=" + timeUnit.name() +
", timeUnitIntervalInMilliseconds=" + timeUnitIntervalInMilliseconds +
", callsCount=" + callTimestamps.size() +
'}';
}
}
}