checkpoint

John Smith 2023-05-08 22:05:51 -04:00
parent 43a2c0b699
commit a3e2dbc744
6 changed files with 337 additions and 86 deletions

View File

@@ -61,34 +61,6 @@ impl StorageManager {
StorageManagerInner::new(unlocked_inner)
}
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
let c = config.get();
RecordStoreLimits {
subkey_cache_size: c.network.dht.local_subkey_cache_size as usize,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
max_records: None,
max_subkey_cache_memory_mb: Some(
c.network.dht.local_max_subkey_cache_memory_mb as usize,
),
max_storage_space_mb: None,
}
}
fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
let c = config.get();
RecordStoreLimits {
subkey_cache_size: c.network.dht.remote_subkey_cache_size as usize,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
max_records: Some(c.network.dht.remote_max_records as usize),
max_subkey_cache_memory_mb: Some(
c.network.dht.remote_max_subkey_cache_memory_mb as usize,
),
max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
}
}
pub fn new(
config: VeilidConfig,
crypto: Crypto,
@@ -118,39 +90,7 @@ impl StorageManager {
debug!("startup storage manager");
let mut inner = self.inner.lock().await;
let local_limits = Self::local_limits_from_config(self.unlocked_inner.config.clone());
let remote_limits = Self::remote_limits_from_config(self.unlocked_inner.config.clone());
let mut local_record_store = RecordStore::new(
self.unlocked_inner.table_store.clone(),
"local",
local_limits,
);
local_record_store.init().await?;
let mut remote_record_store = RecordStore::new(
self.unlocked_inner.table_store.clone(),
"remote",
remote_limits,
);
remote_record_store.init().await?;
inner.local_record_store = Some(local_record_store);
inner.remote_record_store = Some(remote_record_store);
// Schedule tick
let this = self.clone();
let tick_future = interval(1000, move || {
let this = this.clone();
async move {
if let Err(e) = this.tick().await {
warn!("storage manager tick failed: {}", e);
}
}
});
inner.tick_future = Some(tick_future);
inner.initialized = true;
inner.init(self.clone()).await?;
Ok(())
}
@@ -159,12 +99,7 @@ impl StorageManager {
debug!("starting storage manager shutdown");
let mut inner = self.inner.lock().await;
// Stop ticker
let tick_future = inner.tick_future.take();
if let Some(f) = tick_future {
f.await;
}
inner.terminate().await;
// Cancel all tasks
self.cancel_tasks().await;
@@ -353,14 +288,165 @@ impl StorageManager {
Ok(Some(subkey_result_value.into_value_data()))
}
/// Set the value of a subkey on an opened local record
/// Puts changes to the network immediately and may refresh the record if there is a newer subkey available online
pub async fn set_value(
&self,
key: TypedKey,
subkey: ValueSubkey,
data: Vec<u8>,
) -> Result<Option<ValueData>, VeilidAPIError> {
let inner = self.lock().await?;
unimplemented!();
let mut inner = self.lock().await?;
// Get cryptosystem
let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else {
apibail_generic!("unsupported cryptosystem");
};
let Some(opened_record) = inner.opened_records.remove(&key) else {
apibail_generic!("record not open");
};
// If we don't have a writer then we can't write
let Some(writer) = opened_record.writer().cloned() else {
apibail_generic!("value is not writable");
};
// See if the subkey we are modifying has a last known local value
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
// Get the descriptor and schema for the key
let Some(descriptor) = last_subkey_result.descriptor else {
apibail_generic!("must have a descriptor");
};
let schema = descriptor.schema()?;
// Make new subkey data
let value_data = if let Some(signed_value_data) = &last_subkey_result.value {
let seq = signed_value_data.value_data().seq();
ValueData::new_with_seq(seq + 1, data, writer.key)
} else {
ValueData::new(data, writer.key)
};
// Validate with schema
if !schema.check_subkey_value_data(descriptor.owner(), subkey, &value_data) {
// Validation failed, ignore this value
apibail_generic!("failed schema validation");
}
// Sign the new value data with the writer
let signed_value_data = SignedValueData::make_signature(
value_data,
descriptor.owner(),
subkey,
vcrypto,
writer.secret,
)?;
let subkey_result = SubkeyResult {
value: Some(signed_value_data),
descriptor: Some(descriptor)
};
// Get rpc processor and drop mutex so we don't block while setting the value on the network
let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, just write it locally and return immediately
// (a `let ... else` branch must diverge, so hand back the value we just stored)
inner
.handle_set_local_value(key, subkey, signed_value_data.clone())
.await?;
return Ok(Some(signed_value_data.into_value_data()));
};
// Remember the last sequence number we had locally, if any
let opt_last_seq = last_subkey_result
.value
.as_ref()
.map(|v| v.value_data().seq());
// Drop the lock for network access
drop(inner);
// Use the safety selection we opened the record with
let final_subkey_result = self
.do_set_value(
rpc_processor,
key,
subkey,
opened_record.safety_selection(),
subkey_result,
)
.await?;
// See if we got a value back
let Some(subkey_result_value) = final_subkey_result.value else {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None);
};
// If we got a newer value back then write it to the opened record
if Some(subkey_result_value.value_data().seq()) != opt_last_seq {
let mut inner = self.lock().await?;
inner
.handle_set_local_value(key, subkey, subkey_result_value.clone())
.await?;
}
return Ok(Some(subkey_result_value.into_value_data()));
// Store subkey locally
inner
.handle_set_local_value(key, subkey, signed_value_data)
.await?;
// Return the existing value if we have one unless we are forcing a refresh
if !force_refresh {
if let Some(last_subkey_result_value) = last_subkey_result.value {
return Ok(Some(last_subkey_result_value.into_value_data()));
}
}
// Refresh if we can
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, try again later
apibail_try_again!();
};
// Drop the lock for network access
drop(inner);
// May have last descriptor / value
// Use the safety selection we opened the record with
let opt_last_seq = last_subkey_result
.value
.as_ref()
.map(|v| v.value_data().seq());
let subkey_result = self
.do_get_value(
rpc_processor,
key,
subkey,
opened_record.safety_selection(),
last_subkey_result,
)
.await?;
// See if we got a value back
let Some(subkey_result_value) = subkey_result.value else {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None);
};
// If we got a new value back then write it to the opened record
if Some(subkey_result_value.value_data().seq()) != opt_last_seq {
let mut inner = self.lock().await?;
inner
.handle_set_local_value(key, subkey, subkey_result_value.clone())
.await?;
}
Ok(Some(subkey_result_value.into_value_data()))
}
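// A minimal standalone sketch (hypothetical, not part of this diff) of the
// sequence-number rule set_value relies on: a fresh value is assumed to start
// at seq 0, a local rewrite bumps the last known seq by one, and a network
// reply is only written back locally when its seq differs from what we last saw.
fn next_seq(opt_last_seq: Option<u32>) -> u32 {
opt_last_seq.map(|s| s + 1).unwrap_or(0)
}
fn reply_is_newer(reply_seq: u32, opt_last_seq: Option<u32>) -> bool {
Some(reply_seq) != opt_last_seq
}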
pub async fn watch_values(

View File

@@ -11,12 +11,44 @@ pub(super) struct StorageManagerInner {
pub local_record_store: Option<RecordStore<LocalRecordDetail>>,
/// Records that have been pushed to this node for distribution by other nodes, which we make an effort to republish
pub remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
/// Record subkeys that have not been pushed to the network because they were written to offline
pub offline_subkey_writes: HashMap<TypedKey, ValueSubkeyRangeSet>,
/// Storage manager metadata that is persistent, including copy of offline subkey writes
pub metadata_db: Option<TableDB>,
/// RPC processor if it is available
pub rpc_processor: Option<RPCProcessor>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
pub tick_future: Option<SendPinBoxFuture<()>>,
}
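// Sketch of how an offline write could be queued (record_offline_write is a
// hypothetical helper, not part of this diff): remember which subkeys of which
// records still need to be pushed once the network is reachable again.
fn record_offline_write(
offline_subkey_writes: &mut HashMap<TypedKey, ValueSubkeyRangeSet>,
key: TypedKey,
subkey: ValueSubkey,
) {
// ValueSubkeyRangeSet derefs to RangeSetBlaze, so insert() is available
offline_subkey_writes.entry(key).or_default().insert(subkey);
}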
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
let c = config.get();
RecordStoreLimits {
subkey_cache_size: c.network.dht.local_subkey_cache_size as usize,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
max_records: None,
max_subkey_cache_memory_mb: Some(
c.network.dht.local_max_subkey_cache_memory_mb as usize,
),
max_storage_space_mb: None,
}
}
fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
let c = config.get();
RecordStoreLimits {
subkey_cache_size: c.network.dht.remote_subkey_cache_size as usize,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
max_records: Some(c.network.dht.remote_max_records as usize),
max_subkey_cache_memory_mb: Some(
c.network.dht.remote_max_subkey_cache_memory_mb as usize,
),
max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
}
}
impl StorageManagerInner {
pub fn new(unlocked_inner: Arc<StorageManagerUnlockedInner>) -> Self {
Self {
@@ -25,11 +57,106 @@ impl StorageManagerInner {
opened_records: Default::default(),
local_record_store: Default::default(),
remote_record_store: Default::default(),
offline_subkey_writes: Default::default(),
metadata_db: Default::default(),
rpc_processor: Default::default(),
tick_future: Default::default(),
}
}
pub async fn init(&mut self, outer_self: StorageManager) -> EyreResult<()> {
let metadata_db = self.unlocked_inner
.table_store
.open("storage_manager_metadata", 1)
.await?;
let local_limits = local_limits_from_config(self.unlocked_inner.config.clone());
let remote_limits = remote_limits_from_config(self.unlocked_inner.config.clone());
let mut local_record_store = RecordStore::new(
self.unlocked_inner.table_store.clone(),
"local",
local_limits,
);
local_record_store.init().await?;
let mut remote_record_store = RecordStore::new(
self.unlocked_inner.table_store.clone(),
"remote",
remote_limits,
);
remote_record_store.init().await?;
self.metadata_db = Some(metadata_db);
self.local_record_store = Some(local_record_store);
self.remote_record_store = Some(remote_record_store);
self.load_metadata().await?;
// Schedule tick
let tick_future = interval(1000, move || {
let this = outer_self.clone();
async move {
if let Err(e) = this.tick().await {
log_stor!(warn "storage manager tick failed: {}", e);
}
}
});
self.tick_future = Some(tick_future);
self.initialized = true;
Ok(())
}
pub async fn terminate(&mut self) {
// Stop ticker
let tick_future = self.tick_future.take();
if let Some(f) = tick_future {
f.await;
}
// Final flush on record stores
if let Some(mut local_record_store) = self.local_record_store.take() {
local_record_store.tick().await;
}
if let Some(mut remote_record_store) = self.remote_record_store.take() {
remote_record_store.tick().await;
}
// Save metadata
if self.metadata_db.is_some() {
if let Err(e) = self.save_metadata().await {
log_stor!(error "termination metadata save failed: {}", e);
}
self.metadata_db = None;
}
self.offline_subkey_writes.clear();
// Mark not initialized
self.initialized = false;
}
async fn save_metadata(&mut self) -> EyreResult<()> {
if let Some(metadata_db) = &self.metadata_db {
let tx = metadata_db.transact();
tx.store_rkyv(0, b"offline_subkey_writes", &self.offline_subkey_writes)?;
tx.commit().await.wrap_err("failed to commit")?;
}
Ok(())
}
async fn load_metadata(&mut self) -> EyreResult<()> {
if let Some(metadata_db) = &self.metadata_db {
self.offline_subkey_writes = metadata_db
.load_rkyv(0, b"offline_subkey_writes")?
.unwrap_or_default();
}
Ok(())
}
// TODO: write offline subkey write flush background task (or make a ticket for it) and get back to it after the rest of set_value
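// One possible shape for that flush task (hypothetical sketch; the actual
// network push is left as a comment because that path is not written yet):
pub async fn flush_offline_subkey_writes(&mut self) -> EyreResult<()> {
if self.rpc_processor.is_none() {
// Still offline; keep the queue and retry on a later tick
return Ok(());
}
// Take the queue so a failed push can re-add entries without duplication
let work = core::mem::take(&mut self.offline_subkey_writes);
for (_key, _subkeys) in work {
// push each subkey to the network here (e.g. via the do_set_value path),
// re-inserting into offline_subkey_writes on failure
}
self.save_metadata().await
}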
pub async fn create_new_owned_local_record(
&mut self,
kind: CryptoKind,

View File

@@ -52,20 +52,20 @@ where
D: rkyv::Fallible + ?Sized,
T: rkyv::Archive + Integer,
rkyv::Archived<T>: rkyv::Deserialize<T, D>,
D::Error: From<String>,
// D::Error: From<String>, // xxx this doesn't work
{
fn deserialize_with(
field: &rkyv::Archived<Vec<T>>,
deserializer: &mut D,
) -> Result<RangeSetBlaze<T>, D::Error> {
let mut out = RangeSetBlaze::<T>::new();
if field.len() % 2 == 1 {
return Err("invalid range set length".to_owned().into());
}
// if field.len() % 2 == 1 {
// return Err("invalid range set length".to_owned().into());
// }
let f = field.as_slice();
for i in 0..field.len() / 2 {
let l: T = f[i].deserialize(deserializer)?;
let u: T = f[i + 1].deserialize(deserializer)?;
let l: T = f[i * 2].deserialize(deserializer)?;
let u: T = f[i * 2 + 1].deserialize(deserializer)?;
out.ranges_insert(l..=u);
}
Ok(out)
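// The flattened layout the corrected indexing assumes (standalone sketch, not
// part of this diff): inclusive ranges are stored pairwise as [l0, u0, l1, u1, ...].
// For [1, 3, 5, 7], the old f[i]/f[i + 1] indexing decoded (1..=3) then a bogus
// (3..=5); the fixed f[i * 2]/f[i * 2 + 1] decodes (1..=3) then (5..=7).
fn ranges_from_flat(f: &[u32]) -> Vec<core::ops::RangeInclusive<u32>> {
assert!(f.len() % 2 == 0, "flattened range list must have even length");
(0..f.len() / 2).map(|i| f[i * 2]..=f[i * 2 + 1]).collect()
}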

View File

@@ -1,12 +1,14 @@
mod dht_record_descriptor;
mod schema;
mod value_data;
mod value_subkey_range_set;
use super::*;
pub use dht_record_descriptor::*;
pub use schema::*;
pub use value_data::*;
pub use value_subkey_range_set::*;
/// Value subkey
pub type ValueSubkey = u32;

View File

@@ -48,16 +48,6 @@ impl ValueData {
&self.data
}
pub fn with_data_mut<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Vec<u8>) -> R,
{
let out = f(&mut self.data);
assert!(self.data.len() <= Self::MAX_LEN);
self.seq += 1;
out
}
pub fn total_size(&self) -> usize {
mem::size_of::<Self>() + self.data.len()
}

View File

@@ -0,0 +1,46 @@
use super::*;
use core::ops::{Deref, DerefMut};
use range_set_blaze::*;
#[derive(
Clone,
Debug,
Default,
PartialOrd,
PartialEq,
Eq,
Ord,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueSubkeyRangeSet {
#[with(RkyvRangeSetBlaze)]
#[serde(with = "serialize_range_set_blaze")]
data: RangeSetBlaze<ValueSubkey>,
}
impl ValueSubkeyRangeSet {
pub fn new() -> Self {
Self {
data: Default::default(),
}
}
}
impl Deref for ValueSubkeyRangeSet {
type Target = RangeSetBlaze<ValueSubkey>;
fn deref(&self) -> &Self::Target {
&self.data
}
}
impl DerefMut for ValueSubkeyRangeSet {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
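// Usage sketch: the Deref/DerefMut impls above expose the RangeSetBlaze API
// (insert, ranges_insert, contains are range_set_blaze methods) directly on
// ValueSubkeyRangeSet.
fn value_subkey_range_set_example() {
let mut set = ValueSubkeyRangeSet::new();
set.ranges_insert(0..=3); // subkeys 0 through 3
set.insert(10); // and a single subkey
assert!(set.contains(2));
assert!(!set.contains(5));
}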