lib: rename RemoteNodeClient to MoneroNodeClient and hide implementation

This commit is contained in:
Oscar Mira 2024-05-15 11:20:20 +02:00
parent 308031250b
commit 30ef8b481b
No known key found for this signature in database
GPG Key ID: B371B98C5DC32237
22 changed files with 352 additions and 289 deletions

View File

@ -52,7 +52,7 @@ class MoneroSdkClient(private val context: Context) {
httpClient: OkHttpClient,
): MoneroWallet {
val dataStore = WalletDataStoreFile(filename)
val client = RemoteNodeClient.forNetwork(
val client = MoneroNodeClient.forNetwork(
network = network,
remoteNodes = remoteNodes,
loadBalancerRule = RoundRobinRule(),

View File

@ -51,8 +51,8 @@ class WalletRepository(
fun getWalletIdList() = walletDataSource.readWalletIdList()
fun getRemoteClients(): Flow<List<RemoteNodeClient>> =
getWalletIdList().map { it.mapNotNull { walletId -> getWallet(walletId).remoteNodeClient } }
fun getMoneroNodeClients(): Flow<List<MoneroNodeClient>> =
getWalletIdList().map { it.mapNotNull { walletId -> getWallet(walletId).moneroNodeClient } }
fun getWalletConfig(walletId: Long) = walletDataSource.readWalletConfig(walletId)

View File

@ -82,7 +82,7 @@ class SettingsViewModel(
}
private suspend fun onProxyChanged(newProxy: Proxy) {
walletRepository.getRemoteClients().first().forEach { client ->
walletRepository.getMoneroNodeClients().first().forEach { client ->
val current = client.httpClient.proxy
if (current != newProxy) {
val builder = client.httpClient.newBuilder()

View File

@ -1,3 +0,0 @@
package im.molly.monero;
parcelable HttpResponse;

View File

@ -1,6 +0,0 @@
package im.molly.monero;
oneway interface IHttpRequestCallback {
void onResponse(int code, String contentType, in ParcelFileDescriptor body);
void onFailure();
}

View File

@ -1,8 +0,0 @@
package im.molly.monero;
import im.molly.monero.IHttpRequestCallback;
interface IRemoteNodeClient {
oneway void requestAsync(int requestId, String method, String path, String header, in byte[] bodyBytes, in IHttpRequestCallback callback);
oneway void cancelAsync(int requestId);
}

View File

@ -1,8 +0,0 @@
package im.molly.monero;
import im.molly.monero.IRemoteNodeClient;
interface IWalletClient {
int getNetworkId();
IRemoteNodeClient getRemoteNodeClient();
}

View File

@ -1,15 +1,15 @@
package im.molly.monero;
import im.molly.monero.IRemoteNodeClient;
import im.molly.monero.IStorageAdapter;
import im.molly.monero.IWalletServiceCallbacks;
import im.molly.monero.IWalletServiceListener;
import im.molly.monero.SecretKey;
import im.molly.monero.WalletConfig;
import im.molly.monero.internal.IHttpRpcClient;
interface IWalletService {
oneway void createWallet(in WalletConfig config, in IStorageAdapter storage, in IRemoteNodeClient client, in IWalletServiceCallbacks callback);
oneway void restoreWallet(in WalletConfig config, in IStorageAdapter storage, in IRemoteNodeClient client, in IWalletServiceCallbacks callback, in SecretKey spendSecretKey, long restorePoint);
oneway void openWallet(in WalletConfig config, in IStorageAdapter storage, in IRemoteNodeClient client, in IWalletServiceCallbacks callback);
oneway void createWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback);
oneway void restoreWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback, in SecretKey spendSecretKey, long restorePoint);
oneway void openWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback);
void setListener(in IWalletServiceListener listener);
}

View File

@ -0,0 +1,3 @@
package im.molly.monero.internal;
parcelable HttpRequest;

View File

@ -0,0 +1,3 @@
package im.molly.monero.internal;
parcelable HttpResponse;

View File

@ -0,0 +1,9 @@
package im.molly.monero.internal;
import im.molly.monero.internal.HttpResponse;
oneway interface IHttpRequestCallback {
void onResponse(in HttpResponse response);
void onError();
void onRequestCanceled();
}

View File

@ -0,0 +1,9 @@
package im.molly.monero.internal;
import im.molly.monero.internal.HttpRequest;
import im.molly.monero.internal.IHttpRequestCallback;
interface IHttpRpcClient {
oneway void callAsync(in HttpRequest request, in IHttpRequestCallback callback, int callId);
oneway void cancelAsync(int callId);
}

View File

@ -24,7 +24,7 @@ jmethodID ParcelFd_detachFd;
ScopedJavaGlobalRef<jclass> StringClass;
void InitializeJniCache(JNIEnv* env) {
jclass httpResponse = GetClass(env, "im/molly/monero/HttpResponse");
jclass httpResponse = GetClass(env, "im/molly/monero/internal/HttpResponse");
jclass iTransferCallback = GetClass(env, "im/molly/monero/ITransferCallback");
jclass logger = GetClass(env, "im/molly/monero/Logger");
jclass txInfo = GetClass(env, "im/molly/monero/internal/TxInfo");
@ -63,7 +63,7 @@ void InitializeJniCache(JNIEnv* env) {
WalletNative_callRemoteNode = GetMethodId(
env, walletNative,
"callRemoteNode",
"(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;[B)Lim/molly/monero/HttpResponse;");
"(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;[B)Lim/molly/monero/internal/HttpResponse;");
WalletNative_onRefresh = GetMethodId(
env, walletNative,
"onRefresh", "(IJZ)V");

View File

@ -0,0 +1,58 @@
package im.molly.monero
import im.molly.monero.internal.IHttpRpcClient
import im.molly.monero.internal.RpcClient
import im.molly.monero.loadbalancer.LoadBalancer
import im.molly.monero.loadbalancer.Rule
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import okhttp3.OkHttpClient
class MoneroNodeClient private constructor(
val network: MoneroNetwork,
private val rpcClient: RpcClient,
private val scope: CoroutineScope,
) : AutoCloseable {
companion object {
/**
* Constructs a [MoneroNodeClient] to connect to the Monero [network].
*/
fun forNetwork(
network: MoneroNetwork,
remoteNodes: Flow<List<RemoteNode>>,
loadBalancerRule: Rule,
httpClient: OkHttpClient,
retryBackoff: BackoffPolicy = ExponentialBackoff.Default,
ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
): MoneroNodeClient {
val scope = CoroutineScope(ioDispatcher + SupervisorJob())
val loadBalancer = LoadBalancer(remoteNodes, scope)
val rpcClient = RpcClient(
loadBalancer = loadBalancer,
loadBalancerRule = loadBalancerRule,
retryBackoff = retryBackoff,
requestsScope = scope,
httpClient = httpClient,
)
return MoneroNodeClient(network, rpcClient, scope)
}
}
var httpClient: OkHttpClient
get() = rpcClient.httpClient
set(value) {
rpcClient.httpClient = value
}
internal val httpRpcClient: IHttpRpcClient
get() = rpcClient
override fun close() {
scope.cancel("MoneroNodeClient is closing: Cancelling all ongoing requests")
}
}

View File

@ -19,7 +19,7 @@ import kotlin.time.Duration.Companion.seconds
class MoneroWallet internal constructor(
private val wallet: IWallet,
private val storageAdapter: StorageAdapter,
val remoteNodeClient: RemoteNodeClient?,
val moneroNodeClient: MoneroNodeClient?,
) : AutoCloseable {
private val logger = loggerFor<MoneroWallet>()

View File

@ -1,224 +0,0 @@
package im.molly.monero
import android.net.Uri
import android.os.ParcelFileDescriptor
import im.molly.monero.loadbalancer.LoadBalancer
import im.molly.monero.loadbalancer.Rule
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import okhttp3.*
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.RequestBody.Companion.toRequestBody
import java.io.FileOutputStream
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
// TODO: Hide IRemoteNodeClient methods and rename to HttpRpcClient or MoneroNodeClient
class RemoteNodeClient private constructor(
val network: MoneroNetwork,
private val loadBalancer: LoadBalancer,
private val loadBalancerRule: Rule,
var httpClient: OkHttpClient,
private val retryBackoff: BackoffPolicy,
private val requestsScope: CoroutineScope,
) : IRemoteNodeClient.Stub(), AutoCloseable {
companion object {
/**
* Constructs a [RemoteNodeClient] to connect to the Monero [network].
*/
fun forNetwork(
network: MoneroNetwork,
remoteNodes: Flow<List<RemoteNode>>,
loadBalancerRule: Rule,
httpClient: OkHttpClient,
retryBackoff: BackoffPolicy = ExponentialBackoff.Default,
ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
): RemoteNodeClient {
val scope = CoroutineScope(ioDispatcher + SupervisorJob())
return RemoteNodeClient(
network,
LoadBalancer(remoteNodes, scope),
loadBalancerRule,
httpClient,
retryBackoff,
scope
)
}
}
private val logger = loggerFor<RemoteNodeClient>()
private val requestList = ConcurrentHashMap<Int, Job>()
override fun requestAsync(
requestId: Int,
method: String,
path: String,
header: String?,
body: ByteArray?,
callback: IHttpRequestCallback?,
) {
logger.d("HTTP: $method $path, header_len=${header?.length}, body_size=${body?.size}")
val requestJob = requestsScope.launch {
runCatching {
requestWithRetry(method, path, header, body)
}.onSuccess { response ->
val status = response.code
val responseBody = response.body
if (responseBody == null) {
callback?.onResponse(status, null, null)
} else {
responseBody.use { body ->
val contentType = body.contentType()?.toString()
val pipe = ParcelFileDescriptor.createPipe()
pipe[1].use { writeSide ->
callback?.onResponse(status, contentType, pipe[0])
FileOutputStream(writeSide.fileDescriptor).use { out ->
runCatching { body.byteStream().copyTo(out) }
}
}
}
}
// TODO: Log response times
}.onFailure { throwable ->
logger.e("HTTP: Request failed", throwable)
callback?.onFailure()
}
}.also {
requestList[requestId] = it
}
requestJob.invokeOnCompletion {
requestList.remove(requestId)
}
}
override fun cancelAsync(requestId: Int) {
requestList[requestId]?.cancel()
}
override fun close() {
requestsScope.cancel()
}
private suspend fun requestWithRetry(
method: String,
path: String,
header: String?,
body: ByteArray?,
): Response {
val headers = parseHttpHeader(header)
val contentType = headers["Content-Type"]?.toMediaType()
// TODO: Log unsupported headers
val requestBuilder = with(Request.Builder()) {
when {
method.equals("GET", ignoreCase = true) -> {}
method.equals("POST", ignoreCase = true) -> {
val content = body ?: ByteArray(0)
post(content.toRequestBody(contentType))
}
else -> throw IllegalArgumentException("Unsupported method")
}
url("http:$path")
// TODO: Add authentication
}
val attempts = mutableMapOf<Uri, Int>()
while (true) {
val selected = loadBalancerRule.chooseNode(loadBalancer)
if (selected == null) {
logger.i("No remote node available")
return Response.Builder()
.request(requestBuilder.build())
.protocol(Protocol.HTTP_1_1)
.code(499)
.message("No remote node available")
.build()
}
val uri = selected.uriForPath(path)
val retryCount = attempts[uri] ?: 0
delay(retryBackoff.waitTime(retryCount))
logger.d("HTTP: $method $uri")
val response = try {
val request = requestBuilder.url(uri.toString()).build()
httpClient.newCall(request).await()
} catch (e: IOException) {
logger.e("HTTP: Request failed", e)
// TODO: Notify loadBalancer
continue
} finally {
attempts[uri] = retryCount + 1
}
if (response.isSuccessful) {
// TODO: Notify loadBalancer
return response
}
}
}
private fun parseHttpHeader(header: String?): Headers =
with(Headers.Builder()) {
header?.splitToSequence("\r\n")
?.filter { line -> line.isNotEmpty() }
?.forEach { line -> add(line) }
build()
}
private suspend fun Call.await() = suspendCoroutine { continuation ->
enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
continuation.resume(response)
}
override fun onFailure(call: Call, e: IOException) {
continuation.resumeWithException(e)
}
})
}
// private val Response.roundTripMillis: Long
// get() = sentRequestAtMillis() - receivedResponseAtMillis()
}
@OptIn(ExperimentalCoroutinesApi::class)
internal suspend fun IRemoteNodeClient.request(request: HttpRequest): HttpResponse? =
suspendCancellableCoroutine { continuation ->
val requestId = request.hashCode()
val callback = object : IHttpRequestCallback.Stub() {
override fun onResponse(
code: Int,
contentType: String?,
body: ParcelFileDescriptor?,
) {
continuation.resume(HttpResponse(code, contentType, body)) {
body?.close()
}
}
override fun onFailure() {
continuation.resume(null) {}
}
}
with(request) {
requestAsync(requestId, method, path, header, bodyBytes, callback)
}
continuation.invokeOnCancellation {
cancelAsync(requestId)
}
}

View File

@ -2,12 +2,16 @@ package im.molly.monero
import android.os.ParcelFileDescriptor
import androidx.annotation.GuardedBy
import im.molly.monero.internal.HttpRequest
import im.molly.monero.internal.HttpResponse
import im.molly.monero.internal.IHttpRequestCallback
import im.molly.monero.internal.IHttpRpcClient
import im.molly.monero.internal.TxInfo
import kotlinx.coroutines.*
import java.io.Closeable
import java.time.Instant
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.coroutines.CoroutineContext
@ -15,7 +19,7 @@ import kotlin.coroutines.CoroutineContext
internal class WalletNative private constructor(
private val network: MoneroNetwork,
private val storageAdapter: IStorageAdapter,
private val remoteNodeClient: IRemoteNodeClient?,
private val rpcClient: IHttpRpcClient?,
private val scope: CoroutineScope,
private val ioDispatcher: CoroutineDispatcher,
) : IWallet.Stub(), Closeable {
@ -25,7 +29,7 @@ internal class WalletNative private constructor(
suspend fun fullNode(
networkId: Int,
storageAdapter: IStorageAdapter,
remoteNodeClient: IRemoteNodeClient? = null,
rpcClient: IHttpRpcClient? = null,
secretSpendKey: SecretKey? = null,
restorePoint: Long? = null,
coroutineContext: CoroutineContext = Dispatchers.Default + SupervisorJob(),
@ -33,7 +37,7 @@ internal class WalletNative private constructor(
) = WalletNative(
network = MoneroNetwork.fromId(networkId),
storageAdapter = storageAdapter,
remoteNodeClient = remoteNodeClient,
rpcClient = rpcClient,
scope = CoroutineScope(coroutineContext),
ioDispatcher = ioDispatcher,
).apply {
@ -381,8 +385,8 @@ internal class WalletNative private constructor(
*/
@CalledByNative
private fun callRemoteNode(
method: String?,
path: String?,
method: String,
path: String,
header: String?,
body: ByteArray?,
): HttpResponse? = runBlocking {
@ -390,8 +394,9 @@ internal class WalletNative private constructor(
if (!requestsAllowed) {
return@runBlocking null
}
val httpRequest = HttpRequest(method, path, header, body)
pendingRequest = async {
remoteNodeClient?.request(HttpRequest(method, path, header, body))
rpcClient?.newCall(httpRequest)
}
}
try {
@ -409,6 +414,33 @@ internal class WalletNative private constructor(
}
}
private val callCounter = AtomicInteger()
@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun IHttpRpcClient.newCall(request: HttpRequest): HttpResponse? =
suspendCancellableCoroutine { continuation ->
val callback = object : IHttpRequestCallback.Stub() {
override fun onResponse(response: HttpResponse) {
continuation.resume(response) {
response.close()
}
}
override fun onError() {
continuation.resume(null) {}
}
override fun onRequestCanceled() {
continuation.resume(null) {}
}
}
val callId = callCounter.incrementAndGet()
callAsync(request, callback, callId)
continuation.invokeOnCancellation {
cancelAsync(callId)
}
}
override fun close() {
scope.cancel()
}

View File

@ -51,13 +51,13 @@ class WalletProvider private constructor(
suspend fun createNewWallet(
network: MoneroNetwork,
dataStore: WalletDataStore? = null,
client: RemoteNodeClient? = null,
client: MoneroNodeClient? = null,
): MoneroWallet {
require(client == null || client.network == network)
val storageAdapter = StorageAdapter(dataStore)
val wallet = suspendCancellableCoroutine { continuation ->
service.createWallet(
buildConfig(network), storageAdapter, client,
buildConfig(network), storageAdapter, client?.httpRpcClient,
WalletResultCallback(continuation),
)
}
@ -67,7 +67,7 @@ class WalletProvider private constructor(
suspend fun restoreWallet(
network: MoneroNetwork,
dataStore: WalletDataStore? = null,
client: RemoteNodeClient? = null,
client: MoneroNodeClient? = null,
secretSpendKey: SecretKey,
restorePoint: RestorePoint,
): MoneroWallet {
@ -78,7 +78,7 @@ class WalletProvider private constructor(
val storageAdapter = StorageAdapter(dataStore)
val wallet = suspendCancellableCoroutine { continuation ->
service.restoreWallet(
buildConfig(network), storageAdapter, client,
buildConfig(network), storageAdapter, client?.httpRpcClient,
WalletResultCallback(continuation),
secretSpendKey,
restorePoint.toLong(),
@ -90,13 +90,13 @@ class WalletProvider private constructor(
suspend fun openWallet(
network: MoneroNetwork,
dataStore: WalletDataStore,
client: RemoteNodeClient? = null,
client: MoneroNodeClient? = null,
): MoneroWallet {
require(client == null || client.network == network)
val storageAdapter = StorageAdapter(dataStore)
val wallet = suspendCancellableCoroutine { continuation ->
service.openWallet(
buildConfig(network), storageAdapter, client,
buildConfig(network), storageAdapter, client?.httpRpcClient,
WalletResultCallback(continuation),
)
}

View File

@ -4,6 +4,7 @@ import android.content.Intent
import android.os.IBinder
import androidx.lifecycle.LifecycleService
import androidx.lifecycle.lifecycleScope
import im.molly.monero.internal.IHttpRpcClient
import kotlinx.coroutines.*
class WalletService : LifecycleService() {
@ -43,13 +44,13 @@ internal class WalletServiceImpl(
override fun createWallet(
config: WalletConfig,
storage: IStorageAdapter,
client: IRemoteNodeClient?,
rpcClient: IHttpRpcClient?,
callback: IWalletServiceCallbacks?,
) {
serviceScope.launch {
val secretSpendKey = randomSecretKey()
val wallet = secretSpendKey.use { secret ->
createOrRestoreWallet(config, storage, client, secret)
createOrRestoreWallet(config, storage, rpcClient, secret)
}
callback?.onWalletResult(wallet)
}
@ -58,14 +59,14 @@ internal class WalletServiceImpl(
override fun restoreWallet(
config: WalletConfig,
storage: IStorageAdapter,
client: IRemoteNodeClient?,
rpcClient: IHttpRpcClient?,
callback: IWalletServiceCallbacks?,
secretSpendKey: SecretKey,
restorePoint: Long,
) {
serviceScope.launch {
val wallet = secretSpendKey.use { secret ->
createOrRestoreWallet(config, storage, client, secret, restorePoint)
createOrRestoreWallet(config, storage, rpcClient, secret, restorePoint)
}
callback?.onWalletResult(wallet)
}
@ -74,14 +75,14 @@ internal class WalletServiceImpl(
override fun openWallet(
config: WalletConfig,
storage: IStorageAdapter,
client: IRemoteNodeClient?,
rpcClient: IHttpRpcClient?,
callback: IWalletServiceCallbacks?,
) {
serviceScope.launch {
val wallet = WalletNative.fullNode(
networkId = config.networkId,
storageAdapter = storage,
remoteNodeClient = client,
rpcClient = rpcClient,
coroutineContext = serviceScope.coroutineContext,
)
callback?.onWalletResult(wallet)
@ -91,14 +92,14 @@ internal class WalletServiceImpl(
private suspend fun createOrRestoreWallet(
config: WalletConfig,
storage: IStorageAdapter,
client: IRemoteNodeClient?,
rpcClient: IHttpRpcClient?,
secretSpendKey: SecretKey,
restorePoint: Long? = null,
): IWallet {
return WalletNative.fullNode(
networkId = config.networkId,
storageAdapter = storage,
remoteNodeClient = client,
rpcClient = rpcClient,
secretSpendKey = secretSpendKey,
restorePoint = restorePoint,
coroutineContext = serviceScope.coroutineContext,

View File

@ -1,11 +1,19 @@
package im.molly.monero
package im.molly.monero.internal
import android.os.Parcelable
import kotlinx.parcelize.Parcelize
@Parcelize
data class HttpRequest(
val method: String?,
val path: String?,
val method: String,
val path: String,
val header: String?,
val bodyBytes: ByteArray?,
) {
) : Parcelable {
override fun toString(): String =
"HttpRequest(method=$method, path$path, headers=${header?.length}, body=${bodyBytes?.size})"
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
@ -24,8 +32,8 @@ data class HttpRequest(
}
override fun hashCode(): Int {
var result = method?.hashCode() ?: 0
result = 31 * result + (path?.hashCode() ?: 0)
var result = method.hashCode()
result = 31 * result + path.hashCode()
result = 31 * result + (header?.hashCode() ?: 0)
result = 31 * result + (bodyBytes?.contentHashCode() ?: 0)
return result

View File

@ -1,12 +1,15 @@
package im.molly.monero
package im.molly.monero.internal
import android.os.ParcelFileDescriptor
import android.os.Parcelable
import kotlinx.parcelize.Parcelize
@Parcelize
data class HttpResponse(
val code: Int,
val contentType: String? = null,
val body: ParcelFileDescriptor? = null,
) : AutoCloseable {
) : AutoCloseable, Parcelable {
override fun close() {
body?.close()
}

View File

@ -0,0 +1,186 @@
package im.molly.monero.internal
import android.net.Uri
import android.os.ParcelFileDescriptor
import im.molly.monero.BackoffPolicy
import im.molly.monero.loadbalancer.LoadBalancer
import im.molly.monero.loadbalancer.Rule
import im.molly.monero.loggerFor
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Headers
import okhttp3.MediaType
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import okhttp3.Protocol
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.Response
import java.io.FileOutputStream
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
class RpcClient internal constructor(
private val loadBalancer: LoadBalancer,
private val loadBalancerRule: Rule,
private val retryBackoff: BackoffPolicy,
private val requestsScope: CoroutineScope,
var httpClient: OkHttpClient,
) : IHttpRpcClient.Stub() {
private val logger = loggerFor<RpcClient>()
private val activeRequests = ConcurrentHashMap<Int, Job>()
override fun callAsync(request: HttpRequest, callback: IHttpRequestCallback, callId: Int) {
logger.d("[$callId] Dispatching $request")
val requestJob = requestsScope.launch {
runCatching {
requestWithRetry(request, callId)
}.onSuccess { response ->
val status = response.code
val responseBody = response.body
if (responseBody == null) {
callback.onResponse(
HttpResponse(code = status, contentType = null, body = null)
)
} else {
responseBody.use { body ->
val contentType = body.contentType()?.toString()
val pipe = ParcelFileDescriptor.createPipe()
pipe[0].use { readSize ->
pipe[1].use { writeSide ->
val httpResponse = HttpResponse(
code = status,
contentType = contentType,
body = readSize,
)
callback.onResponse(httpResponse)
FileOutputStream(writeSide.fileDescriptor).use { out ->
runCatching { body.byteStream().copyTo(out) }
}
}
}
}
}
// TODO: Log response times
}.onFailure { throwable ->
when (throwable) {
is CancellationException -> callback.onRequestCanceled()
else -> {
logger.e("[$callId] Failed to dispatch $request", throwable)
callback.onError()
}
}
}
}.also { job ->
val oldJob = activeRequests.put(callId, job)
check(oldJob == null)
}
requestJob.invokeOnCompletion {
activeRequests.remove(callId)
}
}
override fun cancelAsync(requestId: Int) {
activeRequests[requestId]?.cancel()
}
private suspend fun requestWithRetry(request: HttpRequest, callId: Int): Response {
val headers = parseHttpHeader(request.header)
val contentType = headers["Content-Type"]?.toMediaType()
// TODO: Log unsupported headers
val requestBuilder = createRequestBuilder(request, contentType)
val attempts = mutableMapOf<Uri, Int>()
while (true) {
val selected = loadBalancerRule.chooseNode(loadBalancer)
if (selected == null) {
val errorMsg = "No remote node available"
logger.i("[$callId] $errorMsg")
return Response.Builder()
.request(requestBuilder.build())
.protocol(Protocol.HTTP_1_1)
.code(499)
.message(errorMsg)
.build()
}
val uri = selected.uriForPath(request.path)
val retryCount = attempts[uri] ?: 0
delay(retryBackoff.waitTime(retryCount))
logger.d("[$callId] HTTP: ${request.method} $uri")
try {
val response =
httpClient.newCall(requestBuilder.url(uri.toString()).build()).await()
// TODO: Notify loadBalancer
if (response.isSuccessful) {
return response
}
} catch (e: IOException) {
logger.w("[$callId] HTTP: Request failed with ${e::class.simpleName}: ${e.message}")
// TODO: Notify loadBalancer
} finally {
attempts[uri] = retryCount + 1
}
}
}
private fun parseHttpHeader(header: String?): Headers {
return with(Headers.Builder()) {
header?.splitToSequence("\r\n")
?.filter { line -> line.isNotEmpty() }
?.forEach { line -> add(line) }
build()
}
}
private fun createRequestBuilder(
request: HttpRequest,
contentType: MediaType?,
): Request.Builder {
return with(Request.Builder()) {
when {
request.method.equals("GET", ignoreCase = true) -> {}
request.method.equals("POST", ignoreCase = true) -> {
val content = request.bodyBytes ?: ByteArray(0)
post(content.toRequestBody(contentType))
}
else -> throw IllegalArgumentException("Unsupported method")
}
url("http:${request.path}")
// TODO: Add authentication
}
}
private suspend fun Call.await() = suspendCoroutine { continuation ->
enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
continuation.resume(response)
}
override fun onFailure(call: Call, e: IOException) {
continuation.resumeWithException(e)
}
})
}
// private val Response.roundTripMillis: Long
// get() = sentRequestAtMillis() - receivedResponseAtMillis()
}