improve offline subkey writes to ensure failed setvalue even when online tries again

add help for record list opened as well as active watch debugging
This commit is contained in:
Christien Rioux 2024-04-22 22:16:41 -04:00
parent 7daf351608
commit 3268836a5b
5 changed files with 81 additions and 41 deletions

View File

@ -24,7 +24,12 @@ impl StorageManager {
} else {
"".to_owned()
};
out += &format!(" {} {},\n", k, writer);
let watch = if let Some(w) = v.active_watch() {
format!(" watch: {:?}\n", w)
} else {
"".to_owned()
};
out += &format!(" {} {}{}\n", k, writer, watch);
}
format!("{}]\n", out)
}

View File

@ -25,7 +25,7 @@ const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
/// Frequency to flush record stores to disk
const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
/// Frequency to check for offline subkeys writes to send to the network
const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 5;
/// Frequency to send ValueChanged notifications to the network
const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
/// Frequency to check for dead nodes and routes for client-side active watches
@ -424,7 +424,7 @@ impl StorageManager {
key,
core::iter::once((subkey, &result.fanout_result)),
false,
)?;
);
// If we got a new value back then write it to the opened record
if Some(get_result_value.value_data().seq()) != opt_last_seq {
@ -527,16 +527,7 @@ impl StorageManager {
let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Add to offline writes to flush
inner
.offline_subkey_writes
.entry(key)
.and_modify(|x| {
x.subkeys.insert(subkey);
})
.or_insert(OfflineSubkeyWrite {
safety_selection,
subkeys: ValueSubkeyRangeSet::single(subkey),
});
inner.add_offline_subkey_write(key, subkey, safety_selection);
return Ok(None);
};
@ -546,7 +537,7 @@ impl StorageManager {
log_stor!(debug "Writing subkey to the network: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Use the safety selection we opened the record with
let result = self
let result = match self
.outbound_set_value(
rpc_processor,
key,
@ -555,15 +546,20 @@ impl StorageManager {
signed_value_data.clone(),
descriptor,
)
.await?;
.await
{
Ok(v) => v,
Err(e) => {
// Failed to write, try again later
let mut inner = self.lock().await?;
inner.add_offline_subkey_write(key, subkey, safety_selection);
return Err(e);
}
};
// Keep the list of nodes that returned a value for later reference
let mut inner = self.lock().await?;
inner.process_fanout_results(
key,
core::iter::once((subkey, &result.fanout_result)),
true,
)?;
inner.process_fanout_results(key, core::iter::once((subkey, &result.fanout_result)), true);
// Return the new value if it differs from what was asked to set
if result.signed_value_data.value_data() != signed_value_data.value_data() {
@ -877,7 +873,7 @@ impl StorageManager {
.iter()
.zip(result.fanout_results.iter());
inner.process_fanout_results(key, results_iter, false)?;
inner.process_fanout_results(key, results_iter, false);
Ok(DHTRecordReport::new(
result.inspect_result.subkeys,

View File

@ -473,16 +473,18 @@ impl StorageManagerInner {
Ok(opt_value_nodes)
}
pub fn process_fanout_results<'a, I: IntoIterator<Item = (ValueSubkey, &'a FanoutResult)>>(
pub(super) fn process_fanout_results<
'a,
I: IntoIterator<Item = (ValueSubkey, &'a FanoutResult)>,
>(
&mut self,
key: TypedKey,
subkey_results_iter: I,
is_set: bool,
) -> VeilidAPIResult<()> {
) {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let local_record_store = self.local_record_store.as_mut().unwrap();
let cur_ts = get_aligned_timestamp();
local_record_store.with_record_mut(key, |r| {
let d = r.detail_mut();
@ -514,7 +516,6 @@ impl StorageManagerInner {
d.nodes.remove(&dead_node_key.0);
}
});
Ok(())
}
pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult<Option<OpenedRecord>> {
@ -690,4 +691,21 @@ impl StorageManagerInner {
let hash = vcrypto.generate_hash(&hash_data);
TypedKey::new(vcrypto.kind(), hash)
}
pub(super) fn add_offline_subkey_write(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
) {
self.offline_subkey_writes
.entry(key)
.and_modify(|x| {
x.subkeys.insert(subkey);
})
.or_insert(OfflineSubkeyWrite {
safety_selection,
subkeys: ValueSubkeyRangeSet::single(subkey),
});
}
}

View File

@ -20,6 +20,8 @@ impl StorageManager {
out
};
let mut fanout_results = vec![];
for (key, osw) in offline_subkey_writes.iter_mut() {
if poll!(stop_token.clone()).is_ready() {
log_stor!(debug "Offline subkey writes cancelled.");
@ -61,26 +63,45 @@ impl StorageManager {
.await;
match osvres {
Ok(osv) => {
if let Some(update_callback) = opt_update_callback.clone() {
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(
VeilidValueChange {
key: *key,
subkeys: ValueSubkeyRangeSet::single(subkey),
count: u32::MAX,
value: Some(osv.signed_value_data.value_data().clone()),
},
)));
}
written_subkeys.insert(subkey);
match osv.fanout_result.kind {
FanoutResultKind::Timeout => {
log_stor!(debug "timed out writing offline subkey: {}:{}", key, subkey);
}
FanoutResultKind::Finished | FanoutResultKind::Exhausted => {
if let Some(update_callback) = opt_update_callback.clone() {
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(
VeilidValueChange {
key: *key,
subkeys: ValueSubkeyRangeSet::single(subkey),
count: u32::MAX,
value: Some(osv.signed_value_data.value_data().clone()),
},
)));
}
written_subkeys.insert(subkey);
}
};
fanout_results.push((subkey, osv.fanout_result));
}
Err(e) => {
log_stor!(debug "failed to write offline subkey: {}", e);
log_stor!(debug "failed to write offline subkey: {}:{} {}", key, subkey, e);
}
}
}
osw.subkeys = osw.subkeys.difference(&written_subkeys);
// Keep the list of nodes that returned a value for later reference
{
let mut inner = self.lock().await?;
inner.process_fanout_results(
*key,
fanout_results.iter().map(|x| (x.0, &x.1)),
true,
);
}
}
// Add any subkeys back in that were not successfully written

View File

@ -1951,7 +1951,7 @@ route allocate [ord|*ord] [rel] [<count>] [in|out]
list
import <blob>
test <route>
record list <local|remote>
record list <local|remote|opened>
purge <local|remote> [bytes]
create <dhtschema> [<cryptokind> [<safety>]]
open <key>[+<safety>] [<writer>]