fix race condition in setting dht values before network is ready

This commit is contained in:
Christien Rioux 2024-03-01 15:32:14 -05:00
parent ad45660db9
commit 31bb8018f6

View File

@ -169,7 +169,7 @@ impl StorageManager {
Ok(inner) Ok(inner)
} }
fn online_writes_ready_inner(inner: &StorageManagerInner) -> Option<RPCProcessor> { fn online_ready_inner(inner: &StorageManagerInner) -> Option<RPCProcessor> {
if let Some(rpc_processor) = { inner.opt_rpc_processor.clone() } { if let Some(rpc_processor) = { inner.opt_rpc_processor.clone() } {
if let Some(network_class) = rpc_processor if let Some(network_class) = rpc_processor
.routing_table() .routing_table()
@ -193,7 +193,7 @@ impl StorageManager {
async fn online_writes_ready(&self) -> EyreResult<Option<RPCProcessor>> { async fn online_writes_ready(&self) -> EyreResult<Option<RPCProcessor>> {
let inner = self.lock().await?; let inner = self.lock().await?;
Ok(Self::online_writes_ready_inner(&inner)) Ok(Self::online_ready_inner(&inner))
} }
async fn has_offline_subkey_writes(&self) -> EyreResult<bool> { async fn has_offline_subkey_writes(&self) -> EyreResult<bool> {
@ -243,7 +243,7 @@ impl StorageManager {
// No record yet, try to get it from the network // No record yet, try to get it from the network
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.opt_rpc_processor.clone() else { let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
apibail_try_again!("offline, try again later"); apibail_try_again!("offline, try again later");
}; };
@ -293,7 +293,7 @@ impl StorageManager {
pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> { pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
let (opt_opened_record, opt_rpc_processor) = { let (opt_opened_record, opt_rpc_processor) = {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
(inner.close_record(key)?, inner.opt_rpc_processor.clone()) (inner.close_record(key)?, Self::online_ready_inner(&inner))
}; };
// Send a one-time cancel request for the watch if we have one and we're online // Send a one-time cancel request for the watch if we have one and we're online
@ -376,7 +376,7 @@ impl StorageManager {
// Refresh if we can // Refresh if we can
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.opt_rpc_processor.clone() else { let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
// Return the existing value if we have one if we aren't online // Return the existing value if we have one if we aren't online
if let Some(last_subkey_result_value) = last_subkey_result.value { if let Some(last_subkey_result_value) = last_subkey_result.value {
return Ok(Some(last_subkey_result_value.value_data().clone())); return Ok(Some(last_subkey_result_value.value_data().clone()));
@ -499,20 +499,19 @@ impl StorageManager {
writer.secret, writer.secret,
)?); )?);
// Write the value locally first
log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
inner
.handle_set_local_value(
key,
subkey,
signed_value_data.clone(),
WatchUpdateMode::NoUpdate,
)
.await?;
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = Self::online_writes_ready_inner(&inner) else { let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Offline, just write it locally and return immediately
inner
.handle_set_local_value(
key,
subkey,
signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Add to offline writes to flush // Add to offline writes to flush
inner inner
@ -547,18 +546,18 @@ impl StorageManager {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
inner.set_value_nodes(key, result.value_nodes)?; inner.set_value_nodes(key, result.value_nodes)?;
// Whatever record we got back, store it locally, might be newer than the one we asked to save
inner
.handle_set_local_value(
key,
subkey,
result.signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
// Return the new value if it differs from what was asked to set // Return the new value if it differs from what was asked to set
if result.signed_value_data.value_data() != signed_value_data.value_data() { if result.signed_value_data.value_data() != signed_value_data.value_data() {
// Record the newer value and send and update since it is different than what we just set
inner
.handle_set_local_value(
key,
subkey,
result.signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
return Ok(Some(result.signed_value_data.value_data().clone())); return Ok(Some(result.signed_value_data.value_data().clone()));
} }
@ -596,7 +595,7 @@ impl StorageManager {
}; };
// Get rpc processor and drop mutex so we don't block while requesting the watch from the network // Get rpc processor and drop mutex so we don't block while requesting the watch from the network
let Some(rpc_processor) = inner.opt_rpc_processor.clone() else { let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
apibail_try_again!("offline, try again later"); apibail_try_again!("offline, try again later");
}; };
@ -729,7 +728,7 @@ impl StorageManager {
async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> { async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> {
let rpc_processor = { let rpc_processor = {
let inner = self.inner.lock().await; let inner = self.inner.lock().await;
if let Some(rpc_processor) = &inner.opt_rpc_processor { if let Some(rpc_processor) = Self::online_ready_inner(&inner) {
rpc_processor.clone() rpc_processor.clone()
} else { } else {
apibail_try_again!("network is not available"); apibail_try_again!("network is not available");
@ -745,10 +744,9 @@ impl StorageManager {
.map_err(VeilidAPIError::from)?; .map_err(VeilidAPIError::from)?;
network_result_value_or_log!(rpc_processor network_result_value_or_log!(rpc_processor
.rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, (*vc.value).clone()) .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, (*vc.value).clone())
.await .await
.map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] { .map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {});
});
Ok(()) Ok(())
} }