checkpoint

Christien Rioux 2023-11-26 15:02:15 -08:00
parent 9b8420d288
commit d1dad8de61
11 changed files with 211 additions and 37 deletions

View File

@@ -129,7 +129,7 @@ enum-as-inner = "=0.6.0" # temporary fix for

 # Serialization
 capnp = { version = "0.18.1", default-features = false, features = ["alloc"] }
-serde = { version = "1.0.188", features = ["derive"] }
+serde = { version = "1.0.188", features = ["derive", "rc"] }
 serde_json = { version = "1.0.107" }
 serde-big-array = "0.5.1"
 json = "0.12.4"
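The new "rc" feature is what gives serde Serialize/Deserialize impls for Rc and Arc; the Arc<SignedValueData> fields introduced later in this commit rely on it. A minimal sketch of what the feature enables, with a hypothetical Payload type standing in for the real record types (the Arc contents are serialized by value, and deserializing builds a fresh Arc):

use std::sync::Arc;
use serde::{Deserialize, Serialize};

// Requires serde's "rc" feature; without it this derive fails because
// Serialize/Deserialize are not implemented for Arc<T>.
#[derive(Debug, Serialize, Deserialize)]
struct Payload {
    data: Arc<Vec<u8>>,
}

fn main() {
    let p = Payload { data: Arc::new(vec![1, 2, 3]) };
    let json = serde_json::to_string(&p).unwrap();
    // The Arc is serialized by value: {"data":[1,2,3]}
    println!("{json}");
    let back: Payload = serde_json::from_str(&json).unwrap();
    assert_eq!(*back.data, vec![1, 2, 3]);
}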

View File

@@ -174,7 +174,7 @@ impl RPCProcessor {
        };

        // Destructure
-        let (key, subkeys, expiration, count, watcher, signature) = watch_value_q.destructure();
+        let (key, subkeys, expiration, count, watcher, _signature) = watch_value_q.destructure();

        // Get target for ValueChanged notifications
        let dest = network_result_try!(self.get_respond_to_destination(&msg));

View File

@@ -31,6 +31,8 @@ const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
 const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
 /// Frequency to check for offline subkeys writes to send to the network
 const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
+/// Frequency to send ValueChanged notifications to the network
+const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;

 struct StorageManagerUnlockedInner {
     config: VeilidConfig,
@@ -42,6 +44,7 @@ struct StorageManagerUnlockedInner {
     // Background processes
     flush_record_stores_task: TickTask<EyreReport>,
     offline_subkey_writes_task: TickTask<EyreReport>,
+    send_value_changes_task: TickTask<EyreReport>,

     // Anonymous watch keys
     anonymous_watch_keys: TypedKeyPairGroup,
@@ -76,6 +79,8 @@ impl StorageManager {
            block_store,
            flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
            offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS),
+            send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS),
            anonymous_watch_keys,
        }
    }
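For orientation, this follows the storage manager's existing background-task pattern: an interval constant, a TickTask field, and TickTask::new(...) in the constructor, with the routine registered and driven from tick() elsewhere in this commit. A rough stand-in sketch of the once-per-interval behavior; SimpleTickTask here is hypothetical, while the real TickTask is async and reports errors:

use std::time::{Duration, Instant};

// Minimal stand-in for a tick task: runs its routine at most once per
// interval each time tick() is called.
struct SimpleTickTask {
    interval: Duration,
    last_run: Option<Instant>,
    routine: Box<dyn FnMut()>,
}

impl SimpleTickTask {
    fn new(interval_secs: u64, routine: impl FnMut() + 'static) -> Self {
        Self {
            interval: Duration::from_secs(interval_secs),
            last_run: None,
            routine: Box::new(routine),
        }
    }

    fn tick(&mut self) {
        let now = Instant::now();
        let due = self
            .last_run
            .map_or(true, |t| now.duration_since(t) >= self.interval);
        if due {
            (self.routine)();
            self.last_run = Some(now);
        }
    }
}

fn main() {
    let mut send_value_changes = SimpleTickTask::new(1, || println!("send value changes"));
    send_value_changes.tick(); // runs immediately on first tick
    send_value_changes.tick(); // skipped: the 1-second interval has not elapsed yet
}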

View File

@@ -30,6 +30,7 @@ struct WatchedRecordWatch {
     count: u32,
     target: Target,
     watcher: CryptoKey,
+    changed: ValueSubkeyRangeSet,
 }

 #[derive(Debug, Default, Clone)]
@@ -39,6 +40,16 @@ struct WatchedRecord {
     watchers: Vec<WatchedRecordWatch>,
 }

+#[derive(Debug, Clone)]
+/// A single 'value changed' message to send
+pub struct ValueChangedInfo {
+    target: Target,
+    key: TypedKey,
+    subkeys: ValueSubkeyRangeSet,
+    count: u32,
+    value: SignedValueData,
+}
+
 pub struct RecordStore<D>
 where
     D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
@@ -65,6 +76,8 @@ where
     changed_records: HashSet<RecordTableKey>,
     /// The list of records being watched for changes
     watched_records: HashMap<RecordTableKey, WatchedRecord>,
+    /// The list of watched records that have changed values since last notification
+    changed_watched_values: HashSet<RecordTableKey>,

     /// A mutex to ensure we handle this concurrently
     purge_dead_records_mutex: Arc<AsyncMutex<()>>,
@@ -74,9 +87,9 @@ where
 #[derive(Default, Debug)]
 pub struct SubkeyResult {
     /// The subkey value if we got one
-    pub value: Option<SignedValueData>,
+    pub value: Option<Arc<SignedValueData>>,
     /// The descriptor if we got a fresh one or empty if no descriptor was needed
-    pub descriptor: Option<SignedValueDescriptor>,
+    pub descriptor: Option<Arc<SignedValueDescriptor>>,
 }

 impl<D> RecordStore<D>
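Wrapping SubkeyResult's value and descriptor in Arc means callers share the stored data rather than deep-copying it on every get. A small sketch of the cost difference, with a hypothetical Blob standing in for SignedValueData:

use std::sync::Arc;

#[derive(Clone)]
struct Blob {
    bytes: Vec<u8>,
}

// Before: returning the value clones the whole byte buffer.
fn get_value_copy(stored: &Blob) -> Blob {
    stored.clone()
}

// After: returning Arc<Blob> only bumps a reference count.
fn get_value_shared(stored: &Arc<Blob>) -> Arc<Blob> {
    Arc::clone(stored)
}

fn main() {
    let stored = Arc::new(Blob { bytes: vec![0u8; 1_048_576] });
    let copied = get_value_copy(&stored);   // O(n): copies ~1 MiB
    let shared = get_value_shared(&stored); // O(1): refcount increment
    assert_eq!(copied.bytes.len(), shared.bytes.len());
}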
@@ -114,6 +127,7 @@ where
            changed_records: HashSet::new(),
            watched_records: HashMap::new(),
            purge_dead_records_mutex: Arc::new(AsyncMutex::new(())),
+            changed_watched_values: HashSet::new(),
        }
    }
@@ -194,10 +208,6 @@ where
        });
    }

-    fn mark_record_changed(&mut self, key: RecordTableKey) {
-        self.changed_records.insert(key);
-    }
-
    fn add_to_subkey_cache(&mut self, key: SubkeyTableKey, record_data: RecordData) {
        let record_data_total_size = record_data.total_size();
        // Write to subkey cache
@@ -312,7 +322,6 @@ where
    }

    async fn flush_changed_records(&mut self) {
-        // touch records
        if self.changed_records.is_empty() {
            return;
        }
@@ -334,7 +343,7 @@ where
        }
    }

-    pub async fn tick(&mut self) -> EyreResult<()> {
+    pub async fn flush(&mut self) -> EyreResult<()> {
        self.flush_changed_records().await;
        self.purge_dead_records(true).await;
        Ok(())
@@ -416,7 +425,9 @@ where
            record.touch(get_aligned_timestamp());
        }
        if out.is_some() {
-            self.mark_record_changed(rtk);
+            // Marks as changed because the record was touched and we want to keep the
+            // LRU ordering serialized
+            self.changed_records.insert(rtk);
        }

        out
@@ -451,16 +462,14 @@ where
            record.touch(get_aligned_timestamp());
        }
        if out.is_some() {
-            self.mark_record_changed(rtk);
+            // Marks as changed because the record was touched and we want to keep the
+            // LRU ordering serialized
+            self.changed_records.insert(rtk);
        }

        out
    }

-    // pub fn get_descriptor(&mut self, key: TypedKey) -> Option<SignedValueDescriptor> {
-    //     self.with_record(key, |record| record.descriptor().clone())
-    // }
-
    pub async fn get_subkey(
        &mut self,
        key: TypedKey,
@@ -600,6 +609,24 @@ where
        }))
    }

+    async fn update_watched_value(&mut self, key: TypedKey, subkey: ValueSubkey) {
+        let rtk = RecordTableKey { key };
+        let Some(wr) = self.watched_records.get_mut(&rtk) else {
+            return;
+        };
+        // Update all watchers
+        let mut changed = false;
+        for w in &mut wr.watchers {
+            // If this watcher is watching the changed subkey then add to the watcher's changed list
+            if w.subkeys.contains(subkey) && w.changed.insert(subkey) {
+                changed = true;
+            }
+        }
+        if changed {
+            self.changed_watched_values.insert(rtk);
+        }
+    }
+
    pub async fn set_subkey(
        &mut self,
        key: TypedKey,
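The new update_watched_value only flags a record when a changed subkey falls inside some watcher's subkey set and is not already pending, so repeated writes to the same subkey do not re-flag the record. A self-contained sketch of that bookkeeping, using BTreeSet<u32> as a stand-in for ValueSubkeyRangeSet:

use std::collections::BTreeSet;

struct Watcher {
    // Subkeys this watcher cares about (stand-in for ValueSubkeyRangeSet).
    subkeys: BTreeSet<u32>,
    // Subkeys that changed since the last notification was sent.
    changed: BTreeSet<u32>,
}

// Returns true if any watcher newly recorded the change, i.e. the record
// should be added to the changed-watched-values set.
fn update_watched_value(watchers: &mut [Watcher], subkey: u32) -> bool {
    let mut changed = false;
    for w in watchers.iter_mut() {
        // insert() returns true only when the subkey was not already pending,
        // so repeated writes to the same subkey do not re-flag the record.
        if w.subkeys.contains(&subkey) && w.changed.insert(subkey) {
            changed = true;
        }
    }
    changed
}

fn main() {
    let mut watchers = vec![Watcher {
        subkeys: (0..4).collect(),
        changed: BTreeSet::new(),
    }];
    assert!(update_watched_value(&mut watchers, 2));  // first change: flag the record
    assert!(!update_watched_value(&mut watchers, 2)); // duplicate: already pending
    assert!(!update_watched_value(&mut watchers, 9)); // not watched: ignored
}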
@@ -692,6 +719,9 @@ where
        // Update storage space
        self.total_storage_space.commit().unwrap();

+        // Update watched value
+        self.update_watched_value(key, subkey).await;
+
        Ok(())
    }
@@ -719,7 +749,7 @@ where
        }

        // Get the record being watched
-        let Some(is_member) = self.with_record_mut(key, |record| {
+        let Some(is_member) = self.with_record(key, |record| {
            // Check if the watcher specified is a schema member
            let schema = record.schema();
            (*record.owner()) == watcher || schema.is_member(&watcher)
@@ -774,6 +804,7 @@ where
            count,
            target,
            watcher,
+            changed: ValueSubkeyRangeSet::new(),
        });
        Ok(Some(expiration))
    }
@@ -786,7 +817,7 @@ where
        watcher: CryptoKey,
    ) -> VeilidAPIResult<Option<Timestamp>> {
        // Get the record being watched
-        let Some(is_member) = self.with_record_mut(key, |record| {
+        let Some(is_member) = self.with_record(key, |record| {
            // Check if the watcher specified is a schema member
            let schema = record.schema();
            (*record.owner()) == watcher || schema.is_member(&watcher)
@@ -828,6 +859,59 @@ where
        Ok(ret_timestamp)
    }

+    pub async fn take_value_changes(&mut self, changes: &mut Vec<ValueChangedInfo>) {
+        for rtk in self.changed_watched_values.drain() {
+            if let Some(watch) = self.watched_records.get_mut(&rtk) {
+                // Process watch notifications
+                let mut dead_watchers = vec![];
+                for (wn, w) in watch.watchers.iter_mut().enumerate() {
+                    // Get the subkeys that have changed
+                    let subkeys = w.changed.clone();
+                    w.changed.clear();
+
+                    // Reduce the count of changes sent
+                    // if count goes to zero mark this watcher dead
+                    w.count -= 1;
+                    let count = w.count;
+                    if count == 0 {
+                        dead_watchers.push(wn);
+                    }
+
+                    // Get the first subkey data
+                    let Some(first_subkey) = subkeys.first() else {
+                        log_stor!(error "first subkey should exist for value change notification");
+                        continue;
+                    };
+                    let subkey_result = match self.get_subkey(rtk.key, first_subkey, false).await {
+                        Ok(Some(skr)) => skr,
+                        Ok(None) => {
+                            log_stor!(error "subkey should have data for value change notification");
+                            continue;
+                        }
+                        Err(e) => {
+                            log_stor!(error "error getting subkey data for value change notification: {}", e);
+                            continue;
+                        }
+                    };
+                    let Some(value) = subkey_result.value else {
+                        log_stor!(error "first subkey should have had value for value change notification");
+                        continue;
+                    };
+
+                    let vci = ValueChangedInfo {
+                        target: w.target.clone(),
+                        key: rtk.key,
+                        subkeys,
+                        count,
+                        value,
+                    };
+
+                    changes.push(vci);
+                }
+            }
+        }
+    }
+
    /// LRU out some records until we reclaim the amount of space requested
    /// This will force a garbage collection of the space immediately
    /// If zero is passed in here, a garbage collection will be performed of dead records
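take_value_changes drains the changed-record set, snapshots each watcher's pending subkeys, decrements its remaining-notification count, and collects watchers that reach zero as dead (their removal is not shown in this hunk). A simplified sketch of that drain-and-count-down logic with stand-in types:

use std::collections::BTreeSet;

// Simplified stand-ins for WatchedRecordWatch and ValueChangedInfo.
struct Watch {
    subkeys_changed: BTreeSet<u32>,
    count: u32,
}

struct Change {
    subkeys: BTreeSet<u32>,
    count: u32,
}

// Drain pending changes, decrementing each watcher's remaining-notification
// count and dropping watchers whose count reaches zero.
fn take_changes(watchers: &mut Vec<Watch>, out: &mut Vec<Change>) {
    let mut dead = vec![];
    for (i, w) in watchers.iter_mut().enumerate() {
        if w.subkeys_changed.is_empty() {
            continue;
        }
        let subkeys = std::mem::take(&mut w.subkeys_changed);
        w.count -= 1;
        if w.count == 0 {
            dead.push(i);
        }
        out.push(Change { subkeys, count: w.count });
    }
    // Remove exhausted watchers, highest index first so earlier indexes stay valid.
    for i in dead.into_iter().rev() {
        watchers.remove(i);
    }
}

fn main() {
    let mut watchers = vec![Watch { subkeys_changed: BTreeSet::from([1, 2]), count: 1 }];
    let mut out = vec![];
    take_changes(&mut watchers, &mut out);
    assert_eq!(out[0].subkeys.len(), 2);
    assert_eq!(out[0].count, 0);
    assert!(watchers.is_empty()); // count hit zero, so the watcher was dropped
}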

View File

@@ -136,12 +136,12 @@ impl StorageManagerInner {

        // Final flush on record stores
        if let Some(mut local_record_store) = self.local_record_store.take() {
-            if let Err(e) = local_record_store.tick().await {
+            if let Err(e) = local_record_store.flush().await {
                log_stor!(error "termination local record store tick failed: {}", e);
            }
        }
        if let Some(mut remote_record_store) = self.remote_record_store.take() {
-            if let Err(e) = remote_record_store.tick().await {
+            if let Err(e) = remote_record_store.flush().await {
                log_stor!(error "termination remote record store tick failed: {}", e);
            }
        }

View File

@@ -1,7 +1,7 @@
 use super::*;

 impl StorageManager {
-    // Flush records stores to disk and remove dead records
+    // Flush records stores to disk and remove dead records and send watch notifications
     #[instrument(level = "trace", skip(self), err)]
     pub(crate) async fn flush_record_stores_task_routine(
         self,
@@ -11,10 +11,10 @@ impl StorageManager {
    ) -> EyreResult<()> {
        let mut inner = self.inner.lock().await;
        if let Some(local_record_store) = &mut inner.local_record_store {
-            local_record_store.tick().await?;
+            local_record_store.flush().await?;
        }
        if let Some(remote_record_store) = &mut inner.remote_record_store {
-            remote_record_store.tick().await?;
+            remote_record_store.flush().await?;
        }
        Ok(())
    }

View File

@@ -1,5 +1,6 @@
 pub mod flush_record_stores;
 pub mod offline_subkey_writes;
+pub mod send_value_changes;

 use super::*;
@@ -47,23 +48,54 @@ impl StorageManager {
                    )
                });
        }

+        // Set send value changes tick task
+        debug!("starting send value changes task");
+        {
+            let this = self.clone();
+            self.unlocked_inner
+                .send_value_changes_task
+                .set_routine(move |s, l, t| {
+                    Box::pin(
+                        this.clone()
+                            .send_value_changes_task_routine(
+                                s,
+                                Timestamp::new(l),
+                                Timestamp::new(t),
+                            )
+                            .instrument(trace_span!(
+                                parent: None,
+                                "StorageManager send value changes task routine"
+                            )),
+                    )
+                });
+        }
    }

    pub async fn tick(&self) -> EyreResult<()> {
-        // Run the rolling transfers task
+        // Run the flush stores task
        self.unlocked_inner.flush_record_stores_task.tick().await?;

+        // Run online-only tasks
+        if self.online_writes_ready().await?.is_some() {
            // Run offline subkey writes task if there's work to be done
-        if self.online_writes_ready().await?.is_some() && self.has_offline_subkey_writes().await? {
+            if self.has_offline_subkey_writes().await? {
                self.unlocked_inner
                    .offline_subkey_writes_task
                    .tick()
                    .await?;
            }
+
+            // Send value changed notifications
+            self.unlocked_inner.send_value_changes_task.tick().await?;
+        }
+
        Ok(())
    }

    pub(crate) async fn cancel_tasks(&self) {
+        debug!("stopping send value changes task");
+        if let Err(e) = self.unlocked_inner.send_value_changes_task.stop().await {
+            warn!("send_value_changes_task not stopped: {}", e);
+        }
        debug!("stopping flush record stores task");
        if let Err(e) = self.unlocked_inner.flush_record_stores_task.stop().await {
            warn!("flush_record_stores_task not stopped: {}", e);
View File

@@ -0,0 +1,50 @@
use super::*;
use futures_util::StreamExt;
use stop_token::future::FutureExt;

impl StorageManager {
    // Flush records stores to disk and remove dead records and send watch notifications
    #[instrument(level = "trace", skip(self), err)]
    pub(crate) async fn send_value_changes_task_routine(
        self,
        stop_token: StopToken,
        _last_ts: Timestamp,
        _cur_ts: Timestamp,
    ) -> EyreResult<()> {
        let mut value_changes: Vec<ValueChangedInfo> = vec![];
        let mut inner = self.inner.lock().await;
        if let Some(local_record_store) = &mut inner.local_record_store {
            local_record_store
                .take_value_changes(&mut value_changes)
                .await?;
        }
        if let Some(remote_record_store) = &mut inner.remote_record_store {
            remote_record_store
                .take_value_changes(&mut value_changes)
                .await?;
        }

        // Send all value changes in parallel
        let mut unord = FuturesUnordered::new();

        // xxx

        while !unord.is_empty() {
            match unord.next().timeout_at(stop_token.clone()).await {
                Ok(Some(_)) => {
                    // Some ValueChanged completed
                }
                Ok(None) => {
                    // We're empty
                }
                Err(_) => {
                    // Timeout means we drop the rest because we were asked to stop
                    return Ok(());
                }
            }
        }

        Ok(())
    }
}
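The // xxx placeholder is where the per-notification futures will be pushed; the loop below it is the usual FuturesUnordered drain, so all pending ValueChanged messages go out concurrently but are awaited before the routine returns. A minimal sketch of that drain pattern without the stop-token timeout, using a dummy send function:

use futures::{executor::block_on, stream::FuturesUnordered, StreamExt};

// Stand-in for sending one ValueChanged notification to a peer.
async fn send_notification(id: u32) -> Result<u32, String> {
    // A real implementation would perform a network RPC here.
    Ok(id)
}

async fn send_all(ids: Vec<u32>) {
    // One future per pending change; they make progress concurrently.
    let mut unord = FuturesUnordered::new();
    for id in ids {
        unord.push(send_notification(id));
    }
    // Drain until every send has completed. The real routine wraps each next()
    // in a timeout tied to the stop token so shutdown can interrupt the drain.
    while let Some(result) = unord.next().await {
        match result {
            Ok(id) => println!("sent value change {id}"),
            Err(e) => eprintln!("value change failed: {e}"),
        }
    }
}

fn main() {
    block_on(send_all(vec![1, 2, 3]));
}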

View File

@@ -1,9 +1,9 @@
 use super::*;

-#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct Record<D>
 where
-    D: fmt::Debug + Clone + Serialize,
+    D: fmt::Debug + Serialize,
 {
     descriptor: SignedValueDescriptor,
     subkey_count: usize,
@@ -15,7 +15,7 @@ where
 impl<D> Record<D>
 where
-    D: fmt::Debug + Clone + Serialize,
+    D: fmt::Debug + Serialize,
 {
     pub fn new(
         cur_ts: Timestamp,

View File

@@ -2,15 +2,17 @@ use super::*;
 #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
 pub struct RecordData {
-    signed_value_data: SignedValueData,
+    signed_value_data: Arc<SignedValueData>,
 }
+xxx continue here, use arc everywhere to avoid copies

 impl RecordData {
-    pub fn new(signed_value_data: SignedValueData) -> Self {
+    pub fn new(signed_value_data: Arc<SignedValueData>) -> Self {
         Self { signed_value_data }
     }

-    pub fn signed_value_data(&self) -> &SignedValueData {
-        &self.signed_value_data
+    pub fn signed_value_data(&self) -> Arc<SignedValueData> {
+        self.signed_value_data.clone()
     }

     pub fn data_size(&self) -> usize {
         self.signed_value_data.data_size()
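Returning a cloned Arc from the accessor hands callers an owned handle, so they can hold the value past the borrow of the record data without copying the bytes. A short sketch of that pattern with a hypothetical RecordEntry:

use std::sync::Arc;

struct ValueData {
    data: Vec<u8>,
}

struct RecordEntry {
    value: Arc<ValueData>,
}

impl RecordEntry {
    // Returning a cloned Arc hands the caller an owned handle: no borrow of
    // the entry is held afterwards, and no byte data is copied.
    fn value(&self) -> Arc<ValueData> {
        Arc::clone(&self.value)
    }
}

fn main() {
    let entry = RecordEntry {
        value: Arc::new(ValueData { data: vec![1, 2, 3] }),
    };
    let held = entry.value(); // cheap clone of the handle
    drop(entry);              // the value outlives the entry it came from
    assert_eq!(held.data.len(), 3);
}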

View File

@@ -146,6 +146,7 @@ pub fn fix_veilidconfiginner() -> VeilidConfigInner {
            remote_max_storage_space_mb: 19,
            public_watch_limit: 20,
            member_watch_limit: 21,
+            max_watch_expiration_ms: 22,
        },
        upnp: true,
        detect_address_changes: false,