[skip ci] refactor checkpoint

Christien Rioux 2025-01-26 14:58:00 -05:00
parent e57d56b00c
commit bd111ac73b
8 changed files with 851 additions and 925 deletions

File diff suppressed because it is too large

View File

@@ -50,7 +50,7 @@ pub(super) struct RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{
- table_store: TableStore,
+ registry: VeilidComponentRegistry,
name: String,
limits: RecordStoreLimits,
@@ -129,7 +129,7 @@ impl<D> RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{
- pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self {
+ pub fn new(registry: VeilidComponentRegistry, name: &str, limits: RecordStoreLimits) -> Self {
let subkey_cache_size = limits.subkey_cache_size;
let limit_subkey_cache_total_size = limits
.max_subkey_cache_memory_mb
@@ -139,7 +139,7 @@ where
.map(|mb| mb as u64 * 1_048_576u64);
Self {
- table_store,
+ registry,
name: name.to_owned(),
limits,
record_table: None,
@@ -165,7 +165,7 @@ where
}
}
- pub async fn init(&mut self) -> EyreResult<()> {
+ pub async fn setup(&mut self) -> EyreResult<()> {
let record_table = self
.table_store
.open(&format!("{}_records", self.name), 1)
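
The `RecordStore` change above is this commit's recurring pattern: instead of holding a `TableStore` field directly, the store keeps one shared `VeilidComponentRegistry` handle and resolves dependencies through it (note `init` also becomes `setup`). A minimal sketch of this style of component registry follows; the `lookup` accessor and its signature are assumptions for illustration, not taken from this diff:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical registry: one shared handle replaces a field per dependency.
#[derive(Default, Clone)]
struct ComponentRegistry {
    components: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl ComponentRegistry {
    fn register<T: Any + Send + Sync>(&mut self, component: Arc<T>) {
        self.components.insert(TypeId::of::<T>(), component);
    }

    // Assumed accessor for illustration; not the actual veilid API.
    fn lookup<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        self.components
            .get(&TypeId::of::<T>())
            .and_then(|c| c.clone().downcast::<T>().ok())
    }
}

struct TableStore; // stand-in for the real component

struct RecordStore {
    registry: ComponentRegistry,
}

impl RecordStore {
    // Dependencies are resolved at the point of use instead of stored as fields.
    fn table_store(&self) -> Arc<TableStore> {
        self.registry
            .lookup::<TableStore>()
            .expect("TableStore must be registered")
    }
}

fn main() {
    let mut registry = ComponentRegistry::default();
    registry.register(Arc::new(TableStore));
    let record_store = RecordStore { registry };
    let _table_store = record_store.table_store();
}
```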

View File

@@ -1,776 +0,0 @@
use super::*;
const STORAGE_MANAGER_METADATA: &str = "storage_manager_metadata";
const OFFLINE_SUBKEY_WRITES: &[u8] = b"offline_subkey_writes";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct OfflineSubkeyWrite {
pub safety_selection: SafetySelection,
pub subkeys: ValueSubkeyRangeSet,
#[serde(default)]
pub subkeys_in_flight: ValueSubkeyRangeSet,
}
/// Locked structure for storage manager
pub(super) struct StorageManagerInner {
unlocked_inner: Arc<StorageManagerUnlockedInner>,
/// If we are started up
pub initialized: bool,
/// Records that have been 'opened' and are not yet closed
pub opened_records: HashMap<TypedKey, OpenedRecord>,
/// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive
pub local_record_store: Option<RecordStore<LocalRecordDetail>>,
/// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish
pub remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
/// Record subkeys that have not been pushed to the network because they were written to offline
pub offline_subkey_writes: HashMap<TypedKey, OfflineSubkeyWrite>,
/// Storage manager metadata that is persistent, including copy of offline subkey writes
pub metadata_db: Option<TableDB>,
/// RPC processor if online
pub opt_rpc_processor: Option<RPCProcessor>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
pub tick_future: Option<SendPinBoxFuture<()>>,
/// Update callback to send ValueChanged notification to
pub update_callback: Option<UpdateCallback>,
/// Deferred result processor
pub deferred_result_processor: DeferredStreamProcessor,
/// The maximum consensus count
set_consensus_count: usize,
}
impl fmt::Debug for StorageManagerInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StorageManagerInner")
// .field("unlocked_inner", &self.unlocked_inner)
.field("initialized", &self.initialized)
.field("opened_records", &self.opened_records)
.field("local_record_store", &self.local_record_store)
.field("remote_record_store", &self.remote_record_store)
.field("offline_subkey_writes", &self.offline_subkey_writes)
//.field("metadata_db", &self.metadata_db)
//.field("tick_future", &self.tick_future)
//.field("update_callback", &self.update_callback)
.field("deferred_result_processor", &self.deferred_result_processor)
.field("set_consensus_count", &self.set_consensus_count)
.finish()
}
}
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
let c = config.get();
RecordStoreLimits {
subkey_cache_size: c.network.dht.local_subkey_cache_size as usize,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
max_records: None,
max_subkey_cache_memory_mb: Some(c.network.dht.local_max_subkey_cache_memory_mb as usize),
max_storage_space_mb: None,
public_watch_limit: c.network.dht.public_watch_limit as usize,
member_watch_limit: c.network.dht.member_watch_limit as usize,
max_watch_expiration: TimestampDuration::new(ms_to_us(
c.network.dht.max_watch_expiration_ms,
)),
min_watch_expiration: TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms)),
}
}
fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
let c = config.get();
RecordStoreLimits {
subkey_cache_size: c.network.dht.remote_subkey_cache_size as usize,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_total_size: MAX_RECORD_DATA_SIZE,
max_records: Some(c.network.dht.remote_max_records as usize),
max_subkey_cache_memory_mb: Some(c.network.dht.remote_max_subkey_cache_memory_mb as usize),
max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
public_watch_limit: c.network.dht.public_watch_limit as usize,
member_watch_limit: c.network.dht.member_watch_limit as usize,
max_watch_expiration: TimestampDuration::new(ms_to_us(
c.network.dht.max_watch_expiration_ms,
)),
min_watch_expiration: TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms)),
}
}
impl StorageManagerInner {
pub fn new(unlocked_inner: Arc<StorageManagerUnlockedInner>) -> Self {
let set_consensus_count = unlocked_inner.config.get().network.dht.set_value_count as usize;
Self {
unlocked_inner,
initialized: false,
opened_records: Default::default(),
local_record_store: Default::default(),
remote_record_store: Default::default(),
offline_subkey_writes: Default::default(),
metadata_db: Default::default(),
opt_rpc_processor: Default::default(),
//opt_routing_table: Default::default(),
tick_future: Default::default(),
update_callback: None,
deferred_result_processor: DeferredStreamProcessor::default(),
set_consensus_count,
}
}
pub async fn init(
&mut self,
outer_self: StorageManager,
update_callback: UpdateCallback,
) -> EyreResult<()> {
let metadata_db = self
.unlocked_inner
.table_store
.open(STORAGE_MANAGER_METADATA, 1)
.await?;
let local_limits = local_limits_from_config(self.unlocked_inner.config.clone());
let remote_limits = remote_limits_from_config(self.unlocked_inner.config.clone());
let mut local_record_store = RecordStore::new(
self.unlocked_inner.table_store.clone(),
"local",
local_limits,
);
local_record_store.init().await?;
let mut remote_record_store = RecordStore::new(
self.unlocked_inner.table_store.clone(),
"remote",
remote_limits,
);
remote_record_store.init().await?;
self.metadata_db = Some(metadata_db);
self.local_record_store = Some(local_record_store);
self.remote_record_store = Some(remote_record_store);
self.load_metadata().await?;
// Start deferred results processors
self.deferred_result_processor.init().await;
// Schedule tick
let tick_future = interval("storage manager tick", 1000, move || {
let this = outer_self.clone();
async move {
if let Err(e) = this.tick().await {
log_stor!(warn "storage manager tick failed: {}", e);
}
}
});
self.tick_future = Some(tick_future);
self.update_callback = Some(update_callback);
self.initialized = true;
Ok(())
}
pub async fn stop_ticker(&mut self) {
// Stop ticker
let tick_future = self.tick_future.take();
if let Some(f) = tick_future {
f.await;
}
}
pub async fn terminate(&mut self) {
self.update_callback = None;
// Stop deferred result processor
self.deferred_result_processor.terminate().await;
// Final flush on record stores
if let Some(mut local_record_store) = self.local_record_store.take() {
if let Err(e) = local_record_store.flush().await {
log_stor!(error "termination local record store tick failed: {}", e);
}
}
if let Some(mut remote_record_store) = self.remote_record_store.take() {
if let Err(e) = remote_record_store.flush().await {
log_stor!(error "termination remote record store tick failed: {}", e);
}
}
// Save metadata
if self.metadata_db.is_some() {
if let Err(e) = self.save_metadata().await {
log_stor!(error "termination metadata save failed: {}", e);
}
self.metadata_db = None;
}
self.offline_subkey_writes.clear();
// Mark not initialized
self.initialized = false;
}
async fn save_metadata(&mut self) -> EyreResult<()> {
if let Some(metadata_db) = &self.metadata_db {
let tx = metadata_db.transact();
tx.store_json(0, OFFLINE_SUBKEY_WRITES, &self.offline_subkey_writes)?;
tx.commit().await.wrap_err("failed to commit")?
}
Ok(())
}
async fn load_metadata(&mut self) -> EyreResult<()> {
if let Some(metadata_db) = &self.metadata_db {
self.offline_subkey_writes = match metadata_db.load_json(0, OFFLINE_SUBKEY_WRITES).await
{
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await {
log_stor!(debug "offline_subkey_writes format changed, clearing: {}", e);
}
Default::default()
}
}
}
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn create_new_owned_local_record(
&mut self,
kind: CryptoKind,
schema: DHTSchema,
owner: Option<KeyPair>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<(TypedKey, KeyPair)> {
// Get cryptosystem
let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else {
apibail_generic!("unsupported cryptosystem");
};
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Verify the dht schema does not contain the node id
{
let cfg = self.unlocked_inner.config.get();
if let Some(node_id) = cfg.network.routing_table.node_id.get(kind) {
if schema.is_member(&node_id.value) {
apibail_invalid_argument!(
"node id can not be schema member",
"schema",
node_id.value
);
}
}
}
// Compile the dht schema
let schema_data = schema.compile();
// New values require a new owner key if not given
let owner = owner.unwrap_or_else(|| vcrypto.generate_keypair());
// Calculate dht key
let dht_key = Self::get_key(vcrypto.clone(), &owner.key, &schema_data);
// Make a signed value descriptor for this dht value
let signed_value_descriptor = Arc::new(SignedValueDescriptor::make_signature(
owner.key,
schema_data,
vcrypto.clone(),
owner.secret,
)?);
// Add new local value record
let cur_ts = Timestamp::now();
let local_record_detail = LocalRecordDetail::new(safety_selection);
let record =
Record::<LocalRecordDetail>::new(cur_ts, signed_value_descriptor, local_record_detail)?;
local_record_store.new_record(dht_key, record).await?;
Ok((dht_key, owner))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn move_remote_record_to_local(
&mut self,
key: TypedKey,
safety_selection: SafetySelection,
) -> VeilidAPIResult<Option<(PublicKey, DHTSchema)>> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Get remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
let rcb = |r: &Record<RemoteRecordDetail>| {
// Return record details
r.clone()
};
let Some(remote_record) = remote_record_store.with_record(key, rcb) else {
// No local or remote record found, return None
return Ok(None);
};
// Make local record
let cur_ts = Timestamp::now();
let local_record = Record::new(
cur_ts,
remote_record.descriptor().clone(),
LocalRecordDetail::new(safety_selection),
)?;
local_record_store.new_record(key, local_record).await?;
// Move copy subkey data from remote to local store
for subkey in remote_record.stored_subkeys().iter() {
let Some(get_result) = remote_record_store.get_subkey(key, subkey, false).await? else {
// Subkey was missing
warn!("Subkey was missing: {} #{}", key, subkey);
continue;
};
let Some(subkey_data) = get_result.opt_value else {
// Subkey was missing
warn!("Subkey data was missing: {} #{}", key, subkey);
continue;
};
local_record_store
.set_subkey(key, subkey, subkey_data, WatchUpdateMode::NoUpdate)
.await?;
}
// Move watches
local_record_store.move_watches(key, remote_record_store.move_watches(key, None));
// Delete remote record from store
remote_record_store.delete_record(key).await?;
// Return record information as transferred to local record
Ok(Some((*remote_record.owner(), remote_record.schema())))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn open_existing_record(
&mut self,
key: TypedKey,
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<Option<DHTRecordDescriptor>> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// See if we have a local record already or not
let cb = |r: &mut Record<LocalRecordDetail>| {
// Process local record
// Keep the safety selection we opened the record with
r.detail_mut().safety_selection = safety_selection;
// Return record details
(*r.owner(), r.schema())
};
let (owner, schema) = match local_record_store.with_record_mut(key, cb) {
Some(v) => v,
None => {
// If we don't have a local record yet, check to see if we have a remote record
// if so, migrate it to a local record
let Some(v) = self
.move_remote_record_to_local(key, safety_selection)
.await?
else {
// No remote record either
return Ok(None);
};
v
}
};
// Had local record
// If the writer we chose is also the owner, we have the owner secret
// Otherwise this is just another subkey writer
let owner_secret = if let Some(writer) = writer {
if writer.key == owner {
Some(writer.secret)
} else {
None
}
} else {
None
};
// Write open record
self.opened_records
.entry(key)
.and_modify(|e| {
e.set_writer(writer);
e.set_safety_selection(safety_selection);
})
.or_insert_with(|| OpenedRecord::new(writer, safety_selection));
// Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema);
Ok(Some(descriptor))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn open_new_record(
&mut self,
key: TypedKey,
writer: Option<KeyPair>,
subkey: ValueSubkey,
get_result: GetResult,
safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> {
// Ensure the record is closed
if self.opened_records.contains_key(&key) {
panic!("new record should never be opened at this point");
}
// Must have descriptor
let Some(signed_value_descriptor) = get_result.opt_descriptor else {
// No descriptor for new record, can't store this
apibail_generic!("no descriptor");
};
// Get owner
let owner = *signed_value_descriptor.owner();
// If the writer we chose is also the owner, we have the owner secret
// Otherwise this is just another subkey writer
let owner_secret = if let Some(writer) = writer {
if writer.key == owner {
Some(writer.secret)
} else {
None
}
} else {
None
};
let schema = signed_value_descriptor.schema()?;
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Make and store a new record for this descriptor
let record = Record::<LocalRecordDetail>::new(
Timestamp::now(),
signed_value_descriptor,
LocalRecordDetail::new(safety_selection),
)?;
local_record_store.new_record(key, record).await?;
// If we got a subkey with the getvalue, it has already been validated against the schema, so store it
if let Some(signed_value_data) = get_result.opt_value {
// Write subkey to local store
local_record_store
.set_subkey(key, subkey, signed_value_data, WatchUpdateMode::NoUpdate)
.await?;
}
// Write open record
self.opened_records
.insert(key, OpenedRecord::new(writer, safety_selection));
// Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema);
Ok(descriptor)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub fn get_value_nodes(&self, key: TypedKey) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_ref() else {
apibail_not_initialized!();
};
// Get routing table to see if we still know about these nodes
let Some(routing_table) = self.opt_rpc_processor.as_ref().map(|r| r.routing_table()) else {
apibail_try_again!("offline, try again later");
};
let opt_value_nodes = local_record_store.peek_record(key, |r| {
let d = r.detail();
d.nodes
.keys()
.copied()
.filter_map(|x| {
routing_table
.lookup_node_ref(TypedKey::new(key.kind, x))
.ok()
.flatten()
})
.collect()
});
Ok(opt_value_nodes)
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn process_fanout_results<
'a,
I: IntoIterator<Item = (ValueSubkey, &'a FanoutResult)>,
>(
&mut self,
key: TypedKey,
subkey_results_iter: I,
is_set: bool,
) {
// Get local record store
let local_record_store = self.local_record_store.as_mut().unwrap();
let cur_ts = Timestamp::now();
local_record_store.with_record_mut(key, |r| {
let d = r.detail_mut();
for (subkey, fanout_result) in subkey_results_iter {
for node_id in fanout_result
.value_nodes
.iter()
.filter_map(|x| x.node_ids().get(key.kind).map(|k| k.value))
{
let pnd = d.nodes.entry(node_id).or_default();
if is_set || pnd.last_set == Timestamp::default() {
pnd.last_set = cur_ts;
}
pnd.last_seen = cur_ts;
pnd.subkeys.insert(subkey);
}
}
// Purge nodes down to the N most recently seen, where N is the consensus count for a set operation
let mut nodes_ts = d
.nodes
.iter()
.map(|kv| (*kv.0, kv.1.last_seen))
.collect::<Vec<_>>();
nodes_ts.sort_by(|a, b| b.1.cmp(&a.1));
for dead_node_key in nodes_ts.iter().skip(self.set_consensus_count) {
d.nodes.remove(&dead_node_key.0);
}
});
}
pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult<Option<OpenedRecord>> {
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if local_record_store.peek_record(key, |_| {}).is_none() {
return Err(VeilidAPIError::key_not_found(key));
}
Ok(self.opened_records.remove(&key))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_local_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<GetResult> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(get_result) = local_record_store
.get_subkey(key, subkey, want_descriptor)
.await?
{
return Ok(get_result);
}
Ok(GetResult {
opt_value: None,
opt_descriptor: None,
})
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_local_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
watch_update_mode: WatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Write subkey to local store
local_record_store
.set_subkey(key, subkey, signed_value_data, watch_update_mode)
.await?;
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_inspect_local_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<InspectResult> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(inspect_result) = local_record_store
.inspect_record(key, subkeys, want_descriptor)
.await?
{
return Ok(inspect_result);
}
Ok(InspectResult {
subkeys: ValueSubkeyRangeSet::new(),
seqs: vec![],
opt_descriptor: None,
})
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_remote_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<GetResult> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(get_result) = remote_record_store
.get_subkey(key, subkey, want_descriptor)
.await?
{
return Ok(get_result);
}
Ok(GetResult {
opt_value: None,
opt_descriptor: None,
})
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_remote_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
signed_value_descriptor: Arc<SignedValueDescriptor>,
watch_update_mode: WatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
// See if we have a remote record already or not
if remote_record_store.with_record(key, |_| {}).is_none() {
// record didn't exist, make it
let cur_ts = Timestamp::now();
let remote_record_detail = RemoteRecordDetail {};
let record = Record::<RemoteRecordDetail>::new(
cur_ts,
signed_value_descriptor,
remote_record_detail,
)?;
remote_record_store.new_record(key, record).await?
};
// Write subkey to remote store
remote_record_store
.set_subkey(key, subkey, signed_value_data, watch_update_mode)
.await?;
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_inspect_remote_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<InspectResult> {
// See if it's in the local record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(inspect_result) = remote_record_store
.inspect_record(key, subkeys, want_descriptor)
.await?
{
return Ok(inspect_result);
}
Ok(InspectResult {
subkeys: ValueSubkeyRangeSet::new(),
seqs: vec![],
opt_descriptor: None,
})
}
pub async fn get_record_key(
&self,
kind: CryptoKind,
owner_key: &PublicKey,
schema: DHTSchema,
) -> VeilidAPIResult<TypedKey> {
// Get cryptosystem
let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else {
apibail_generic!("unsupported cryptosystem");
};
Ok(Self::get_key(vcrypto, owner_key, &schema.compile()))
}
fn get_key(
vcrypto: CryptoSystemVersion,
owner_key: &PublicKey,
schema_data: &[u8],
) -> TypedKey {
let mut hash_data = Vec::<u8>::with_capacity(PUBLIC_KEY_LENGTH + 4 + schema_data.len());
hash_data.extend_from_slice(&vcrypto.kind().0);
hash_data.extend_from_slice(&owner_key.bytes);
hash_data.extend_from_slice(schema_data);
let hash = vcrypto.generate_hash(&hash_data);
TypedKey::new(vcrypto.kind(), hash)
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn add_offline_subkey_write(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
) {
self.offline_subkey_writes
.entry(key)
.and_modify(|x| {
x.subkeys.insert(subkey);
})
.or_insert(OfflineSubkeyWrite {
safety_selection,
subkeys: ValueSubkeyRangeSet::single(subkey),
subkeys_in_flight: ValueSubkeyRangeSet::new(),
});
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub fn process_deferred_results<T: Send + 'static>(
&mut self,
receiver: flume::Receiver<T>,
handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
) -> bool {
self.deferred_result_processor
.add(receiver.into_stream(), handler)
}
}

View File

@@ -12,100 +12,84 @@ impl StorageManager {
log_stor!(debug "starting flush record stores task");
{
let this = self.clone();
- self.unlocked_inner
- .flush_record_stores_task
- .set_routine(move |s, l, t| {
- Box::pin(this.clone().flush_record_stores_task_routine(
- s,
- Timestamp::new(l),
- Timestamp::new(t),
- ))
- });
+ self.flush_record_stores_task.set_routine(move |s, l, t| {
+ Box::pin(this.clone().flush_record_stores_task_routine(
+ s,
+ Timestamp::new(l),
+ Timestamp::new(t),
+ ))
+ });
}
// Set offline subkey writes tick task
log_stor!(debug "starting offline subkey writes task");
{
let this = self.clone();
- self.unlocked_inner
- .offline_subkey_writes_task
- .set_routine(move |s, l, t| {
- Box::pin(this.clone().offline_subkey_writes_task_routine(
- s,
- Timestamp::new(l),
- Timestamp::new(t),
- ))
- });
+ self.offline_subkey_writes_task.set_routine(move |s, l, t| {
+ Box::pin(this.clone().offline_subkey_writes_task_routine(
+ s,
+ Timestamp::new(l),
+ Timestamp::new(t),
+ ))
+ });
}
// Set send value changes tick task
log_stor!(debug "starting send value changes task");
{
let this = self.clone();
- self.unlocked_inner
- .send_value_changes_task
- .set_routine(move |s, l, t| {
- Box::pin(this.clone().send_value_changes_task_routine(
- s,
- Timestamp::new(l),
- Timestamp::new(t),
- ))
- });
+ self.send_value_changes_task.set_routine(move |s, l, t| {
+ Box::pin(this.clone().send_value_changes_task_routine(
+ s,
+ Timestamp::new(l),
+ Timestamp::new(t),
+ ))
+ });
}
// Set check active watches tick task
log_stor!(debug "starting check active watches task");
{
let this = self.clone();
- self.unlocked_inner
- .check_active_watches_task
- .set_routine(move |s, l, t| {
- Box::pin(this.clone().check_active_watches_task_routine(
- s,
- Timestamp::new(l),
- Timestamp::new(t),
- ))
- });
+ self.check_active_watches_task.set_routine(move |s, l, t| {
+ Box::pin(this.clone().check_active_watches_task_routine(
+ s,
+ Timestamp::new(l),
+ Timestamp::new(t),
+ ))
+ });
}
// Set check watched records tick task
log_stor!(debug "starting checked watched records task");
{
let this = self.clone();
- self.unlocked_inner
- .check_watched_records_task
- .set_routine(move |s, l, t| {
- Box::pin(this.clone().check_watched_records_task_routine(
- s,
- Timestamp::new(l),
- Timestamp::new(t),
- ))
- });
+ self.check_watched_records_task.set_routine(move |s, l, t| {
+ Box::pin(this.clone().check_watched_records_task_routine(
+ s,
+ Timestamp::new(l),
+ Timestamp::new(t),
+ ))
+ });
}
}
#[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> {
// Run the flush stores task
- self.unlocked_inner.flush_record_stores_task.tick().await?;
+ self.flush_record_stores_task.tick().await?;
// Check active watches
- self.unlocked_inner.check_active_watches_task.tick().await?;
+ self.check_active_watches_task.tick().await?;
// Check watched records
- self.unlocked_inner
- .check_watched_records_task
- .tick()
- .await?;
+ self.check_watched_records_task.tick().await?;
// Run online-only tasks
- if self.online_writes_ready().await?.is_some() {
+ if self.online_writes_ready() {
// Run offline subkey writes task if there's work to be done
- if self.has_offline_subkey_writes().await? {
- self.unlocked_inner
- .offline_subkey_writes_task
- .tick()
- .await?;
+ if self.has_offline_subkey_writes().await {
+ self.offline_subkey_writes_task.tick().await?;
}
// Send value changed notifications
- self.unlocked_inner.send_value_changes_task.tick().await?;
+ self.send_value_changes_task.tick().await?;
}
Ok(())
}
@@ -113,23 +97,23 @@ impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) async fn cancel_tasks(&self) {
log_stor!(debug "stopping check watched records task");
- if let Err(e) = self.unlocked_inner.check_watched_records_task.stop().await {
+ if let Err(e) = self.check_watched_records_task.stop().await {
warn!("check_watched_records_task not stopped: {}", e);
}
log_stor!(debug "stopping check active watches task");
- if let Err(e) = self.unlocked_inner.check_active_watches_task.stop().await {
+ if let Err(e) = self.check_active_watches_task.stop().await {
warn!("check_active_watches_task not stopped: {}", e);
}
log_stor!(debug "stopping send value changes task");
- if let Err(e) = self.unlocked_inner.send_value_changes_task.stop().await {
+ if let Err(e) = self.send_value_changes_task.stop().await {
warn!("send_value_changes_task not stopped: {}", e);
}
log_stor!(debug "stopping flush record stores task");
- if let Err(e) = self.unlocked_inner.flush_record_stores_task.stop().await {
+ if let Err(e) = self.flush_record_stores_task.stop().await {
warn!("flush_record_stores_task not stopped: {}", e);
}
log_stor!(debug "stopping offline subkey writes task");
- if let Err(e) = self.unlocked_inner.offline_subkey_writes_task.stop().await {
+ if let Err(e) = self.offline_subkey_writes_task.stop().await {
warn!("offline_subkey_writes_task not stopped: {}", e);
}
}

View File

@@ -2,6 +2,14 @@ use super::*;
use futures_util::*;
use stop_token::future::FutureExt as _;
+ #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+ pub struct OfflineSubkeyWrite {
+ pub safety_selection: SafetySelection,
+ pub subkeys: ValueSubkeyRangeSet,
+ #[serde(default)]
+ pub subkeys_in_flight: ValueSubkeyRangeSet,
+ }
#[derive(Debug)]
enum OfflineSubkeyWriteResult {
Finished(set_value::OutboundSetValueResult),
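
The moved `OfflineSubkeyWrite` struct above keeps `#[serde(default)]` on `subkeys_in_flight`, so metadata persisted before that field existed still deserializes cleanly instead of hitting the clear-and-reset fallback seen in `load_metadata` above. A minimal sketch of that serde behavior (simplified field types; `serde_json` used only for illustration):

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct OfflineWrite {
    subkeys: Vec<u32>,
    #[serde(default)] // absent in old data -> Default::default() (empty vec)
    subkeys_in_flight: Vec<u32>,
}

fn main() {
    // A record persisted by an older version, before the new field existed:
    let old_json = r#"{ "subkeys": [0, 1, 4] }"#;
    let w: OfflineWrite = serde_json::from_str(old_json).unwrap();
    assert_eq!(w.subkeys, vec![0, 1, 4]);
    assert!(w.subkeys_in_flight.is_empty());
}
```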

View File

@@ -19,7 +19,7 @@ impl SignedValueData {
&self,
owner: &PublicKey,
subkey: ValueSubkey,
- vcrypto: CryptoSystemVersion,
+ vcrypto: &CryptoSystemGuard<'_>,
) -> VeilidAPIResult<bool> {
let node_info_bytes = Self::make_signature_bytes(&self.value_data, owner, subkey)?;
// validate signature
@@ -30,7 +30,7 @@
value_data: ValueData,
owner: &PublicKey,
subkey: ValueSubkey,
- vcrypto: CryptoSystemVersion,
+ vcrypto: &CryptoSystemGuard<'_>,
writer_secret: SecretKey,
) -> VeilidAPIResult<Self> {
let node_info_bytes = Self::make_signature_bytes(&value_data, owner, subkey)?;

View File

@@ -17,7 +17,7 @@ impl SignedValueDescriptor {
}
}
- pub fn validate(&self, vcrypto: CryptoSystemVersion) -> VeilidAPIResult<()> {
+ pub fn validate(&self, vcrypto: &CryptoSystemGuard<'_>) -> VeilidAPIResult<()> {
// validate signature
if !vcrypto.verify(&self.owner, &self.schema_data, &self.signature)? {
apibail_parse_error!(
@@ -49,7 +49,7 @@ impl SignedValueDescriptor {
pub fn make_signature(
owner: PublicKey,
schema_data: Vec<u8>,
- vcrypto: CryptoSystemVersion,
+ vcrypto: &CryptoSystemGuard<'_>,
owner_secret: SecretKey,
) -> VeilidAPIResult<Self> {
// create signature

View File

@@ -267,7 +267,8 @@ impl TableStore {
// Get cryptosystem
let kind = FourCC::try_from(&dek_bytes[0..4]).unwrap();
- let Some(vcrypto) = self.crypto().get(kind) else {
+ let crypto = self.crypto();
+ let Some(vcrypto) = crypto.get(kind) else {
bail!("unsupported cryptosystem '{kind}'");
};
@@ -322,7 +323,8 @@ impl TableStore {
}
// Get cryptosystem
- let Some(vcrypto) = self.crypto().get(dek.kind) else {
+ let crypto = self.crypto();
+ let Some(vcrypto) = crypto.get(dek.kind) else {
bail!("unsupported cryptosystem '{}'", dek.kind);
};
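
The `TableStore` hunks above bind `self.crypto()` to a local before calling `.get(...)` because the accessor returns an owned guard, and a borrow taken from a temporary in the scrutinee of `let ... else` is dropped at the end of that statement (rustc E0716). A standalone illustration of the pattern with placeholder types (not veilid's actual ones):

```rust
struct System;
struct Crypto;

impl Crypto {
    // Returns a borrow tied to `&self`.
    fn get(&self, _kind: u32) -> Option<&System> {
        Some(&System)
    }
}

struct TableStore;

impl TableStore {
    // Returns an owned value; borrows taken from it must not outlive it.
    fn crypto(&self) -> Crypto {
        Crypto
    }

    fn demo(&self) {
        // Would not compile (E0716): the temporary from `self.crypto()` is
        // dropped at the end of the `let ... else` statement while `vcrypto`
        // would still borrow from it.
        // let Some(vcrypto) = self.crypto().get(0) else { return; };

        // Binding the owned value first keeps it alive for the whole scope:
        let crypto = self.crypto();
        let Some(_vcrypto) = crypto.get(0) else {
            return;
        };
    }
}

fn main() {
    TableStore.demo();
}
```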