lib: use a mutex for the refresh outer loop

This commit is contained in:
Oscar Mira 2023-02-26 13:26:16 +01:00
parent a34ace5fc8
commit a7b4c0caa9
2 changed files with 48 additions and 30 deletions

View File

@ -35,7 +35,8 @@ Wallet::Wallet(
m_account_ready(false), m_account_ready(false),
m_blockchain_height(1), m_blockchain_height(1),
m_restore_height(0), m_restore_height(0),
m_refresh_continue(false) { m_refresh_running(false),
m_refresh_stopped(false) {
// Use a bogus ipv6 address as a placeholder for the daemon address. // Use a bogus ipv6 address as a placeholder for the daemon address.
LOG_FATAL_IF(!m_wallet.init("[100::/64]", {}, {}, 0, false), LOG_FATAL_IF(!m_wallet.init("[100::/64]", {}, {}, 0, false),
"Init failed"); "Init failed");
@ -146,30 +147,39 @@ void Wallet::handleNewBlock(uint64_t height) {
} }
Wallet::Status Wallet::refreshLoopUntilSynced(bool skip_coinbase) { Wallet::Status Wallet::refreshLoopUntilSynced(bool skip_coinbase) {
std::unique_lock<std::mutex> lock(m_wallet_mutex); Status ret;
m_refresh_continue = true; std::unique_lock<std::mutex> refresh_lock(m_refresh_mutex);
Status ret = Status::INTERRUPTED; m_refresh_stopped = false;
while (m_refresh_continue) { for (;;) {
try { std::lock_guard<std::mutex> wallet_lock(m_wallet_mutex);
if (m_refresh_stopped) {
ret = Status::INTERRUPTED;
break;
}
m_refresh_running = true;
m_wallet.set_refresh_type(skip_coinbase ? tools::wallet2::RefreshType::RefreshNoCoinbase m_wallet.set_refresh_type(skip_coinbase ? tools::wallet2::RefreshType::RefreshNoCoinbase
: tools::wallet2::RefreshType::RefreshDefault); : tools::wallet2::RefreshType::RefreshDefault);
m_wallet.set_refresh_from_block_height(m_restore_height); m_wallet.set_refresh_from_block_height(m_restore_height);
// It will block until we call stop() or it sync successfully. try {
// Calling refresh() will block until stop() is called or it sync successfully.
m_wallet.refresh(false); m_wallet.refresh(false);
} catch (const tools::error::no_connection_to_daemon&) {
m_refresh_running = false;
ret = Status::NO_NETWORK_CONNECTIVITY;
break;
} catch (const tools::error::refresh_error) {
m_refresh_running = false;
ret = Status::REFRESH_ERROR;
break;
}
m_refresh_running = false;
if (!m_wallet.stopped()) { if (!m_wallet.stopped()) {
ret = Status::OK; ret = Status::OK;
break; break;
} }
} catch (const tools::error::no_connection_to_daemon&) { m_refresh_cond.wait(refresh_lock);
ret = Status::NO_NETWORK_CONNECTIVITY;
break;
} catch (const tools::error::refresh_error) {
ret = Status::REFRESH_ERROR;
break;
} }
m_refresh_cond.wait(lock); refresh_lock.unlock();
}
lock.unlock();
// Always notify the last block height. // Always notify the last block height.
m_callback.callVoidMethod(getJniEnv(), Wallet_onRefresh, m_blockchain_height, false); m_callback.callVoidMethod(getJniEnv(), Wallet_onRefresh, m_blockchain_height, false);
return ret; return ret;
@ -177,31 +187,37 @@ Wallet::Status Wallet::refreshLoopUntilSynced(bool skip_coinbase) {
template<typename T> template<typename T>
auto Wallet::pauseRefreshAndRunLocked(T block) -> decltype(block()) { auto Wallet::pauseRefreshAndRunLocked(T block) -> decltype(block()) {
std::unique_lock<std::mutex> lock(m_wallet_mutex, std::defer_lock); std::unique_lock<std::mutex> refresh_lock(m_refresh_mutex, std::try_to_lock);
while (!lock.try_lock()) { if (!refresh_lock.owns_lock()) {
JNIEnv* env = getJniEnv();
do {
if (refresh_is_running()) {
m_wallet.stop(); m_wallet.stop();
std::this_thread::yield(); m_remote_node_client.callVoidMethod(env, IRemoteNodeClient_cancelAll);
} }
auto res = block(); std::this_thread::yield();
} while (!refresh_lock.try_lock());
}
LOG_FATAL_IF(refresh_is_running());
std::lock_guard<std::mutex> wallet_lock(m_wallet_mutex);
m_refresh_mutex.unlock();
m_refresh_cond.notify_one(); m_refresh_cond.notify_one();
return res; return block();
} }
void Wallet::stopRefresh() { void Wallet::stopRefresh() {
pauseRefreshAndRunLocked([&]() -> int { pauseRefreshAndRunLocked([&]() {
m_refresh_continue = false; m_refresh_stopped = true;
return 0;
}); });
} }
void Wallet::setRefreshSince(long height_or_timestamp) { void Wallet::setRefreshSince(long height_or_timestamp) {
pauseRefreshAndRunLocked([&]() -> int { pauseRefreshAndRunLocked([&]() {
if (height_or_timestamp < CRYPTONOTE_MAX_BLOCK_NUMBER) { if (height_or_timestamp < CRYPTONOTE_MAX_BLOCK_NUMBER) {
m_restore_height = height_or_timestamp; m_restore_height = height_or_timestamp;
} else { } else {
LOG_FATAL("TODO"); LOG_FATAL("TODO");
} }
return 0;
}); });
} }

View File

@ -44,7 +44,7 @@ class Wallet : tools::i_wallet2_callback {
uint64_t current_blockchain_height() const { return m_blockchain_height; } uint64_t current_blockchain_height() const { return m_blockchain_height; }
bool refresh_is_running() const { return m_refresh_continue; } bool refresh_is_running() const { return m_refresh_running; }
// Extra object's state that need to be persistent. // Extra object's state that need to be persistent.
BEGIN_SERIALIZE_OBJECT() BEGIN_SERIALIZE_OBJECT()
@ -65,13 +65,15 @@ class Wallet : tools::i_wallet2_callback {
// Protects access to m_wallet instance and state fields. // Protects access to m_wallet instance and state fields.
std::mutex m_wallet_mutex; std::mutex m_wallet_mutex;
std::mutex m_tx_outs_mutex; std::mutex m_tx_outs_mutex;
std::mutex m_refresh_mutex;
// Reference to the Wallet kotlin instance. // Reference to Kotlin instances.
const ScopedJvmGlobalRef<jobject> m_remote_node_client;
const ScopedJvmGlobalRef<jobject> m_callback; const ScopedJvmGlobalRef<jobject> m_callback;
std::condition_variable m_refresh_cond; std::condition_variable m_refresh_cond;
bool m_refresh_started; bool m_refresh_running;
bool m_refresh_continue; bool m_refresh_stopped;
template<typename T> template<typename T>
auto pauseRefreshAndRunLocked(T block) -> decltype(block()); auto pauseRefreshAndRunLocked(T block) -> decltype(block());