From 62c38a76423b9d1787ab3cb8532f8fdfe05d60d5 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 19 Apr 2024 15:00:43 -0400 Subject: [PATCH] valuechange now fires for offline writes --- .../tasks/offline_subkey_writes.rs | 62 +++++++++++++++---- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index cd31dfdc..d2210d10 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -10,13 +10,17 @@ impl StorageManager { _last_ts: Timestamp, _cur_ts: Timestamp, ) -> EyreResult<()> { - let offline_subkey_writes = { - let inner = self.lock().await?; - inner.offline_subkey_writes.clone() + let (mut offline_subkey_writes, opt_update_callback) = { + let mut inner = self.lock().await?; + let out = ( + inner.offline_subkey_writes.clone(), + inner.update_callback.clone(), + ); + inner.offline_subkey_writes.clear(); + out }; - // make a safety selection that is conservative - for (key, osw) in offline_subkey_writes { + for (key, osw) in offline_subkey_writes.iter_mut() { if poll!(stop_token.clone()).is_ready() { log_stor!(debug "Offline subkey writes cancelled."); break; @@ -25,10 +29,12 @@ impl StorageManager { log_stor!(debug "Offline subkey writes stopped for network."); break; }; + + let mut written_subkeys = ValueSubkeyRangeSet::new(); for subkey in osw.subkeys.iter() { let get_result = { let mut inner = self.lock().await?; - inner.handle_get_local_value(key, subkey, true).await + inner.handle_get_local_value(*key, subkey, true).await }; let Ok(get_result) = get_result else { log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey); @@ -43,22 +49,52 @@ impl StorageManager { continue; }; log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len()); - if let Err(e) = self + let osvres = self .outbound_set_value( rpc_processor.clone(), - key, + *key, subkey, osw.safety_selection, value, descriptor, ) - .await - { - log_stor!(debug "failed to write offline subkey: {}", e); + .await; + match osvres { + Ok(osv) => { + if let Some(update_callback) = opt_update_callback.clone() { + // Send valuechange with dead count and no subkeys + update_callback(VeilidUpdate::ValueChange(Box::new( + VeilidValueChange { + key: *key, + subkeys: ValueSubkeyRangeSet::single(subkey), + count: u32::MAX, + value: Some(osv.signed_value_data.value_data().clone()), + }, + ))); + } + written_subkeys.insert(subkey); + } + Err(e) => { + log_stor!(debug "failed to write offline subkey: {}", e); + } } } - let mut inner = self.lock().await?; - inner.offline_subkey_writes.remove(&key); + + osw.subkeys = osw.subkeys.difference(&written_subkeys); + } + + // Add any subkeys back in that were not successfully written + let mut inner = self.lock().await?; + for (key, osw) in offline_subkey_writes { + if !osw.subkeys.is_empty() { + inner + .offline_subkey_writes + .entry(key) + .and_modify(|x| { + x.subkeys = x.subkeys.union(&osw.subkeys); + }) + .or_insert(osw); + } } Ok(())