more offline subkey write improvements

This commit is contained in:
Christien Rioux 2024-04-23 12:22:54 -04:00
parent 3268836a5b
commit 6373cc0e9d
4 changed files with 68 additions and 22 deletions

View file

@ -33,6 +33,14 @@ impl StorageManager {
} }
format!("{}]\n", out) format!("{}]\n", out)
} }
pub(crate) async fn debug_offline_records(&self) -> String {
let inner = self.inner.lock().await;
let mut out = "[\n".to_owned();
for (k, v) in &inner.offline_subkey_writes {
out += &format!(" {}:{:?}\n", k, v);
}
format!("{}]\n", out)
}
pub(crate) async fn purge_local_records(&self, reclaim: Option<usize>) -> String { pub(crate) async fn purge_local_records(&self, reclaim: Option<usize>) -> String {
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;

View file

@ -557,8 +557,17 @@ impl StorageManager {
} }
}; };
// Keep the list of nodes that returned a value for later reference // Regain the lock after network access
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
// Report on fanout result offline
let was_offline = self.check_fanout_set_offline(key, subkey, &result.fanout_result);
if was_offline {
// Failed to write, try again later
inner.add_offline_subkey_write(key, subkey, safety_selection);
}
// Keep the list of nodes that returned a value for later reference
inner.process_fanout_results(key, core::iter::once((subkey, &result.fanout_result)), true); inner.process_fanout_results(key, core::iter::once((subkey, &result.fanout_result)), true);
// Return the new value if it differs from what was asked to set // Return the new value if it differs from what was asked to set
@ -910,4 +919,32 @@ impl StorageManager {
Ok(()) Ok(())
} }
fn check_fanout_set_offline(
&self,
key: TypedKey,
subkey: ValueSubkey,
fanout_result: &FanoutResult,
) -> bool {
match fanout_result.kind {
FanoutResultKind::Timeout => {
log_stor!(debug "timeout in set_value, adding offline subkey: {}:{}", key, subkey);
true
}
FanoutResultKind::Exhausted => {
let get_consensus =
self.unlocked_inner.config.get().network.dht.get_value_count as usize;
let value_node_count = fanout_result.value_nodes.len();
if value_node_count < get_consensus {
log_stor!(debug "exhausted with insufficient consensus ({}<{}), adding offline subkey: {}:{}",
value_node_count, get_consensus,
key, subkey);
true
} else {
false
}
}
FanoutResultKind::Finished => false,
}
}
} }

View file

@ -62,28 +62,24 @@ impl StorageManager {
) )
.await; .await;
match osvres { match osvres {
Ok(osv) => { Ok(result) => {
match osv.fanout_result.kind { let was_offline =
FanoutResultKind::Timeout => { self.check_fanout_set_offline(*key, subkey, &result.fanout_result);
log_stor!(debug "timed out writing offline subkey: {}:{}", key, subkey); if !was_offline {
} if let Some(update_callback) = opt_update_callback.clone() {
FanoutResultKind::Finished | FanoutResultKind::Exhausted => { // Send valuechange with dead count and no subkeys
if let Some(update_callback) = opt_update_callback.clone() { update_callback(VeilidUpdate::ValueChange(Box::new(
// Send valuechange with dead count and no subkeys VeilidValueChange {
update_callback(VeilidUpdate::ValueChange(Box::new( key: *key,
VeilidValueChange { subkeys: ValueSubkeyRangeSet::single(subkey),
key: *key, count: u32::MAX,
subkeys: ValueSubkeyRangeSet::single(subkey), value: Some(result.signed_value_data.value_data().clone()),
count: u32::MAX, },
value: Some(osv.signed_value_data.value_data().clone()), )));
},
)));
}
written_subkeys.insert(subkey);
} }
written_subkeys.insert(subkey);
}; };
fanout_results.push((subkey, result.fanout_result));
fanout_results.push((subkey, osv.fanout_result));
} }
Err(e) => { Err(e) => {
log_stor!(debug "failed to write offline subkey: {}:{} {}", key, subkey, e); log_stor!(debug "failed to write offline subkey: {}:{} {}", key, subkey, e);

View file

@ -1414,6 +1414,11 @@ impl VeilidAPI {
out += &storage_manager.debug_opened_records().await; out += &storage_manager.debug_opened_records().await;
out out
} }
"offline" => {
let mut out = "Offline Records:\n".to_string();
out += &storage_manager.debug_offline_records().await;
out
}
_ => "Invalid scope\n".to_owned(), _ => "Invalid scope\n".to_owned(),
}; };
Ok(out) Ok(out)
@ -1951,7 +1956,7 @@ route allocate [ord|*ord] [rel] [<count>] [in|out]
list list
import <blob> import <blob>
test <route> test <route>
record list <local|remote|opened> record list <local|remote|opened|offline>
purge <local|remote> [bytes] purge <local|remote> [bytes]
create <dhtschema> [<cryptokind> [<safety>]] create <dhtschema> [<cryptokind> [<safety>]]
open <key>[+<safety>] [<writer>] open <key>[+<safety>] [<writer>]