From 3268836a5b4a91050bface3f19f99476310cca89 Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Mon, 22 Apr 2024 22:16:41 -0400
Subject: [PATCH] improve offline subkey writes to ensure a failed setvalue
 tries again even when online; add help for record list opened as well as
 active watch debugging

---
 veilid-core/src/storage_manager/debug.rs      |  7 ++-
 veilid-core/src/storage_manager/mod.rs        | 36 +++++++-------
 .../storage_manager/storage_manager_inner.rs  | 30 +++++++++---
 .../tasks/offline_subkey_writes.rs            | 47 ++++++++++++++-----
 veilid-core/src/veilid_api/debug.rs           |  2 +-
 5 files changed, 81 insertions(+), 41 deletions(-)

diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs
index 2332ce31..25bf7fff 100644
--- a/veilid-core/src/storage_manager/debug.rs
+++ b/veilid-core/src/storage_manager/debug.rs
@@ -24,7 +24,12 @@ impl StorageManager {
             } else {
                 "".to_owned()
             };
-            out += &format!(" {} {},\n", k, writer);
+            let watch = if let Some(w) = v.active_watch() {
+                format!(" watch: {:?}\n", w)
+            } else {
+                "".to_owned()
+            };
+            out += &format!(" {} {}{}\n", k, writer, watch);
         }
         format!("{}]\n", out)
     }
diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index e7986b64..27bf5d3f 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -25,7 +25,7 @@ const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
 /// Frequency to flush record stores to disk
 const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
 /// Frequency to check for offline subkeys writes to send to the network
-const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
+const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 5;
 /// Frequency to send ValueChanged notifications to the network
 const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
 /// Frequency to check for dead nodes and routes for client-side active watches
@@ -424,7 +424,7 @@ impl StorageManager {
             key,
             core::iter::once((subkey, &result.fanout_result)),
             false,
-        )?;
+        );
 
         // If we got a new value back then write it to the opened record
         if Some(get_result_value.value_data().seq()) != opt_last_seq {
@@ -527,16 +527,7 @@ impl StorageManager {
         let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
             log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
             // Add to offline writes to flush
-            inner
-                .offline_subkey_writes
-                .entry(key)
-                .and_modify(|x| {
-                    x.subkeys.insert(subkey);
-                })
-                .or_insert(OfflineSubkeyWrite {
-                    safety_selection,
-                    subkeys: ValueSubkeyRangeSet::single(subkey),
-                });
+            inner.add_offline_subkey_write(key, subkey, safety_selection);
             return Ok(None);
         };
 
@@ -546,7 +537,7 @@ impl StorageManager {
         log_stor!(debug "Writing subkey to the network: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
 
         // Use the safety selection we opened the record with
-        let result = self
+        let result = match self
             .outbound_set_value(
                 rpc_processor,
                 key,
@@ -555,15 +546,20 @@ impl StorageManager {
                 signed_value_data.clone(),
                 descriptor,
             )
-            .await?;
+            .await
+        {
+            Ok(v) => v,
+            Err(e) => {
+                // Failed to write, try again later
+                let mut inner = self.lock().await?;
+                inner.add_offline_subkey_write(key, subkey, safety_selection);
+                return Err(e);
+            }
+        };
 
         // Keep the list of nodes that returned a value for later reference
         let mut inner = self.lock().await?;
-        inner.process_fanout_results(
-            key,
-            core::iter::once((subkey, &result.fanout_result)),
-            true,
-        )?;
+        inner.process_fanout_results(key, core::iter::once((subkey, &result.fanout_result)), true);
 
         // Return the new value if it differs from what was asked to set
         if result.signed_value_data.value_data() != signed_value_data.value_data() {
@@ -877,7 +873,7 @@ impl StorageManager {
             .iter()
             .zip(result.fanout_results.iter());
 
-        inner.process_fanout_results(key, results_iter, false)?;
+        inner.process_fanout_results(key, results_iter, false);
 
         Ok(DHTRecordReport::new(
             result.inspect_result.subkeys,
diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs
index 4f4be09a..57ffdb1a 100644
--- a/veilid-core/src/storage_manager/storage_manager_inner.rs
+++ b/veilid-core/src/storage_manager/storage_manager_inner.rs
@@ -473,16 +473,18 @@ impl StorageManagerInner {
         Ok(opt_value_nodes)
     }
 
-    pub fn process_fanout_results<'a, I: IntoIterator<Item = (ValueSubkey, &'a FanoutResult)>>(
+    pub(super) fn process_fanout_results<
+        'a,
+        I: IntoIterator<Item = (ValueSubkey, &'a FanoutResult)>,
+    >(
         &mut self,
         key: TypedKey,
         subkey_results_iter: I,
         is_set: bool,
-    ) -> VeilidAPIResult<()> {
+    ) {
         // Get local record store
-        let Some(local_record_store) = self.local_record_store.as_mut() else {
-            apibail_not_initialized!();
-        };
+        let local_record_store = self.local_record_store.as_mut().unwrap();
+
         let cur_ts = get_aligned_timestamp();
         local_record_store.with_record_mut(key, |r| {
             let d = r.detail_mut();
@@ -514,7 +516,6 @@ impl StorageManagerInner {
                 d.nodes.remove(&dead_node_key.0);
             }
         });
-        Ok(())
     }
 
     pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult<Option<OpenedRecord>> {
@@ -690,4 +691,21 @@ impl StorageManagerInner {
         let hash = vcrypto.generate_hash(&hash_data);
         TypedKey::new(vcrypto.kind(), hash)
     }
+
+    pub(super) fn add_offline_subkey_write(
+        &mut self,
+        key: TypedKey,
+        subkey: ValueSubkey,
+        safety_selection: SafetySelection,
+    ) {
+        self.offline_subkey_writes
+            .entry(key)
+            .and_modify(|x| {
+                x.subkeys.insert(subkey);
+            })
+            .or_insert(OfflineSubkeyWrite {
+                safety_selection,
+                subkeys: ValueSubkeyRangeSet::single(subkey),
+            });
+    }
 }
diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs
index d2210d10..71381849 100644
--- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs
+++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs
@@ -20,6 +20,8 @@ impl StorageManager {
             out
         };
 
+        let mut fanout_results = vec![];
+
         for (key, osw) in offline_subkey_writes.iter_mut() {
             if poll!(stop_token.clone()).is_ready() {
                 log_stor!(debug "Offline subkey writes cancelled.");
@@ -61,26 +63,45 @@ impl StorageManager {
                     .await;
                 match osvres {
                     Ok(osv) => {
-                        if let Some(update_callback) = opt_update_callback.clone() {
-                            // Send valuechange with dead count and no subkeys
-                            update_callback(VeilidUpdate::ValueChange(Box::new(
-                                VeilidValueChange {
-                                    key: *key,
-                                    subkeys: ValueSubkeyRangeSet::single(subkey),
-                                    count: u32::MAX,
-                                    value: Some(osv.signed_value_data.value_data().clone()),
-                                },
-                            )));
-                        }
-                        written_subkeys.insert(subkey);
+                        match osv.fanout_result.kind {
+                            FanoutResultKind::Timeout => {
+                                log_stor!(debug "timed out writing offline subkey: {}:{}", key, subkey);
+                            }
+                            FanoutResultKind::Finished | FanoutResultKind::Exhausted => {
+                                if let Some(update_callback) = opt_update_callback.clone() {
+                                    // Send valuechange with dead count and no subkeys
+                                    update_callback(VeilidUpdate::ValueChange(Box::new(
+                                        VeilidValueChange {
+                                            key: *key,
+                                            subkeys: ValueSubkeyRangeSet::single(subkey),
+                                            count: u32::MAX,
+                                            value: Some(osv.signed_value_data.value_data().clone()),
+                                        },
+                                    )));
+                                }
+                                written_subkeys.insert(subkey);
+                            }
+                        };
+
+                        fanout_results.push((subkey, osv.fanout_result));
                     }
                     Err(e) => {
-                        log_stor!(debug "failed to write offline subkey: {}", e);
+                        log_stor!(debug "failed to write offline subkey: {}:{} {}", key, subkey, e);
                     }
                 }
             }
             osw.subkeys = osw.subkeys.difference(&written_subkeys);
+
+            // Keep the list of nodes that returned a value for later reference
+            {
+                let mut inner = self.lock().await?;
+                inner.process_fanout_results(
+                    *key,
+                    fanout_results.iter().map(|x| (x.0, &x.1)),
+                    true,
+                );
+            }
         }
 
         // Add any subkeys back in that were not successfully written
diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs
index 33c9e22b..11c1900f 100644
--- a/veilid-core/src/veilid_api/debug.rs
+++ b/veilid-core/src/veilid_api/debug.rs
@@ -1951,7 +1951,7 @@ route allocate [ord|*ord] [rel] [<count>] [in|out]
       list
       import <blob>
       test <route>
-record list <local|remote>
+record list <local|remote|opened|offline>
        purge <local|remote> [bytes]
        create <dhtschema> [<cryptokind> [<safety>]]
        open <key>[+<writer>] [<safety>]
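
Note on the retry pattern this patch relies on: any set_value that cannot be completed, whether the node is offline or an online outbound_set_value returns an error, is recorded in offline_subkey_writes, and the offline_subkey_writes task drains that map on its next tick (the check interval is raised from 1 to 5 seconds above). The sketch below is a minimal, self-contained illustration of that queue-and-retry shape; RecordKey, Subkey, WriteQueue, and try_write are simplified stand-ins and not the veilid-core types (TypedKey, ValueSubkeyRangeSet, StorageManagerInner) used in the patch.

use std::collections::{BTreeSet, HashMap};

// Stand-ins for TypedKey and ValueSubkey; the real types live in veilid-core.
type RecordKey = u64;
type Subkey = u32;

/// Simplified model of offline_subkey_writes: for each record, the set of
/// subkeys that still need to be pushed to the network.
#[derive(Default)]
struct WriteQueue {
    pending: HashMap<RecordKey, BTreeSet<Subkey>>,
}

impl WriteQueue {
    /// Analogous to add_offline_subkey_write: merge into an existing entry or create one.
    fn add(&mut self, key: RecordKey, subkey: Subkey) {
        self.pending.entry(key).or_default().insert(subkey);
    }

    /// Analogous to the periodic flush task: attempt each pending write and
    /// keep only the subkeys whose write still fails, so they retry next tick.
    fn flush<F>(&mut self, mut try_write: F)
    where
        F: FnMut(RecordKey, Subkey) -> Result<(), String>,
    {
        for (key, subkeys) in self.pending.iter_mut() {
            subkeys.retain(|&subkey| try_write(*key, subkey).is_err());
        }
        // Drop records that have nothing left to write.
        self.pending.retain(|_, subkeys| !subkeys.is_empty());
    }
}

fn main() {
    let mut queue = WriteQueue::default();

    // A write could not be completed (offline, or an online set_value error):
    // queue it for a later retry instead of losing it.
    queue.add(42, 0);
    queue.add(42, 3);

    // First tick: pretend subkey 3 still cannot be written.
    queue.flush(|key, subkey| {
        if subkey == 3 {
            Err(format!("set_value {}:{} timed out", key, subkey))
        } else {
            Ok(())
        }
    });
    assert_eq!(queue.pending.len(), 1); // 42:3 is still queued

    // Second tick: everything succeeds and the queue drains.
    queue.flush(|_, _| Ok(()));
    assert!(queue.pending.is_empty());
}

The real task is finer-grained than this Ok/Err split: a FanoutResultKind::Timeout leaves the subkey queued for the next pass, while Finished or Exhausted marks it written, emits the ValueChange update, and feeds the fanout result back into process_fanout_results.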