valuechange now fires for offline writes

This commit is contained in:
Christien Rioux 2024-04-19 15:00:43 -04:00
parent 4a190a6853
commit 62c38a7642

View File

@ -10,13 +10,17 @@ impl StorageManager {
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let offline_subkey_writes = {
let inner = self.lock().await?;
inner.offline_subkey_writes.clone()
let (mut offline_subkey_writes, opt_update_callback) = {
let mut inner = self.lock().await?;
let out = (
inner.offline_subkey_writes.clone(),
inner.update_callback.clone(),
);
inner.offline_subkey_writes.clear();
out
};
// make a safety selection that is conservative
for (key, osw) in offline_subkey_writes {
for (key, osw) in offline_subkey_writes.iter_mut() {
if poll!(stop_token.clone()).is_ready() {
log_stor!(debug "Offline subkey writes cancelled.");
break;
@ -25,10 +29,12 @@ impl StorageManager {
log_stor!(debug "Offline subkey writes stopped for network.");
break;
};
let mut written_subkeys = ValueSubkeyRangeSet::new();
for subkey in osw.subkeys.iter() {
let get_result = {
let mut inner = self.lock().await?;
inner.handle_get_local_value(key, subkey, true).await
inner.handle_get_local_value(*key, subkey, true).await
};
let Ok(get_result) = get_result else {
log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey);
@ -43,22 +49,52 @@ impl StorageManager {
continue;
};
log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len());
if let Err(e) = self
let osvres = self
.outbound_set_value(
rpc_processor.clone(),
key,
*key,
subkey,
osw.safety_selection,
value,
descriptor,
)
.await
{
.await;
match osvres {
Ok(osv) => {
if let Some(update_callback) = opt_update_callback.clone() {
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(
VeilidValueChange {
key: *key,
subkeys: ValueSubkeyRangeSet::single(subkey),
count: u32::MAX,
value: Some(osv.signed_value_data.value_data().clone()),
},
)));
}
written_subkeys.insert(subkey);
}
Err(e) => {
log_stor!(debug "failed to write offline subkey: {}", e);
}
}
}
osw.subkeys = osw.subkeys.difference(&written_subkeys);
}
// Add any subkeys back in that were not successfully written
let mut inner = self.lock().await?;
inner.offline_subkey_writes.remove(&key);
for (key, osw) in offline_subkey_writes {
if !osw.subkeys.is_empty() {
inner
.offline_subkey_writes
.entry(key)
.and_modify(|x| {
x.subkeys = x.subkeys.union(&osw.subkeys);
})
.or_insert(osw);
}
}
Ok(())