lib: send balance updates in batches

This commit is contained in:
Oscar Mira 2025-02-27 12:23:46 +01:00
parent f97854e424
commit 5f6f67c7fc
No known key found for this signature in database
GPG Key ID: B371B98C5DC32237
7 changed files with 89 additions and 42 deletions

View File

@ -4,7 +4,8 @@ import im.molly.monero.BlockchainTime;
import im.molly.monero.internal.TxInfo;
oneway interface IBalanceListener {
void onBalanceChanged(in List<TxInfo> txHistory, in String[] subAddresses, in BlockchainTime blockchainTime);
void onRefresh(in BlockchainTime blockchainTime);
void onSubAddressListUpdated(in String[] subAddresses);
void onBalanceUpdateFinalized(in List<TxInfo> txBatch, in String[] allSubAddresses, in BlockchainTime blockchainTime);
void onBalanceUpdateChunk(in List<TxInfo> txBatch);
void onWalletRefreshed(in BlockchainTime blockchainTime);
void onSubAddressListUpdated(in String[] allSubAddresses);
}

View File

@ -1,10 +0,0 @@
package im.molly.monero
import android.os.IInterface
/**
* Returns whether this interface is in a remote process.
*/
fun IInterface.isRemote(): Boolean {
return asBinder() !== this
}

View File

@ -107,30 +107,40 @@ class MoneroWallet internal constructor(
*/
fun ledger(): Flow<Ledger> = callbackFlow {
val listener = object : IBalanceListener.Stub() {
lateinit var lastKnownLedger: Ledger
private lateinit var lastKnownLedger: Ledger
override fun onBalanceChanged(
txHistory: MutableList<TxInfo>,
subAddresses: Array<String>,
private val txListBuffer = mutableListOf<TxInfo>()
override fun onBalanceUpdateFinalized(
txBatch: List<TxInfo>,
allSubAddresses: Array<String>,
blockchainTime: BlockchainTime,
) {
val accounts = parseAndAggregateAddresses(subAddresses.asIterable())
val txList = if (txListBuffer.isEmpty()) txBatch else txListBuffer.apply { addAll(txBatch) }
val accounts = parseAndAggregateAddresses(allSubAddresses.asIterable())
val ledger = LedgerFactory.createFromTxHistory(
txHistory = txHistory,
txList = txList,
accounts = accounts,
blockchainTime = blockchainTime,
)
txListBuffer.clear()
sendLedger(ledger)
}
override fun onRefresh(blockchainTime: BlockchainTime) {
override fun onBalanceUpdateChunk(txBatch: List<TxInfo>) {
txListBuffer.addAll(txBatch)
}
override fun onWalletRefreshed(blockchainTime: BlockchainTime) {
sendLedger(lastKnownLedger.copy(checkedAt = blockchainTime))
}
override fun onSubAddressListUpdated(subAddresses: Array<String>) {
val accountsUpdated = parseAndAggregateAddresses(subAddresses.asIterable())
if (lastKnownLedger.indexedAccounts != accountsUpdated) {
sendLedger(lastKnownLedger.copy(indexedAccounts = accountsUpdated))
override fun onSubAddressListUpdated(allSubAddresses: Array<String>) {
val accounts = parseAndAggregateAddresses(allSubAddresses.asIterable())
if (accounts != lastKnownLedger.indexedAccounts) {
sendLedger(lastKnownLedger.copy(indexedAccounts = accounts))
}
}
@ -142,7 +152,6 @@ class MoneroWallet internal constructor(
}
wallet.addBalanceListener(listener)
awaitClose { wallet.removeBalanceListener(listener) }
}.conflate()

View File

@ -8,6 +8,8 @@ import im.molly.monero.internal.IHttpRequestCallback
import im.molly.monero.internal.IHttpRpcClient
import im.molly.monero.internal.LedgerFactory
import im.molly.monero.internal.TxInfo
import im.molly.monero.internal.getMaxIpcSize
import im.molly.monero.internal.isRemote
import kotlinx.coroutines.*
import java.io.Closeable
import java.time.Instant
@ -121,7 +123,7 @@ internal class WalletNative private constructor(
fun getLedger(): Ledger {
return LedgerFactory.createFromTxHistory(
txHistory = getTxHistorySnapshot(),
txList = getTxHistorySnapshot(),
accounts = getAllAccounts(),
blockchainTime = getCurrentBlockchainTime(),
)
@ -276,10 +278,11 @@ internal class WalletNative private constructor(
override fun addBalanceListener(listener: IBalanceListener) {
val txHistory = getTxHistorySnapshot()
val subAddresses = getSubAddresses()
val blockchainTime = getCurrentBlockchainTime()
balanceListenersLock.withLock {
balanceListeners.add(listener)
listener.onBalanceChanged(txHistory, subAddresses, getCurrentBlockchainTime())
notifyBalanceInBatchesUnlock(listener, txHistory, subAddresses, blockchainTime)
}
}
@ -318,6 +321,30 @@ internal class WalletNative private constructor(
}
}
private fun notifyBalanceInBatchesUnlock(
listener: IBalanceListener,
txList: List<TxInfo>,
subAddresses: Array<String>,
blockchainTime: BlockchainTime,
) {
if (txList.isEmpty()) {
listener.onBalanceUpdateFinalized(emptyList(), subAddresses, blockchainTime)
return
}
val batchSize = getMaxIpcSize() / TxInfo.MAX_PARCEL_SIZE_BYTES
val chunkedSeq = txList.asSequence().chunked(batchSize).iterator()
while (chunkedSeq.hasNext()) {
val chunk = chunkedSeq.next()
if (chunkedSeq.hasNext()) {
listener.onBalanceUpdateChunk(chunk)
} else {
listener.onBalanceUpdateFinalized(chunk, subAddresses, blockchainTime)
}
}
}
private fun notifyAddressCreation(subAddress: String, callback: IWalletCallbacks?) {
balanceListenersLock.withLock {
if (balanceListeners.isNotEmpty()) {
@ -353,14 +380,14 @@ internal class WalletNative private constructor(
if (balanceListeners.isNotEmpty()) {
val blockchainTime = network.blockchainTime(height, timestamp)
val call = if (balanceChanged) {
val txHistory = getTxHistorySnapshot()
val txList = getTxHistorySnapshot()
val subAddresses = getSubAddresses()
fun(listener: IBalanceListener) {
listener.onBalanceChanged(txHistory, subAddresses, blockchainTime)
notifyBalanceInBatchesUnlock(listener, txList, subAddresses, blockchainTime)
}
} else {
fun(listener: IBalanceListener) {
listener.onRefresh(blockchainTime)
listener.onWalletRefreshed(blockchainTime)
}
}
balanceListeners.forEach { call(it) }

View File

@ -0,0 +1,18 @@
package im.molly.monero.internal
import android.os.Build
import android.os.IBinder
import android.os.IInterface
/**
* Returns whether this interface is in a remote process.
*/
fun IInterface.isRemote(): Boolean {
return asBinder() !== this
}
fun getMaxIpcSize(): Int = if (Build.VERSION.SDK_INT >= 30) {
IBinder.getSuggestedMaxIpcSizeBytes()
} else {
64 * 1024
}

View File

@ -7,11 +7,11 @@ import im.molly.monero.findAddressByIndex
internal object LedgerFactory {
fun createFromTxHistory(
txHistory: List<TxInfo>,
txList: List<TxInfo>,
accounts: List<WalletAccount>,
blockchainTime: BlockchainTime,
): Ledger {
val (txById, enotes) = txHistory.consolidateTransactions(
val (txById, enotes) = txList.consolidateTransactions(
accounts = accounts,
blockchainContext = blockchainTime,
)

View File

@ -47,15 +47,17 @@ internal data class TxInfo @CalledByNative constructor(
val incoming: Boolean,
) : Parcelable {
companion object State {
const val OFF_CHAIN: Byte = 1
const val PENDING: Byte = 2
const val FAILED: Byte = 3
const val ON_CHAIN: Byte = 4
companion object {
const val STATE_OFF_CHAIN: Byte = 1
const val STATE_PENDING: Byte = 2
const val STATE_FAILED: Byte = 3
const val STATE_ON_CHAIN: Byte = 4
const val MAX_PARCEL_SIZE_BYTES = 224
}
init {
require(state in OFF_CHAIN..ON_CHAIN)
require(state in STATE_OFF_CHAIN..STATE_ON_CHAIN)
require(amount >= 0 && fee >= 0 && change >= 0) {
"TX amounts cannot be negative"
}
@ -145,10 +147,10 @@ private fun List<TxInfo>.determineTxState(): TxState {
val timestamp = maxOf { it.timestamp }
return when (val state = first().state) {
TxInfo.OFF_CHAIN -> TxState.OffChain
TxInfo.PENDING -> TxState.InMemoryPool
TxInfo.FAILED -> TxState.Failed
TxInfo.ON_CHAIN -> TxState.OnChain(BlockHeader(height, timestamp))
TxInfo.STATE_OFF_CHAIN -> TxState.OffChain
TxInfo.STATE_PENDING -> TxState.InMemoryPool
TxInfo.STATE_FAILED -> TxState.Failed
TxInfo.STATE_ON_CHAIN -> TxState.OnChain(BlockHeader(height, timestamp))
else -> error("Invalid tx state value: $state")
}
}