diff --git a/demo/android/src/main/kotlin/im/molly/monero/demo/data/MoneroSdkClient.kt b/demo/android/src/main/kotlin/im/molly/monero/demo/data/MoneroSdkClient.kt index 064f5aa..84f2e75 100644 --- a/demo/android/src/main/kotlin/im/molly/monero/demo/data/MoneroSdkClient.kt +++ b/demo/android/src/main/kotlin/im/molly/monero/demo/data/MoneroSdkClient.kt @@ -52,7 +52,7 @@ class MoneroSdkClient(private val context: Context) { httpClient: OkHttpClient, ): MoneroWallet { val dataStore = WalletDataStoreFile(filename) - val client = RemoteNodeClient.forNetwork( + val client = MoneroNodeClient.forNetwork( network = network, remoteNodes = remoteNodes, loadBalancerRule = RoundRobinRule(), diff --git a/demo/android/src/main/kotlin/im/molly/monero/demo/data/WalletRepository.kt b/demo/android/src/main/kotlin/im/molly/monero/demo/data/WalletRepository.kt index 28f4bc3..ee23599 100644 --- a/demo/android/src/main/kotlin/im/molly/monero/demo/data/WalletRepository.kt +++ b/demo/android/src/main/kotlin/im/molly/monero/demo/data/WalletRepository.kt @@ -51,8 +51,8 @@ class WalletRepository( fun getWalletIdList() = walletDataSource.readWalletIdList() - fun getRemoteClients(): Flow> = - getWalletIdList().map { it.mapNotNull { walletId -> getWallet(walletId).remoteNodeClient } } + fun getMoneroNodeClients(): Flow> = + getWalletIdList().map { it.mapNotNull { walletId -> getWallet(walletId).moneroNodeClient } } fun getWalletConfig(walletId: Long) = walletDataSource.readWalletConfig(walletId) diff --git a/demo/android/src/main/kotlin/im/molly/monero/demo/ui/SettingsViewModel.kt b/demo/android/src/main/kotlin/im/molly/monero/demo/ui/SettingsViewModel.kt index 0a17ec6..b14a54f 100644 --- a/demo/android/src/main/kotlin/im/molly/monero/demo/ui/SettingsViewModel.kt +++ b/demo/android/src/main/kotlin/im/molly/monero/demo/ui/SettingsViewModel.kt @@ -82,7 +82,7 @@ class SettingsViewModel( } private suspend fun onProxyChanged(newProxy: Proxy) { - walletRepository.getRemoteClients().first().forEach { client -> + walletRepository.getMoneroNodeClients().first().forEach { client -> val current = client.httpClient.proxy if (current != newProxy) { val builder = client.httpClient.newBuilder() diff --git a/lib/android/src/main/aidl/im/molly/monero/HttpResponse.aidl b/lib/android/src/main/aidl/im/molly/monero/HttpResponse.aidl deleted file mode 100644 index 7de978e..0000000 --- a/lib/android/src/main/aidl/im/molly/monero/HttpResponse.aidl +++ /dev/null @@ -1,3 +0,0 @@ -package im.molly.monero; - -parcelable HttpResponse; diff --git a/lib/android/src/main/aidl/im/molly/monero/IHttpRequestCallback.aidl b/lib/android/src/main/aidl/im/molly/monero/IHttpRequestCallback.aidl deleted file mode 100644 index a1a4246..0000000 --- a/lib/android/src/main/aidl/im/molly/monero/IHttpRequestCallback.aidl +++ /dev/null @@ -1,6 +0,0 @@ -package im.molly.monero; - -oneway interface IHttpRequestCallback { - void onResponse(int code, String contentType, in ParcelFileDescriptor body); - void onFailure(); -} diff --git a/lib/android/src/main/aidl/im/molly/monero/IRemoteNodeClient.aidl b/lib/android/src/main/aidl/im/molly/monero/IRemoteNodeClient.aidl deleted file mode 100644 index 7c15dec..0000000 --- a/lib/android/src/main/aidl/im/molly/monero/IRemoteNodeClient.aidl +++ /dev/null @@ -1,8 +0,0 @@ -package im.molly.monero; - -import im.molly.monero.IHttpRequestCallback; - -interface IRemoteNodeClient { - oneway void requestAsync(int requestId, String method, String path, String header, in byte[] bodyBytes, in IHttpRequestCallback callback); - oneway void cancelAsync(int requestId); -} diff --git a/lib/android/src/main/aidl/im/molly/monero/IWalletClient.aidl b/lib/android/src/main/aidl/im/molly/monero/IWalletClient.aidl deleted file mode 100644 index 6888da9..0000000 --- a/lib/android/src/main/aidl/im/molly/monero/IWalletClient.aidl +++ /dev/null @@ -1,8 +0,0 @@ -package im.molly.monero; - -import im.molly.monero.IRemoteNodeClient; - -interface IWalletClient { - int getNetworkId(); - IRemoteNodeClient getRemoteNodeClient(); -} diff --git a/lib/android/src/main/aidl/im/molly/monero/IWalletService.aidl b/lib/android/src/main/aidl/im/molly/monero/IWalletService.aidl index c6e579c..7ceb57e 100644 --- a/lib/android/src/main/aidl/im/molly/monero/IWalletService.aidl +++ b/lib/android/src/main/aidl/im/molly/monero/IWalletService.aidl @@ -1,15 +1,15 @@ package im.molly.monero; -import im.molly.monero.IRemoteNodeClient; import im.molly.monero.IStorageAdapter; import im.molly.monero.IWalletServiceCallbacks; import im.molly.monero.IWalletServiceListener; import im.molly.monero.SecretKey; import im.molly.monero.WalletConfig; +import im.molly.monero.internal.IHttpRpcClient; interface IWalletService { - oneway void createWallet(in WalletConfig config, in IStorageAdapter storage, in IRemoteNodeClient client, in IWalletServiceCallbacks callback); - oneway void restoreWallet(in WalletConfig config, in IStorageAdapter storage, in IRemoteNodeClient client, in IWalletServiceCallbacks callback, in SecretKey spendSecretKey, long restorePoint); - oneway void openWallet(in WalletConfig config, in IStorageAdapter storage, in IRemoteNodeClient client, in IWalletServiceCallbacks callback); + oneway void createWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback); + oneway void restoreWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback, in SecretKey spendSecretKey, long restorePoint); + oneway void openWallet(in WalletConfig config, in IStorageAdapter storage, in IHttpRpcClient rpcClient, in IWalletServiceCallbacks callback); void setListener(in IWalletServiceListener listener); } diff --git a/lib/android/src/main/aidl/im/molly/monero/internal/HttpRequest.aidl b/lib/android/src/main/aidl/im/molly/monero/internal/HttpRequest.aidl new file mode 100644 index 0000000..1b0c9e2 --- /dev/null +++ b/lib/android/src/main/aidl/im/molly/monero/internal/HttpRequest.aidl @@ -0,0 +1,3 @@ +package im.molly.monero.internal; + +parcelable HttpRequest; diff --git a/lib/android/src/main/aidl/im/molly/monero/internal/HttpResponse.aidl b/lib/android/src/main/aidl/im/molly/monero/internal/HttpResponse.aidl new file mode 100644 index 0000000..a228133 --- /dev/null +++ b/lib/android/src/main/aidl/im/molly/monero/internal/HttpResponse.aidl @@ -0,0 +1,3 @@ +package im.molly.monero.internal; + +parcelable HttpResponse; diff --git a/lib/android/src/main/aidl/im/molly/monero/internal/IHttpRequestCallback.aidl b/lib/android/src/main/aidl/im/molly/monero/internal/IHttpRequestCallback.aidl new file mode 100644 index 0000000..d3f0a98 --- /dev/null +++ b/lib/android/src/main/aidl/im/molly/monero/internal/IHttpRequestCallback.aidl @@ -0,0 +1,9 @@ +package im.molly.monero.internal; + +import im.molly.monero.internal.HttpResponse; + +oneway interface IHttpRequestCallback { + void onResponse(in HttpResponse response); + void onError(); + void onRequestCanceled(); +} diff --git a/lib/android/src/main/aidl/im/molly/monero/internal/IHttpRpcClient.aidl b/lib/android/src/main/aidl/im/molly/monero/internal/IHttpRpcClient.aidl new file mode 100644 index 0000000..ff2d696 --- /dev/null +++ b/lib/android/src/main/aidl/im/molly/monero/internal/IHttpRpcClient.aidl @@ -0,0 +1,9 @@ +package im.molly.monero.internal; + +import im.molly.monero.internal.HttpRequest; +import im.molly.monero.internal.IHttpRequestCallback; + +interface IHttpRpcClient { + oneway void callAsync(in HttpRequest request, in IHttpRequestCallback callback, int callId); + oneway void cancelAsync(int callId); +} diff --git a/lib/android/src/main/cpp/wallet/jni_cache.cc b/lib/android/src/main/cpp/wallet/jni_cache.cc index e211de0..98b7a62 100644 --- a/lib/android/src/main/cpp/wallet/jni_cache.cc +++ b/lib/android/src/main/cpp/wallet/jni_cache.cc @@ -24,7 +24,7 @@ jmethodID ParcelFd_detachFd; ScopedJavaGlobalRef StringClass; void InitializeJniCache(JNIEnv* env) { - jclass httpResponse = GetClass(env, "im/molly/monero/HttpResponse"); + jclass httpResponse = GetClass(env, "im/molly/monero/internal/HttpResponse"); jclass iTransferCallback = GetClass(env, "im/molly/monero/ITransferCallback"); jclass logger = GetClass(env, "im/molly/monero/Logger"); jclass txInfo = GetClass(env, "im/molly/monero/internal/TxInfo"); @@ -63,7 +63,7 @@ void InitializeJniCache(JNIEnv* env) { WalletNative_callRemoteNode = GetMethodId( env, walletNative, "callRemoteNode", - "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;[B)Lim/molly/monero/HttpResponse;"); + "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;[B)Lim/molly/monero/internal/HttpResponse;"); WalletNative_onRefresh = GetMethodId( env, walletNative, "onRefresh", "(IJZ)V"); diff --git a/lib/android/src/main/kotlin/im/molly/monero/MoneroNodeClient.kt b/lib/android/src/main/kotlin/im/molly/monero/MoneroNodeClient.kt new file mode 100644 index 0000000..f5b4a16 --- /dev/null +++ b/lib/android/src/main/kotlin/im/molly/monero/MoneroNodeClient.kt @@ -0,0 +1,58 @@ +package im.molly.monero + +import im.molly.monero.internal.IHttpRpcClient +import im.molly.monero.internal.RpcClient +import im.molly.monero.loadbalancer.LoadBalancer +import im.molly.monero.loadbalancer.Rule +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.Flow +import okhttp3.OkHttpClient + +class MoneroNodeClient private constructor( + val network: MoneroNetwork, + private val rpcClient: RpcClient, + private val scope: CoroutineScope, +) : AutoCloseable { + + companion object { + /** + * Constructs a [MoneroNodeClient] to connect to the Monero [network]. + */ + fun forNetwork( + network: MoneroNetwork, + remoteNodes: Flow>, + loadBalancerRule: Rule, + httpClient: OkHttpClient, + retryBackoff: BackoffPolicy = ExponentialBackoff.Default, + ioDispatcher: CoroutineDispatcher = Dispatchers.IO, + ): MoneroNodeClient { + val scope = CoroutineScope(ioDispatcher + SupervisorJob()) + val loadBalancer = LoadBalancer(remoteNodes, scope) + val rpcClient = RpcClient( + loadBalancer = loadBalancer, + loadBalancerRule = loadBalancerRule, + retryBackoff = retryBackoff, + requestsScope = scope, + httpClient = httpClient, + ) + return MoneroNodeClient(network, rpcClient, scope) + } + } + + var httpClient: OkHttpClient + get() = rpcClient.httpClient + set(value) { + rpcClient.httpClient = value + } + + internal val httpRpcClient: IHttpRpcClient + get() = rpcClient + + override fun close() { + scope.cancel("MoneroNodeClient is closing: Cancelling all ongoing requests") + } +} diff --git a/lib/android/src/main/kotlin/im/molly/monero/MoneroWallet.kt b/lib/android/src/main/kotlin/im/molly/monero/MoneroWallet.kt index 6d01698..adf308f 100644 --- a/lib/android/src/main/kotlin/im/molly/monero/MoneroWallet.kt +++ b/lib/android/src/main/kotlin/im/molly/monero/MoneroWallet.kt @@ -19,7 +19,7 @@ import kotlin.time.Duration.Companion.seconds class MoneroWallet internal constructor( private val wallet: IWallet, private val storageAdapter: StorageAdapter, - val remoteNodeClient: RemoteNodeClient?, + val moneroNodeClient: MoneroNodeClient?, ) : AutoCloseable { private val logger = loggerFor() diff --git a/lib/android/src/main/kotlin/im/molly/monero/RemoteNodeClient.kt b/lib/android/src/main/kotlin/im/molly/monero/RemoteNodeClient.kt deleted file mode 100644 index 206e314..0000000 --- a/lib/android/src/main/kotlin/im/molly/monero/RemoteNodeClient.kt +++ /dev/null @@ -1,224 +0,0 @@ -package im.molly.monero - -import android.net.Uri -import android.os.ParcelFileDescriptor -import im.molly.monero.loadbalancer.LoadBalancer -import im.molly.monero.loadbalancer.Rule -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow -import okhttp3.* -import okhttp3.MediaType.Companion.toMediaType -import okhttp3.RequestBody.Companion.toRequestBody -import java.io.FileOutputStream -import java.io.IOException -import java.util.concurrent.ConcurrentHashMap -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.coroutines.suspendCoroutine - -// TODO: Hide IRemoteNodeClient methods and rename to HttpRpcClient or MoneroNodeClient -class RemoteNodeClient private constructor( - val network: MoneroNetwork, - private val loadBalancer: LoadBalancer, - private val loadBalancerRule: Rule, - var httpClient: OkHttpClient, - private val retryBackoff: BackoffPolicy, - private val requestsScope: CoroutineScope, -) : IRemoteNodeClient.Stub(), AutoCloseable { - - companion object { - /** - * Constructs a [RemoteNodeClient] to connect to the Monero [network]. - */ - fun forNetwork( - network: MoneroNetwork, - remoteNodes: Flow>, - loadBalancerRule: Rule, - httpClient: OkHttpClient, - retryBackoff: BackoffPolicy = ExponentialBackoff.Default, - ioDispatcher: CoroutineDispatcher = Dispatchers.IO, - ): RemoteNodeClient { - val scope = CoroutineScope(ioDispatcher + SupervisorJob()) - return RemoteNodeClient( - network, - LoadBalancer(remoteNodes, scope), - loadBalancerRule, - httpClient, - retryBackoff, - scope - ) - } - } - - private val logger = loggerFor() - - private val requestList = ConcurrentHashMap() - - override fun requestAsync( - requestId: Int, - method: String, - path: String, - header: String?, - body: ByteArray?, - callback: IHttpRequestCallback?, - ) { - logger.d("HTTP: $method $path, header_len=${header?.length}, body_size=${body?.size}") - - val requestJob = requestsScope.launch { - runCatching { - requestWithRetry(method, path, header, body) - }.onSuccess { response -> - val status = response.code - val responseBody = response.body - if (responseBody == null) { - callback?.onResponse(status, null, null) - } else { - responseBody.use { body -> - val contentType = body.contentType()?.toString() - val pipe = ParcelFileDescriptor.createPipe() - pipe[1].use { writeSide -> - callback?.onResponse(status, contentType, pipe[0]) - FileOutputStream(writeSide.fileDescriptor).use { out -> - runCatching { body.byteStream().copyTo(out) } - } - } - } - } - // TODO: Log response times - }.onFailure { throwable -> - logger.e("HTTP: Request failed", throwable) - callback?.onFailure() - } - }.also { - requestList[requestId] = it - } - - requestJob.invokeOnCompletion { - requestList.remove(requestId) - } - } - - override fun cancelAsync(requestId: Int) { - requestList[requestId]?.cancel() - } - - override fun close() { - requestsScope.cancel() - } - - private suspend fun requestWithRetry( - method: String, - path: String, - header: String?, - body: ByteArray?, - ): Response { - val headers = parseHttpHeader(header) - val contentType = headers["Content-Type"]?.toMediaType() - // TODO: Log unsupported headers - val requestBuilder = with(Request.Builder()) { - when { - method.equals("GET", ignoreCase = true) -> {} - method.equals("POST", ignoreCase = true) -> { - val content = body ?: ByteArray(0) - post(content.toRequestBody(contentType)) - } - - else -> throw IllegalArgumentException("Unsupported method") - } - url("http:$path") - // TODO: Add authentication - } - - val attempts = mutableMapOf() - - while (true) { - val selected = loadBalancerRule.chooseNode(loadBalancer) - if (selected == null) { - logger.i("No remote node available") - - return Response.Builder() - .request(requestBuilder.build()) - .protocol(Protocol.HTTP_1_1) - .code(499) - .message("No remote node available") - .build() - } - - val uri = selected.uriForPath(path) - val retryCount = attempts[uri] ?: 0 - - delay(retryBackoff.waitTime(retryCount)) - - logger.d("HTTP: $method $uri") - - val response = try { - - val request = requestBuilder.url(uri.toString()).build() - - httpClient.newCall(request).await() - } catch (e: IOException) { - logger.e("HTTP: Request failed", e) - // TODO: Notify loadBalancer - continue - } finally { - attempts[uri] = retryCount + 1 - } - - if (response.isSuccessful) { - // TODO: Notify loadBalancer - return response - } - } - } - - private fun parseHttpHeader(header: String?): Headers = - with(Headers.Builder()) { - header?.splitToSequence("\r\n") - ?.filter { line -> line.isNotEmpty() } - ?.forEach { line -> add(line) } - build() - } - - private suspend fun Call.await() = suspendCoroutine { continuation -> - enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - continuation.resume(response) - } - - override fun onFailure(call: Call, e: IOException) { - continuation.resumeWithException(e) - } - }) - } - -// private val Response.roundTripMillis: Long -// get() = sentRequestAtMillis() - receivedResponseAtMillis() - -} - -@OptIn(ExperimentalCoroutinesApi::class) -internal suspend fun IRemoteNodeClient.request(request: HttpRequest): HttpResponse? = - suspendCancellableCoroutine { continuation -> - val requestId = request.hashCode() - val callback = object : IHttpRequestCallback.Stub() { - override fun onResponse( - code: Int, - contentType: String?, - body: ParcelFileDescriptor?, - ) { - continuation.resume(HttpResponse(code, contentType, body)) { - body?.close() - } - } - - override fun onFailure() { - continuation.resume(null) {} - } - } - with(request) { - requestAsync(requestId, method, path, header, bodyBytes, callback) - } - continuation.invokeOnCancellation { - cancelAsync(requestId) - } - } diff --git a/lib/android/src/main/kotlin/im/molly/monero/WalletNative.kt b/lib/android/src/main/kotlin/im/molly/monero/WalletNative.kt index ff58701..6844a1f 100644 --- a/lib/android/src/main/kotlin/im/molly/monero/WalletNative.kt +++ b/lib/android/src/main/kotlin/im/molly/monero/WalletNative.kt @@ -2,12 +2,16 @@ package im.molly.monero import android.os.ParcelFileDescriptor import androidx.annotation.GuardedBy +import im.molly.monero.internal.HttpRequest +import im.molly.monero.internal.HttpResponse +import im.molly.monero.internal.IHttpRequestCallback +import im.molly.monero.internal.IHttpRpcClient import im.molly.monero.internal.TxInfo import kotlinx.coroutines.* import java.io.Closeable import java.time.Instant -import java.util.* import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock import kotlin.coroutines.CoroutineContext @@ -15,7 +19,7 @@ import kotlin.coroutines.CoroutineContext internal class WalletNative private constructor( private val network: MoneroNetwork, private val storageAdapter: IStorageAdapter, - private val remoteNodeClient: IRemoteNodeClient?, + private val rpcClient: IHttpRpcClient?, private val scope: CoroutineScope, private val ioDispatcher: CoroutineDispatcher, ) : IWallet.Stub(), Closeable { @@ -25,7 +29,7 @@ internal class WalletNative private constructor( suspend fun fullNode( networkId: Int, storageAdapter: IStorageAdapter, - remoteNodeClient: IRemoteNodeClient? = null, + rpcClient: IHttpRpcClient? = null, secretSpendKey: SecretKey? = null, restorePoint: Long? = null, coroutineContext: CoroutineContext = Dispatchers.Default + SupervisorJob(), @@ -33,7 +37,7 @@ internal class WalletNative private constructor( ) = WalletNative( network = MoneroNetwork.fromId(networkId), storageAdapter = storageAdapter, - remoteNodeClient = remoteNodeClient, + rpcClient = rpcClient, scope = CoroutineScope(coroutineContext), ioDispatcher = ioDispatcher, ).apply { @@ -381,8 +385,8 @@ internal class WalletNative private constructor( */ @CalledByNative private fun callRemoteNode( - method: String?, - path: String?, + method: String, + path: String, header: String?, body: ByteArray?, ): HttpResponse? = runBlocking { @@ -390,8 +394,9 @@ internal class WalletNative private constructor( if (!requestsAllowed) { return@runBlocking null } + val httpRequest = HttpRequest(method, path, header, body) pendingRequest = async { - remoteNodeClient?.request(HttpRequest(method, path, header, body)) + rpcClient?.newCall(httpRequest) } } try { @@ -409,6 +414,33 @@ internal class WalletNative private constructor( } } + private val callCounter = AtomicInteger() + + @OptIn(ExperimentalCoroutinesApi::class) + private suspend fun IHttpRpcClient.newCall(request: HttpRequest): HttpResponse? = + suspendCancellableCoroutine { continuation -> + val callback = object : IHttpRequestCallback.Stub() { + override fun onResponse(response: HttpResponse) { + continuation.resume(response) { + response.close() + } + } + + override fun onError() { + continuation.resume(null) {} + } + + override fun onRequestCanceled() { + continuation.resume(null) {} + } + } + val callId = callCounter.incrementAndGet() + callAsync(request, callback, callId) + continuation.invokeOnCancellation { + cancelAsync(callId) + } + } + override fun close() { scope.cancel() } diff --git a/lib/android/src/main/kotlin/im/molly/monero/WalletProvider.kt b/lib/android/src/main/kotlin/im/molly/monero/WalletProvider.kt index ad5a9dd..5737f4c 100644 --- a/lib/android/src/main/kotlin/im/molly/monero/WalletProvider.kt +++ b/lib/android/src/main/kotlin/im/molly/monero/WalletProvider.kt @@ -51,13 +51,13 @@ class WalletProvider private constructor( suspend fun createNewWallet( network: MoneroNetwork, dataStore: WalletDataStore? = null, - client: RemoteNodeClient? = null, + client: MoneroNodeClient? = null, ): MoneroWallet { require(client == null || client.network == network) val storageAdapter = StorageAdapter(dataStore) val wallet = suspendCancellableCoroutine { continuation -> service.createWallet( - buildConfig(network), storageAdapter, client, + buildConfig(network), storageAdapter, client?.httpRpcClient, WalletResultCallback(continuation), ) } @@ -67,7 +67,7 @@ class WalletProvider private constructor( suspend fun restoreWallet( network: MoneroNetwork, dataStore: WalletDataStore? = null, - client: RemoteNodeClient? = null, + client: MoneroNodeClient? = null, secretSpendKey: SecretKey, restorePoint: RestorePoint, ): MoneroWallet { @@ -78,7 +78,7 @@ class WalletProvider private constructor( val storageAdapter = StorageAdapter(dataStore) val wallet = suspendCancellableCoroutine { continuation -> service.restoreWallet( - buildConfig(network), storageAdapter, client, + buildConfig(network), storageAdapter, client?.httpRpcClient, WalletResultCallback(continuation), secretSpendKey, restorePoint.toLong(), @@ -90,13 +90,13 @@ class WalletProvider private constructor( suspend fun openWallet( network: MoneroNetwork, dataStore: WalletDataStore, - client: RemoteNodeClient? = null, + client: MoneroNodeClient? = null, ): MoneroWallet { require(client == null || client.network == network) val storageAdapter = StorageAdapter(dataStore) val wallet = suspendCancellableCoroutine { continuation -> service.openWallet( - buildConfig(network), storageAdapter, client, + buildConfig(network), storageAdapter, client?.httpRpcClient, WalletResultCallback(continuation), ) } diff --git a/lib/android/src/main/kotlin/im/molly/monero/WalletService.kt b/lib/android/src/main/kotlin/im/molly/monero/WalletService.kt index e2026d8..1bdf696 100644 --- a/lib/android/src/main/kotlin/im/molly/monero/WalletService.kt +++ b/lib/android/src/main/kotlin/im/molly/monero/WalletService.kt @@ -4,6 +4,7 @@ import android.content.Intent import android.os.IBinder import androidx.lifecycle.LifecycleService import androidx.lifecycle.lifecycleScope +import im.molly.monero.internal.IHttpRpcClient import kotlinx.coroutines.* class WalletService : LifecycleService() { @@ -43,13 +44,13 @@ internal class WalletServiceImpl( override fun createWallet( config: WalletConfig, storage: IStorageAdapter, - client: IRemoteNodeClient?, + rpcClient: IHttpRpcClient?, callback: IWalletServiceCallbacks?, ) { serviceScope.launch { val secretSpendKey = randomSecretKey() val wallet = secretSpendKey.use { secret -> - createOrRestoreWallet(config, storage, client, secret) + createOrRestoreWallet(config, storage, rpcClient, secret) } callback?.onWalletResult(wallet) } @@ -58,14 +59,14 @@ internal class WalletServiceImpl( override fun restoreWallet( config: WalletConfig, storage: IStorageAdapter, - client: IRemoteNodeClient?, + rpcClient: IHttpRpcClient?, callback: IWalletServiceCallbacks?, secretSpendKey: SecretKey, restorePoint: Long, ) { serviceScope.launch { val wallet = secretSpendKey.use { secret -> - createOrRestoreWallet(config, storage, client, secret, restorePoint) + createOrRestoreWallet(config, storage, rpcClient, secret, restorePoint) } callback?.onWalletResult(wallet) } @@ -74,14 +75,14 @@ internal class WalletServiceImpl( override fun openWallet( config: WalletConfig, storage: IStorageAdapter, - client: IRemoteNodeClient?, + rpcClient: IHttpRpcClient?, callback: IWalletServiceCallbacks?, ) { serviceScope.launch { val wallet = WalletNative.fullNode( networkId = config.networkId, storageAdapter = storage, - remoteNodeClient = client, + rpcClient = rpcClient, coroutineContext = serviceScope.coroutineContext, ) callback?.onWalletResult(wallet) @@ -91,14 +92,14 @@ internal class WalletServiceImpl( private suspend fun createOrRestoreWallet( config: WalletConfig, storage: IStorageAdapter, - client: IRemoteNodeClient?, + rpcClient: IHttpRpcClient?, secretSpendKey: SecretKey, restorePoint: Long? = null, ): IWallet { return WalletNative.fullNode( networkId = config.networkId, storageAdapter = storage, - remoteNodeClient = client, + rpcClient = rpcClient, secretSpendKey = secretSpendKey, restorePoint = restorePoint, coroutineContext = serviceScope.coroutineContext, diff --git a/lib/android/src/main/kotlin/im/molly/monero/HttpRequest.kt b/lib/android/src/main/kotlin/im/molly/monero/internal/HttpRequest.kt similarity index 67% rename from lib/android/src/main/kotlin/im/molly/monero/HttpRequest.kt rename to lib/android/src/main/kotlin/im/molly/monero/internal/HttpRequest.kt index a648819..0c1d81e 100644 --- a/lib/android/src/main/kotlin/im/molly/monero/HttpRequest.kt +++ b/lib/android/src/main/kotlin/im/molly/monero/internal/HttpRequest.kt @@ -1,11 +1,19 @@ -package im.molly.monero +package im.molly.monero.internal +import android.os.Parcelable +import kotlinx.parcelize.Parcelize + +@Parcelize data class HttpRequest( - val method: String?, - val path: String?, + val method: String, + val path: String, val header: String?, val bodyBytes: ByteArray?, -) { +) : Parcelable { + + override fun toString(): String = + "HttpRequest(method=$method, path$path, headers=${header?.length}, body=${bodyBytes?.size})" + override fun equals(other: Any?): Boolean { if (this === other) return true if (javaClass != other?.javaClass) return false @@ -24,8 +32,8 @@ data class HttpRequest( } override fun hashCode(): Int { - var result = method?.hashCode() ?: 0 - result = 31 * result + (path?.hashCode() ?: 0) + var result = method.hashCode() + result = 31 * result + path.hashCode() result = 31 * result + (header?.hashCode() ?: 0) result = 31 * result + (bodyBytes?.contentHashCode() ?: 0) return result diff --git a/lib/android/src/main/kotlin/im/molly/monero/HttpResponse.kt b/lib/android/src/main/kotlin/im/molly/monero/internal/HttpResponse.kt similarity index 61% rename from lib/android/src/main/kotlin/im/molly/monero/HttpResponse.kt rename to lib/android/src/main/kotlin/im/molly/monero/internal/HttpResponse.kt index 658e8ba..8d54eb7 100644 --- a/lib/android/src/main/kotlin/im/molly/monero/HttpResponse.kt +++ b/lib/android/src/main/kotlin/im/molly/monero/internal/HttpResponse.kt @@ -1,12 +1,15 @@ -package im.molly.monero +package im.molly.monero.internal import android.os.ParcelFileDescriptor +import android.os.Parcelable +import kotlinx.parcelize.Parcelize +@Parcelize data class HttpResponse( val code: Int, val contentType: String? = null, val body: ParcelFileDescriptor? = null, -) : AutoCloseable { +) : AutoCloseable, Parcelable { override fun close() { body?.close() } diff --git a/lib/android/src/main/kotlin/im/molly/monero/internal/RpcClient.kt b/lib/android/src/main/kotlin/im/molly/monero/internal/RpcClient.kt new file mode 100644 index 0000000..e4b3ef6 --- /dev/null +++ b/lib/android/src/main/kotlin/im/molly/monero/internal/RpcClient.kt @@ -0,0 +1,186 @@ +package im.molly.monero.internal + +import android.net.Uri +import android.os.ParcelFileDescriptor +import im.molly.monero.BackoffPolicy +import im.molly.monero.loadbalancer.LoadBalancer +import im.molly.monero.loadbalancer.Rule +import im.molly.monero.loggerFor +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import okhttp3.Call +import okhttp3.Callback +import okhttp3.Headers +import okhttp3.MediaType +import okhttp3.MediaType.Companion.toMediaType +import okhttp3.OkHttpClient +import okhttp3.Protocol +import okhttp3.Request +import okhttp3.RequestBody.Companion.toRequestBody +import okhttp3.Response +import java.io.FileOutputStream +import java.io.IOException +import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +class RpcClient internal constructor( + private val loadBalancer: LoadBalancer, + private val loadBalancerRule: Rule, + private val retryBackoff: BackoffPolicy, + private val requestsScope: CoroutineScope, + var httpClient: OkHttpClient, +) : IHttpRpcClient.Stub() { + + private val logger = loggerFor() + + private val activeRequests = ConcurrentHashMap() + + override fun callAsync(request: HttpRequest, callback: IHttpRequestCallback, callId: Int) { + logger.d("[$callId] Dispatching $request") + + val requestJob = requestsScope.launch { + runCatching { + requestWithRetry(request, callId) + }.onSuccess { response -> + val status = response.code + val responseBody = response.body + if (responseBody == null) { + callback.onResponse( + HttpResponse(code = status, contentType = null, body = null) + ) + } else { + responseBody.use { body -> + val contentType = body.contentType()?.toString() + val pipe = ParcelFileDescriptor.createPipe() + pipe[0].use { readSize -> + pipe[1].use { writeSide -> + val httpResponse = HttpResponse( + code = status, + contentType = contentType, + body = readSize, + ) + callback.onResponse(httpResponse) + FileOutputStream(writeSide.fileDescriptor).use { out -> + runCatching { body.byteStream().copyTo(out) } + } + } + } + } + } + // TODO: Log response times + }.onFailure { throwable -> + when (throwable) { + is CancellationException -> callback.onRequestCanceled() + else -> { + logger.e("[$callId] Failed to dispatch $request", throwable) + callback.onError() + } + } + } + }.also { job -> + val oldJob = activeRequests.put(callId, job) + check(oldJob == null) + } + + requestJob.invokeOnCompletion { + activeRequests.remove(callId) + } + } + + override fun cancelAsync(requestId: Int) { + activeRequests[requestId]?.cancel() + } + + private suspend fun requestWithRetry(request: HttpRequest, callId: Int): Response { + val headers = parseHttpHeader(request.header) + val contentType = headers["Content-Type"]?.toMediaType() + // TODO: Log unsupported headers + val requestBuilder = createRequestBuilder(request, contentType) + + val attempts = mutableMapOf() + + while (true) { + val selected = loadBalancerRule.chooseNode(loadBalancer) + if (selected == null) { + val errorMsg = "No remote node available" + logger.i("[$callId] $errorMsg") + return Response.Builder() + .request(requestBuilder.build()) + .protocol(Protocol.HTTP_1_1) + .code(499) + .message(errorMsg) + .build() + } + + val uri = selected.uriForPath(request.path) + val retryCount = attempts[uri] ?: 0 + + delay(retryBackoff.waitTime(retryCount)) + + logger.d("[$callId] HTTP: ${request.method} $uri") + + try { + val response = + httpClient.newCall(requestBuilder.url(uri.toString()).build()).await() + // TODO: Notify loadBalancer + if (response.isSuccessful) { + return response + } + } catch (e: IOException) { + logger.w("[$callId] HTTP: Request failed with ${e::class.simpleName}: ${e.message}") + // TODO: Notify loadBalancer + } finally { + attempts[uri] = retryCount + 1 + } + } + } + + private fun parseHttpHeader(header: String?): Headers { + return with(Headers.Builder()) { + header?.splitToSequence("\r\n") + ?.filter { line -> line.isNotEmpty() } + ?.forEach { line -> add(line) } + build() + } + } + + private fun createRequestBuilder( + request: HttpRequest, + contentType: MediaType?, + ): Request.Builder { + return with(Request.Builder()) { + when { + request.method.equals("GET", ignoreCase = true) -> {} + request.method.equals("POST", ignoreCase = true) -> { + val content = request.bodyBytes ?: ByteArray(0) + post(content.toRequestBody(contentType)) + } + + else -> throw IllegalArgumentException("Unsupported method") + } + url("http:${request.path}") + // TODO: Add authentication + } + } + + private suspend fun Call.await() = suspendCoroutine { continuation -> + enqueue(object : Callback { + override fun onResponse(call: Call, response: Response) { + continuation.resume(response) + } + + override fun onFailure(call: Call, e: IOException) { + continuation.resumeWithException(e) + } + }) + } + +// private val Response.roundTripMillis: Long +// get() = sentRequestAtMillis() - receivedResponseAtMillis() + +}