lib: multiple fixes of async functions

This commit is contained in:
Oscar Mira 2023-04-27 16:13:14 +02:00
parent 5cf5c71b90
commit 698ae32ef2
3 changed files with 47 additions and 19 deletions

View File

@ -9,7 +9,7 @@ interface IWallet {
void removeBalanceListener(in IBalanceListener listener);
oneway void save(in ParcelFileDescriptor destination);
oneway void resumeRefresh(boolean skipCoinbaseOutputs, in IRefreshCallback callback);
void cancelRefresh();
void setRefreshSince(long heightOrTimestamp);
oneway void cancelRefresh();
oneway void setRefreshSince(long heightOrTimestamp);
void close();
}

View File

@ -2,6 +2,7 @@ package im.molly.monero
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
@ -12,6 +13,8 @@ class MoneroWallet internal constructor(
val remoteNodeClient: RemoteNodeClient?,
) : IWallet by wallet, AutoCloseable {
private val logger = loggerFor<MoneroWallet>()
val publicAddress: String = wallet.primaryAccountAddress
/**
@ -23,11 +26,18 @@ class MoneroWallet internal constructor(
override fun onBalanceChanged(txOuts: List<OwnedTxOut>?, checkedAtBlockHeight: Long) {
lastKnownLedger = Ledger(publicAddress, txOuts!!, checkedAtBlockHeight)
trySendBlocking(lastKnownLedger)
sendLedger(lastKnownLedger)
}
override fun onRefresh(blockchainHeight: Long) {
trySendBlocking(lastKnownLedger.copy(checkedAtBlockHeight = blockchainHeight))
sendLedger(lastKnownLedger.copy(checkedAtBlockHeight = blockchainHeight))
}
private fun sendLedger(ledger: Ledger) {
trySend(ledger)
.onFailure {
logger.e("Too many ledger updates, channel capacity exceeded")
}
}
}

View File

@ -93,10 +93,16 @@ class WalletNative private constructor(
}
}
override fun cancelRefresh() = nativeCancelRefresh(handle)
override fun cancelRefresh() {
scope.launch(ioDispatcher) {
nativeCancelRefresh(handle)
}
}
override fun setRefreshSince(blockHeightOrTimestamp: Long) {
nativeSetRefreshSince(handle, blockHeightOrTimestamp)
scope.launch(ioDispatcher) {
nativeSetRefreshSince(handle, blockHeightOrTimestamp)
}
}
/**
@ -158,7 +164,12 @@ class WalletNative private constructor(
private val pendingRequestLock = ReentrantLock()
@CalledByNative("wallet.cc")
/**
* Invoked by native code to make a cancellable remote call to a remote node.
*
* Caller must close [HttpResponse.body] upon completion of processing the response.
*/
@CalledByNative("http_client.cc")
private fun callRemoteNode(
method: String?,
path: String?,
@ -166,19 +177,26 @@ class WalletNative private constructor(
body: ByteArray?,
): HttpResponse? = runBlocking {
pendingRequestLock.withLock {
pendingRequest = if (requestsAllowed) {
async {
remoteNodeClient?.request(HttpRequest(method, path, header, body))
}
} else null
}
runCatching {
pendingRequest?.await()
}.onFailure { throwable ->
if (throwable !is CancellationException) {
throw throwable
if (!requestsAllowed) {
return@runBlocking null
}
}.getOrNull()
pendingRequest = async {
remoteNodeClient?.request(HttpRequest(method, path, header, body))
}
}
try {
runCatching {
pendingRequest?.await()
}.onFailure { throwable ->
if (throwable is CancellationException) {
return@onFailure
}
logger.e("Error waiting for HTTP response", throwable)
throw throwable
}.getOrNull()
} finally {
pendingRequest = null
}
}
override fun close() {