unmark active subkey writes if not allow_offline

This commit is contained in:
Christien Rioux 2025-06-15 20:35:42 -04:00
parent ee1e2b436f
commit 24a098728c

View file

@ -720,6 +720,41 @@ impl StorageManager {
Ok(out)
}
// Returns false if we were not already writing
// Returns true if this subkey was already being written to
fn mark_active_subkey_write_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
) -> bool {
let asw = inner.active_subkey_writes.entry(record_key).or_default();
if asw.contains(subkey) {
veilid_log!(self debug "Already writing to this subkey: {}:{}", record_key, subkey);
true
} else {
// Add to our list of active subkey writes
asw.insert(subkey);
false
}
}
fn unmark_active_subkey_write_inner(
&self,
inner: &mut StorageManagerInner,
record_key: TypedRecordKey,
subkey: ValueSubkey,
) {
// Remove from active subkey writes
let asw = inner.active_subkey_writes.get_mut(&record_key).unwrap();
if !asw.remove(subkey) {
veilid_log!(self error "missing active subkey write: {}:{}", record_key, subkey);
}
if asw.is_empty() {
inner.active_subkey_writes.remove(&record_key);
}
}
/// Set the value of a subkey on an opened local record
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn set_value(
@ -816,17 +851,7 @@ impl StorageManager {
// Note that we are writing this subkey actively
// If it appears we are already doing this, then put it to the offline queue
let already_writing = {
let asw = inner.active_subkey_writes.entry(record_key).or_default();
if asw.contains(subkey) {
veilid_log!(self debug "Already writing to this subkey: {}:{}", record_key, subkey);
true
} else {
// Add to our list of active subkey writes
asw.insert(subkey);
false
}
};
let already_writing = self.mark_active_subkey_write_inner(&mut inner, record_key, subkey);
if already_writing || !self.dht_is_online() {
if allow_offline == AllowOffline(true) {
@ -840,6 +865,7 @@ impl StorageManager {
);
return Ok(None);
} else {
self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey);
apibail_try_again!("offline, try again later");
}
};
@ -873,17 +899,13 @@ impl StorageManager {
safety_selection,
);
} else {
self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey);
apibail_try_again!("offline, try again later");
}
// Remove from active subkey writes
let asw = inner.active_subkey_writes.get_mut(&record_key).unwrap();
if !asw.remove(subkey) {
panic!("missing active subkey write: {}:{}", record_key, subkey);
}
if asw.is_empty() {
inner.active_subkey_writes.remove(&record_key);
}
self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey);
if matches!(e, VeilidAPIError::TryAgain { message: _ }) {
return Ok(None);
}
@ -931,13 +953,8 @@ impl StorageManager {
let mut inner = self.inner.lock().await;
// Remove from active subkey writes
let asw = inner.active_subkey_writes.get_mut(&record_key).unwrap();
if !asw.remove(subkey) {
panic!("missing active subkey write: {}:{}", record_key, subkey);
}
if asw.is_empty() {
inner.active_subkey_writes.remove(&record_key);
}
self.unmark_active_subkey_write_inner(&mut inner, record_key, subkey);
if matches!(out, Err(VeilidAPIError::TryAgain { message: _ })) {
return Ok(None);
}