checkpoint

This commit is contained in:
John Smith 2023-05-07 21:56:35 -04:00
parent 1fe5004eef
commit 76b8d569dc
10 changed files with 518 additions and 308 deletions

View File

@ -0,0 +1,103 @@
use super::*;
impl RoutingTable {
/// Utility to find all closest nodes to a particular key, including possibly our own node and nodes further away from the key than our own, returning their peer info
pub fn find_all_closest_peers(&self, key: TypedKey) -> NetworkResult<Vec<PeerInfo>> {
let Some(own_peer_info) = self.get_own_peer_info(RoutingDomain::PublicInternet) else {
// Our own node info is not yet available, drop this request.
return NetworkResult::service_unavailable();
};
// find N nodes closest to the target node in our routing table
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Ensure only things that are valid/signed in the PublicInternet domain are returned
rti.filter_has_valid_signed_node_info(
RoutingDomain::PublicInternet,
true,
opt_entry,
)
},
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let node_count = {
let c = self.config.get();
c.network.dht.max_find_node_count as usize
};
let closest_nodes = self.find_closest_nodes(
node_count,
key,
filters,
// transform
|rti, entry| {
rti.transform_to_peer_info(RoutingDomain::PublicInternet, &own_peer_info, entry)
},
);
NetworkResult::value(closest_nodes)
}
/// Utility to find nodes that are closer to a key than our own node, returning their peer info
pub fn find_peers_closer_to_key(&self, key: TypedKey) -> NetworkResult<Vec<PeerInfo>> {
// add node information for the requesting node to our routing table
let crypto_kind = key.kind;
let own_node_id = self.node_id(crypto_kind);
// find N nodes closest to the target node in our routing table
// ensure the nodes returned are only the ones closer to the target node than ourself
let Some(vcrypto) = self.crypto().get(crypto_kind) else {
return NetworkResult::invalid_message("unsupported cryptosystem");
};
let own_distance = vcrypto.distance(&own_node_id.value, &key.value);
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Exclude our own node
let Some(entry) = opt_entry else {
return false;
};
// Ensure only things that are valid/signed in the PublicInternet domain are returned
if !rti.filter_has_valid_signed_node_info(
RoutingDomain::PublicInternet,
true,
Some(entry.clone()),
) {
return false;
}
// Ensure things further from the key than our own node are not included
let Some(entry_node_id) = entry.with(rti, |_rti, e| e.node_ids().get(crypto_kind)) else {
return false;
};
let entry_distance = vcrypto.distance(&entry_node_id.value, &key.value);
if entry_distance >= own_distance {
return false;
}
true
},
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let node_count = {
let c = self.config.get();
c.network.dht.max_find_node_count as usize
};
//
let closest_nodes = self.find_closest_nodes(
node_count,
key,
filters,
// transform
|rti, entry| {
entry.unwrap().with(rti, |_rti, e| {
e.make_peer_info(RoutingDomain::PublicInternet).unwrap()
})
},
);
NetworkResult::value(closest_nodes)
}
}

View File

@ -1,6 +1,7 @@
mod bucket;
mod bucket_entry;
mod debug;
mod find_peers;
mod node_ref;
mod node_ref_filter;
mod privacy;
@ -22,6 +23,7 @@ use hashlink::LruCache;
pub use bucket_entry::*;
pub use debug::*;
pub use find_peers::*;
pub use node_ref::*;
pub use node_ref_filter::*;
pub use privacy::*;

View File

@ -90,48 +90,14 @@ impl RPCProcessor {
};
let node_id = find_node_q.destructure();
// add node information for the requesting node to our routing table
// Get a chunk of the routing table near the requested node id
let routing_table = self.routing_table();
let closest_nodes = network_result_try!(routing_table.find_all_closest_peers(node_id));
xxx move this into routing table code, also do getvalue code
let Some(own_peer_info) = routing_table.get_own_peer_info(RoutingDomain::PublicInternet) else {
// Our own node info is not yet available, drop this request.
return Ok(NetworkResult::service_unavailable());
};
// find N nodes closest to the target node in our routing table
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Ensure only things that are valid/signed in the PublicInternet domain are returned
rti.filter_has_valid_signed_node_info(
RoutingDomain::PublicInternet,
true,
opt_entry,
)
},
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let node_count = {
let c = self.config.get();
c.network.dht.max_find_node_count as usize
};
let closest_nodes = routing_table.find_closest_nodes(
node_count,
node_id,
filters,
// transform
|rti, entry| {
rti.transform_to_peer_info(RoutingDomain::PublicInternet, &own_peer_info, entry)
},
);
// Make status answer
// Make FindNode answer
let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?;
// Send status answer
// Send FindNode answer
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)))
.await
}

View File

@ -113,69 +113,26 @@ impl RPCProcessor {
// Destructure
let (key, subkey, want_descriptor) = get_value_q.destructure();
// add node information for the requesting node to our routing table
let crypto_kind = key.kind;
// Get the nodes that we know about that are closer to the the key than our own node
let routing_table = self.routing_table();
let own_node_id = routing_table.node_id(crypto_kind);
let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key));
// find N nodes closest to the target node in our routing table
// ensure the nodes returned are only the ones closer to the target node than ourself
let Some(vcrypto) = self.crypto.get(crypto_kind) else {
return Ok(NetworkResult::invalid_message("unsupported cryptosystem"));
};
let own_distance = vcrypto.distance(&own_node_id.value, &key.value);
// See if we have this record ourselves
let storage_manager = self.storage_manager();
let subkey_result = storage_manager
.handle_get_value(key, subkey, want_descriptor)
.await
.map_err(RPCError::internal)?;
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Exclude our own node
let Some(entry) = opt_entry else {
return false;
};
// Ensure only things that are valid/signed in the PublicInternet domain are returned
if !rti.filter_has_valid_signed_node_info(
RoutingDomain::PublicInternet,
true,
Some(entry.clone()),
) {
return false;
}
// Ensure things further from the key than our own node are not included
let Some(entry_node_id) = entry.with(rti, |_rti, e| e.node_ids().get(crypto_kind)) else {
return false;
};
let entry_distance = vcrypto.distance(&entry_node_id.value, &key.value);
if entry_distance >= own_distance {
return false;
}
// Make GetValue answer
let get_value_a = RPCOperationGetValueA::new(
subkey_result.value,
closer_to_key_peers,
subkey_result.descriptor,
)?;
true
},
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let node_count = {
let c = self.config.get();
c.network.dht.max_find_node_count as usize
};
//
let closest_nodes = routing_table.find_closest_nodes(
node_count,
key,
filters,
// transform
|rti, entry| {
entry.unwrap().with(rti, |_rti, e| {
e.make_peer_info(RoutingDomain::PublicInternet).unwrap()
})
},
);
// Make status answer
let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?;
// Send status answer
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)))
// Send GetValue answer
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::GetValueA(get_value_a)))
.await
}
}

View File

@ -1,13 +1,5 @@
use super::*;
/// The result of the do_get_value_operation
pub struct DoGetValueResult {
/// The subkey value if we got one
pub value: Option<SignedValueData>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed
pub descriptor: Option<SignedValueDescriptor>,
}
/// The context of the do_get_value operation
struct DoGetValueContext {
/// The latest value of the subkey, may be the value passed in
@ -22,6 +14,7 @@ struct DoGetValueContext {
impl StorageManager {
/// Perform a 'get value' query on the network
pub async fn do_get_value(
&self,
rpc_processor: RPCProcessor,
@ -30,7 +23,7 @@ impl StorageManager {
last_value: Option<SignedValueData>,
last_descriptor: Option<SignedValueDescriptor>,
safety_selection: SafetySelection,
) -> Result<DoGetValueResult, VeilidAPIError> {
) -> Result<SubkeyResult, VeilidAPIError> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'GetValue'
@ -63,7 +56,7 @@ impl StorageManager {
let context = context.clone();
let last_descriptor = last_descriptor.clone();
async move {
match rpc_processor
let vres = rpc_processor
.clone()
.rpc_call_get_value(
Destination::direct(next_node).with_safety(safety_selection),
@ -71,75 +64,70 @@ impl StorageManager {
subkey,
last_descriptor,
)
.await
{
Ok(v) => {
let v = network_result_value_or_log!(v => {
// Any other failures, just try the next node
return Ok(None);
});
.await?;
let gva = network_result_value_or_log!(vres => {
// Any other failures, just try the next node
return Ok(None);
});
// Keep the descriptor if we got one. If we had a last_descriptor it will
// already be validated by rpc_call_get_value
if let Some(descriptor) = v.answer.descriptor {
let mut ctx = context.lock();
if ctx.descriptor.is_none() && ctx.schema.is_none() {
ctx.schema =
Some(descriptor.schema().map_err(RPCError::invalid_format)?);
ctx.descriptor = Some(descriptor);
}
}
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = v.answer.value {
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Got a value but no descriptor for it
// Move to the next node
return Ok(None);
};
// Validate with schema
if !schema.check_subkey_value_data(
descriptor.owner(),
subkey,
value.value_data(),
) {
// Validation failed, ignore this value
// Move to the next node
return Ok(None);
}
// If we have a prior value, see if this is a newer sequence number
if let Some(prior_value) = &ctx.value {
let prior_seq = prior_value.value_data().seq();
let new_seq = value.value_data().seq();
if new_seq == prior_seq {
// If sequence number is the same, the data should be the same
if prior_value.value_data() != value.value_data() {
// Move to the next node
return Ok(None);
}
// Increase the consensus count for the existing value
ctx.value_count += 1;
} else if new_seq > prior_seq {
// If the sequence number is greater, go with it
ctx.value = Some(value);
ctx.value_count = 1;
} else {
// If the sequence number is older, ignore it
}
}
}
// Return peers if we have some
Ok(Some(v.answer.peers))
// Keep the descriptor if we got one. If we had a last_descriptor it will
// already be validated by rpc_call_get_value
if let Some(descriptor) = gva.answer.descriptor {
let mut ctx = context.lock();
if ctx.descriptor.is_none() && ctx.schema.is_none() {
ctx.schema =
Some(descriptor.schema().map_err(RPCError::invalid_format)?);
ctx.descriptor = Some(descriptor);
}
Err(e) => Err(e),
}
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = gva.answer.value {
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Got a value but no descriptor for it
// Move to the next node
return Ok(None);
};
// Validate with schema
if !schema.check_subkey_value_data(
descriptor.owner(),
subkey,
value.value_data(),
) {
// Validation failed, ignore this value
// Move to the next node
return Ok(None);
}
// If we have a prior value, see if this is a newer sequence number
if let Some(prior_value) = &ctx.value {
let prior_seq = prior_value.value_data().seq();
let new_seq = value.value_data().seq();
if new_seq == prior_seq {
// If sequence number is the same, the data should be the same
if prior_value.value_data() != value.value_data() {
// Move to the next node
return Ok(None);
}
// Increase the consensus count for the existing value
ctx.value_count += 1;
} else if new_seq > prior_seq {
// If the sequence number is greater, go with it
ctx.value = Some(value);
ctx.value_count = 1;
} else {
// If the sequence number is older, ignore it
}
}
}
// Return peers if we have some
Ok(Some(gva.answer.peers))
}
};
@ -173,7 +161,7 @@ impl StorageManager {
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
Ok(DoGetValueResult{
Ok(SubkeyResult{
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
})
@ -185,4 +173,10 @@ impl StorageManager {
}
}
}
/// Handle a recieved 'Get Value' query
pub async fn handle_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> Result<SubkeyResult, VeilidAPIError> {
let mut inner = self.lock().await?;
inner.handle_get_remote_value(key, subkey, want_descriptor)
}
}

View File

@ -2,12 +2,14 @@ mod do_get_value;
mod keys;
mod record_store;
mod record_store_limits;
mod storage_manager_inner;
mod tasks;
mod types;
use keys::*;
use record_store::*;
use record_store_limits::*;
use storage_manager_inner::*;
pub use types::*;
@ -21,22 +23,6 @@ const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
/// Frequency to flush record stores to disk
const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
/// Locked structure for storage manager
struct StorageManagerInner {
/// If we are started up
initialized: bool,
/// Records that have been 'opened' and are not yet closed
opened_records: HashMap<TypedKey, OpenedRecord>,
/// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive
local_record_store: Option<RecordStore<LocalRecordDetail>>,
/// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish
remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
/// RPC processor if it is available
rpc_processor: Option<RPCProcessor>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
tick_future: Option<SendPinBoxFuture<()>>,
}
struct StorageManagerUnlockedInner {
config: VeilidConfig,
crypto: Crypto,
@ -72,14 +58,7 @@ impl StorageManager {
}
}
fn new_inner() -> StorageManagerInner {
StorageManagerInner {
initialized: false,
opened_records: HashMap::new(),
local_record_store: None,
remote_record_store: None,
rpc_processor: None,
tick_future: None,
}
StorageManagerInner::default()
}
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
@ -266,63 +245,16 @@ impl StorageManager {
local_record_store.new_record(dht_key, record).await?;
// Open the record
self.open_record_inner(inner, dht_key, Some(owner), safety_selection)
self.open_record_common(inner, dht_key, Some(owner), safety_selection)
.await
}
fn open_record_inner_check_existing(
async fn open_record_common(
&self,
mut inner: AsyncMutexGuardArc<StorageManagerInner>,
key: TypedKey,
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> Option<Result<DHTRecordDescriptor, VeilidAPIError>> {
// Get local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
return Some(Err(VeilidAPIError::not_initialized()));
};
// See if we have a local record already or not
let cb = |r: &mut Record<LocalRecordDetail>| {
// Process local record
// Keep the safety selection we opened the record with
r.detail_mut().safety_selection = safety_selection;
// Return record details
(r.owner().clone(), r.schema())
};
let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else {
return None;
};
// Had local record
// If the writer we chose is also the owner, we have the owner secret
// Otherwise this is just another subkey writer
let owner_secret = if let Some(writer) = writer {
if writer.key == owner {
Some(writer.secret)
} else {
None
}
} else {
None
};
// Write open record
inner.opened_records.insert(key, OpenedRecord::new(writer));
// Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema);
Some(Ok(descriptor))
}
async fn open_record_inner(
&self,
inner: AsyncMutexGuardArc<StorageManagerInner>,
key: TypedKey,
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> Result<DHTRecordDescriptor, VeilidAPIError> {
// Ensure the record is closed
if inner.opened_records.contains_key(&key) {
@ -330,25 +262,23 @@ impl StorageManager {
}
// See if we have a local record already or not
if let Some(res) =
self.open_record_inner_check_existing(inner, key, writer, safety_selection)
{
if let Some(res) = inner.open_record_check_existing(key, writer, safety_selection) {
return res;
}
// No record yet, try to get it from the network
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let rpc_processor = {
let inner = self.lock().await?;
let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, try again later
apibail_try_again!();
};
rpc_processor
let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, try again later
apibail_try_again!();
};
// Drop the mutex so we dont block during network access
drop(inner);
// No last descriptor, no last value
// Use the safety selection we opened the record with
let subkey: ValueSubkey = 0;
let result = self
.do_get_value(rpc_processor, key, subkey, None, None, safety_selection)
@ -405,7 +335,9 @@ impl StorageManager {
}
// Write open record
inner.opened_records.insert(key, OpenedRecord::new(writer));
inner
.opened_records
.insert(key, OpenedRecord::new(writer, safety_selection));
// Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema);
@ -419,24 +351,13 @@ impl StorageManager {
safety_selection: SafetySelection,
) -> Result<DHTRecordDescriptor, VeilidAPIError> {
let inner = self.lock().await?;
self.open_record_inner(inner, key, writer, safety_selection)
self.open_record_common(inner, key, writer, safety_selection)
.await
}
async fn close_record_inner(
&self,
inner: &mut AsyncMutexGuardArc<StorageManagerInner>,
key: TypedKey,
) -> Result<(), VeilidAPIError> {
let Some(_opened_record) = inner.opened_records.remove(&key) else {
apibail_generic!("record not open");
};
Ok(())
}
pub async fn close_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> {
let mut inner = self.lock().await?;
self.close_record_inner(&mut inner, key).await
inner.close_record(key)
}
pub async fn delete_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> {
@ -444,7 +365,7 @@ impl StorageManager {
// Ensure the record is closed
if inner.opened_records.contains_key(&key) {
self.close_record_inner(&mut inner, key).await?;
inner.close_record(key)?;
}
let Some(local_record_store) = inner.local_record_store.as_mut() else {
@ -461,8 +382,60 @@ impl StorageManager {
subkey: ValueSubkey,
force_refresh: bool,
) -> Result<Option<ValueData>, VeilidAPIError> {
let inner = self.lock().await?;
unimplemented!();
let mut inner = self.lock().await?;
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(opened_record) = inner.opened_records.remove(&key) else {
apibail_generic!("record not open");
};
// See if the requested subkey is our local record store
let SubkeyResult { value, descriptor } = inner.handle_get_local_value(key, subkey, true)?;
// Return the existing value if we have one unless we are forcing a refresh
if !force_refresh {
if let Some(value) = value {
return Ok(Some(value.into_value_data()));
}
}
// Refresh if we can
let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, try again later
apibail_try_again!();
};
// Drop the lock for network access
drop(inner);
// May have last descriptor / value
// Use the safety selection we opened the record with
let opt_last_seq = value.as_ref().map(|v| v.value_data().seq());
let result = self
.do_get_value(
rpc_processor,
key,
subkey,
value,
descriptor,
opened_record.safety_selection(),
)
.await?;
// See if we got a value back
let Some(result_value) = result.value else {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None);
};
// If we got a new value back then write it to the opened record
if Some(result_value.value_data().seq()) != opt_last_seq {
let mut inner = self.lock().await?;
inner
.handle_set_local_value(key, subkey, result_value.clone())
.await?;
}
Ok(Some(result_value.into_value_data()))
}
pub async fn set_value(

View File

@ -30,6 +30,14 @@ where
purge_dead_records_mutex: Arc<AsyncMutex<()>>,
}
/// The result of the do_get_value_operation
pub struct SubkeyResult {
/// The subkey value if we got one
pub value: Option<SignedValueData>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed
pub descriptor: Option<SignedValueDescriptor>,
}
impl<D> RecordStore<D>
where
D: Clone + RkyvArchive + RkyvSerialize<RkyvSerializer>,
@ -297,7 +305,7 @@ where
Ok(())
}
pub fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
pub(super) fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
{
@ -318,7 +326,7 @@ where
out
}
pub fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
pub(super) fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&mut Record<D>) -> R,
{
@ -339,24 +347,27 @@ where
out
}
pub async fn get_subkey(
// pub fn get_descriptor(&mut self, key: TypedKey) -> Option<SignedValueDescriptor> {
// self.with_record(key, |record| record.descriptor().clone())
// }
pub fn get_subkey(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
) -> Result<Option<SignedValueData>, VeilidAPIError> {
want_descriptor: bool,
) -> Result<Option<SubkeyResult>, VeilidAPIError> {
// record from index
let rtk = RecordTableKey { key };
let subkey_count = {
let Some(record) = self.record_index.get_mut(&rtk) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
// Touch
record.touch(get_aligned_timestamp());
record.subkey_count()
let Some((subkey_count, opt_descriptor)) = self.with_record(key, |record| {
(record.subkey_count(), if want_descriptor {
Some(record.descriptor().clone())
} else {
None
})
}) else {
// Record not available
return Ok(None);
};
self.mark_record_changed(rtk);
// Check if the subkey is in range
if subkey as usize >= subkey_count {
@ -373,7 +384,10 @@ where
if let Some(record_data) = self.subkey_cache.get_mut(&stk) {
let out = record_data.signed_value_data().clone();
return Ok(Some(out));
return Ok(Some(SubkeyResult {
value: Some(out),
descriptor: opt_descriptor,
}));
}
// If not in cache, try to pull from table store
if let Some(record_data) = subkey_table
@ -385,10 +399,17 @@ where
// Add to cache, do nothing with lru out
self.add_to_subkey_cache(stk, record_data);
return Ok(Some(out));
return Ok(Some(SubkeyResult {
value: Some(out),
descriptor: opt_descriptor,
}));
};
return Ok(None);
// Record was available, but subkey was not found, maybe descriptor gets returned
Ok(Some(SubkeyResult {
value: None,
descriptor: opt_descriptor,
}))
}
pub async fn set_subkey(
@ -403,18 +424,11 @@ where
}
// Get record from index
let rtk = RecordTableKey { key };
let (subkey_count, total_size) = {
let Some(record) = self.record_index.get_mut(&rtk) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
// Touch
record.touch(get_aligned_timestamp());
let Some((subkey_count, total_size)) = self.with_record(key, |record| {
(record.subkey_count(), record.total_size())
}) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
self.mark_record_changed(rtk);
// Check if the subkey is in range
if subkey as usize >= subkey_count {
@ -474,10 +488,10 @@ where
self.add_to_subkey_cache(stk, record_data);
// Update record
let Some(record) = self.record_index.get_mut(&rtk) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
record.set_record_data_size(new_record_data_size);
self.with_record_mut(key, |record| {
record.set_record_data_size(new_record_data_size);
})
.expect("record should still be here");
Ok(())
}

View File

@ -0,0 +1,187 @@
use super::*;
/// Locked structure for storage manager
#[derive(Default)]
pub(super) struct StorageManagerInner {
/// If we are started up
pub initialized: bool,
/// Records that have been 'opened' and are not yet closed
pub opened_records: HashMap<TypedKey, OpenedRecord>,
/// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive
pub local_record_store: Option<RecordStore<LocalRecordDetail>>,
/// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish
pub remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
/// RPC processor if it is available
pub rpc_processor: Option<RPCProcessor>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
pub tick_future: Option<SendPinBoxFuture<()>>,
}
impl StorageManagerInner {
pub fn open_record_check_existing(
&mut self,
key: TypedKey,
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> Option<Result<DHTRecordDescriptor, VeilidAPIError>> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
return Some(Err(VeilidAPIError::not_initialized()));
};
// See if we have a local record already or not
let cb = |r: &mut Record<LocalRecordDetail>| {
// Process local record
// Keep the safety selection we opened the record with
r.detail_mut().safety_selection = safety_selection;
// Return record details
(r.owner().clone(), r.schema())
};
let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else {
return None;
};
// Had local record
// If the writer we chose is also the owner, we have the owner secret
// Otherwise this is just another subkey writer
let owner_secret = if let Some(writer) = writer {
if writer.key == owner {
Some(writer.secret)
} else {
None
}
} else {
None
};
// Write open record
self.opened_records
.insert(key, OpenedRecord::new(writer, safety_selection));
// Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema);
Some(Ok(descriptor))
}
pub async fn new_local_record(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
signed_value_descriptor: SignedValueDescriptor,
signed_value_data: Option<SignedValueData>,
safety_selection: SafetySelection,
) -> Result<(), VeilidAPIError> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Make and store a new record for this descriptor
let record = Record::<LocalRecordDetail>::new(
get_aligned_timestamp(),
signed_value_descriptor,
LocalRecordDetail { safety_selection },
)?;
local_record_store.new_record(key, record).await?;
// If we got a subkey with the getvalue, it has already been validated against the schema, so store it
if let Some(signed_value_data) = signed_value_data {
// Write subkey to local store
local_record_store
.set_subkey(key, subkey, signed_value_data)
.await?;
}
// Write open record
self.opened_records
.insert(key, OpenedRecord::new(writer, safety_selection));
Ok(())
}
pub fn close_record(&mut self, key: TypedKey) -> Result<(), VeilidAPIError> {
let Some(_opened_record) = self.opened_records.remove(&key) else {
apibail_generic!("record not open");
};
Ok(())
}
pub fn handle_get_local_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> Result<SubkeyResult, VeilidAPIError> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(subkey_result) = local_record_store.get_subkey(key, subkey, want_descriptor)? {
return Ok(subkey_result);
}
Ok(SubkeyResult {
value: None,
descriptor: None,
})
}
pub async fn handle_set_local_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: SignedValueData,
) -> Result<(), VeilidAPIError> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Write subkey to local store
local_record_store
.set_subkey(key, subkey, signed_value_data)
.await?;
Ok(())
}
pub fn handle_get_remote_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> Result<SubkeyResult, VeilidAPIError> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, want_descriptor)? {
return Ok(subkey_result);
}
Ok(SubkeyResult {
value: None,
descriptor: None,
})
}
pub async fn handle_set_remote_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: SignedValueData,
) -> Result<(), VeilidAPIError> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
// Write subkey to remote store
remote_record_store
.set_subkey(key, subkey, signed_value_data)
.await?;
Ok(())
}
}

View File

@ -8,14 +8,24 @@ pub struct OpenedRecord {
/// Without this, set_value() will fail regardless of which key or subkey is being written to
/// as all writes are signed
writer: Option<KeyPair>,
/// The safety selection in current use
safety_selection: SafetySelection,
}
impl OpenedRecord {
pub fn new(writer: Option<KeyPair>) -> Self {
Self { writer }
pub fn new(writer: Option<KeyPair>, safety_selection: SafetySelection) -> Self {
Self {
writer,
safety_selection,
}
}
pub fn writer(&self) -> Option<&KeyPair> {
self.writer.as_ref()
}
pub fn safety_selection(&self) -> SafetySelection {
self.safety_selection
}
}

View File

@ -61,6 +61,10 @@ impl SignedValueData {
&self.value_data
}
pub fn into_value_data(self) -> ValueData {
self.value_data
}
pub fn signature(&self) -> &Signature {
&self.signature
}