refactor, and inbound transactions

This commit is contained in:
Christien Rioux 2025-10-27 20:08:40 -04:00
parent 871fa55d1a
commit 56668ba7a6
35 changed files with 812 additions and 695 deletions

View file

@ -64,7 +64,6 @@ pub enum QuestionContext {
pub struct RPCValidateContext<'a> {
pub registry: VeilidComponentRegistry,
pub question_context: Option<&'a QuestionContext>,
pub message_header: &'a MessageHeader,
}
impl_veilid_component_registry_accessor!(RPCValidateContext<'_>);

View file

@ -159,6 +159,7 @@ impl RPCOperationTransactCommandQ {
Ok(())
}
#[expect(clippy::type_complexity)]
pub fn destructure(
self,
) -> (

View file

@ -24,7 +24,7 @@ fn decode_value_data(
None
};
EncryptedValueData::new_with_seq(seq, data, writer, nonce).map_err(RPCError::protocol)
EncryptedValueData::new(seq, data, writer, nonce).map_err(RPCError::protocol)
}
pub fn decode_signed_value_data(
@ -79,7 +79,7 @@ mod tests {
let mut builder = message_builder.init_root::<veilid_capnp::signed_value_data::Builder>();
let signed_value_data = SignedValueData::new(
EncryptedValueData::new_with_seq(
EncryptedValueData::new(
10.into(),
vec![1, 2, 3, 4, 5, 6],
keypair.key(),

View file

@ -86,6 +86,7 @@ impl MessageHeader {
// // XXX: or an actual safety route. If your code depends on this idea, you need to rethink it.
// }
#[expect(dead_code)]
pub fn is_direct(&self) -> bool {
match &self.detail {
RPCMessageHeaderDetail::Direct(_) => true,
@ -101,6 +102,8 @@ impl MessageHeader {
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_sender_id(),
}
}
#[expect(dead_code)]
pub fn direct_sender_public_key(&self) -> PublicKey {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d
@ -128,6 +131,8 @@ impl MessageHeader {
RPCMessageHeaderDetail::PrivateRouted(p) => Some(p.private_route.clone()),
}
}
#[expect(dead_code)]
pub fn get_safety_route_public_key(&self) -> Option<PublicKey> {
match &self.detail {
RPCMessageHeaderDetail::Direct(_) => None,

View file

@ -1425,7 +1425,7 @@ impl RPCProcessor {
let operation = signed_operation.decode_operation(&decode_context)?;
// Validate the operation
self.validate_rpc_operation(&operation, encoded_msg)?;
self.validate_rpc_operation(&operation)?;
Ok((operation, opt_signer))
}
@ -1467,11 +1467,7 @@ impl RPCProcessor {
/// complex operations that require stateful validation and a more robust context than
/// 'signatures', the caller must still perform whatever validation is necessary
#[instrument(level = "trace", target = "rpc", skip_all)]
fn validate_rpc_operation(
&self,
operation: &RPCOperation,
encoded_msg: &MessageEncoded,
) -> Result<(), RPCError> {
fn validate_rpc_operation(&self, operation: &RPCOperation) -> Result<(), RPCError> {
// If this is an answer, get the question context for this answer
// If we received an answer for a question we did not ask, this will return an error
let question_context = if let RPCOperationKind::Answer(_) = operation.kind() {
@ -1485,7 +1481,6 @@ impl RPCProcessor {
let validate_context = RPCValidateContext {
registry: self.registry(),
question_context: question_context.as_ref().map(|x| x.as_ref()),
message_header: &encoded_msg.header,
};
operation.validate(&validate_context)?;

View file

@ -240,7 +240,7 @@ impl RPCProcessor {
// See if we have this record ourselves
let storage_manager = self.storage_manager();
let inbound_get_value_result = network_result_try!(storage_manager
.inbound_get_value(opaque_record_key.clone(), subkey, want_descriptor)
.inbound_get_value(&opaque_record_key, subkey, want_descriptor)
.await
.map_err(RPCError::internal)?);

View file

@ -234,7 +234,7 @@ impl RPCProcessor {
// See if we have this record ourselves
let storage_manager = self.storage_manager();
let inbound_inspect_value_result = network_result_try!(storage_manager
.inbound_inspect_value(opaque_record_key.clone(), subkeys, want_descriptor)
.inbound_inspect_value(&opaque_record_key, subkeys, want_descriptor)
.await
.map_err(RPCError::internal)?);

View file

@ -268,7 +268,7 @@ impl RPCProcessor {
let storage_manager = self.storage_manager();
let result = network_result_try!(storage_manager
.inbound_set_value(
opaque_record_key.clone(),
&opaque_record_key,
subkey,
Arc::new(value),
descriptor.map(Arc::new),

View file

@ -251,7 +251,7 @@ impl RPCProcessor {
let storage_manager = self.storage_manager();
let inbound_transact_value_result = network_result_try!(storage_manager
.inbound_transact_command(
opaque_record_key,
&opaque_record_key,
transaction_id,
command,
opt_seqs,
@ -262,17 +262,12 @@ impl RPCProcessor {
.map_err(RPCError::internal)?);
match inbound_transact_value_result {
InboundTransactCommandResult::Success {
expiration,
opt_seqs,
opt_subkey,
opt_value,
} => (
InboundTransactCommandResult::Success(res) => (
true,
Some(expiration),
opt_seqs,
opt_subkey,
opt_value.as_ref().map(|x| x.as_ref().clone()),
Some(res.expiration),
res.opt_seqs,
res.opt_subkey,
res.opt_value.as_ref().map(|x| x.as_ref().clone()),
),
InboundTransactCommandResult::InvalidTransaction => (
false,
@ -281,6 +276,11 @@ impl RPCProcessor {
Default::default(),
Default::default(),
),
InboundTransactCommandResult::InvalidArguments => {
return Ok(NetworkResult::invalid_message(
"not processing transact command request with invalid arguments",
))
}
}
};

View file

@ -98,7 +98,7 @@ impl StorageManager {
};
let opaque_record_key = record_key.opaque();
local_record_store
.debug_record_subkey_info(opaque_record_key, subkey)
.debug_record_subkey_info(&opaque_record_key, subkey)
.await
}
pub async fn debug_remote_record_subkey_info(
@ -112,7 +112,7 @@ impl StorageManager {
};
let opaque_record_key = record_key.opaque();
remote_record_store
.debug_record_subkey_info(opaque_record_key, subkey)
.debug_record_subkey_info(&opaque_record_key, subkey)
.await
}
pub async fn debug_local_record_info(&self, record_key: RecordKey) -> String {
@ -121,7 +121,7 @@ impl StorageManager {
return "not initialized".to_owned();
};
let opaque_record_key = record_key.opaque();
let local_debug = local_record_store.debug_record_info(opaque_record_key);
let local_debug = local_record_store.debug_record_info(&opaque_record_key);
let opaque_record_key = record_key.opaque();
let opened_debug = if let Some(o) = inner.opened_records.get(&opaque_record_key) {
@ -139,6 +139,6 @@ impl StorageManager {
return "not initialized".to_owned();
};
let opaque_record_key = record_key.opaque();
remote_record_store.debug_record_info(opaque_record_key)
remote_record_store.debug_record_info(&opaque_record_key)
}
}

View file

@ -60,7 +60,7 @@ impl StorageManager {
// See if the requested subkey is our local record store
let last_get_result = self
.handle_get_local_value_inner(&mut inner, opaque_record_key.clone(), subkey, true)
.handle_get_local_value_inner(&mut inner, &opaque_record_key, subkey, true)
.await?;
// Return the existing value if we have one unless we are forcing a refresh
@ -97,7 +97,7 @@ impl StorageManager {
.unwrap_or_default();
let res_rx = self
.outbound_get_value(
opaque_record_key.clone(),
&opaque_record_key,
subkey,
safety_selection,
last_get_result,
@ -113,7 +113,7 @@ impl StorageManager {
// Process the returned result
let out_encrypted = self
.process_outbound_get_value_result(opaque_record_key.clone(), subkey, last_seq, result)
.process_outbound_get_value_result(&opaque_record_key, subkey, last_seq, result)
.await?;
let out = if let Some(vd) = out_encrypted {
Some(self.maybe_decrypt_value_data(&record_key, &vd)?)
@ -141,20 +141,22 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_get_value(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<NetworkResult<InboundGetValueResult>> {
let mut inner = self.inner.lock().await;
// See if the subkey we are getting has a last known remote value
let last_get_result = Self::handle_get_remote_value_inner(
&mut inner,
opaque_record_key,
subkey,
want_descriptor,
)
.await?;
// See if it's in the remote record store
let last_get_result = {
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
remote_record_store
.get_subkey(opaque_record_key, subkey, want_descriptor)
.await?
.unwrap_or_default()
};
Ok(NetworkResult::value(InboundGetValueResult::Success(
last_get_result,
@ -167,7 +169,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_get_value(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
last_get_result: GetResult,
@ -183,7 +185,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(opaque_record_key.clone())
self.get_value_nodes(opaque_record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -398,6 +400,7 @@ impl StorageManager {
// Call the fanout in a spawned task
let registry = self.registry();
let fanout_hash_coordinate = opaque_record_key.to_hash_coordinate();
spawn(
"outbound_get_value fanout",
Box::pin(
@ -405,7 +408,7 @@ impl StorageManager {
let routing_table = registry.routing_table();
let fanout_call = FanoutCall::new(
&routing_table,
opaque_record_key.to_hash_coordinate(),
fanout_hash_coordinate,
key_count,
fanout,
consensus_count,
@ -481,7 +484,7 @@ impl StorageManager {
}
};
let is_incomplete = result.fanout_result.kind.is_incomplete();
let value_data = match this.process_outbound_get_value_result(key.opaque(), subkey, last_seq, result).await {
let value_data = match this.process_outbound_get_value_result(&key.opaque(), subkey, last_seq, result).await {
Ok(Some(v)) => v,
Ok(None) => {
return is_incomplete;
@ -526,7 +529,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all)]
pub(super) async fn process_outbound_get_value_result(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
last_seq: ValueSeqNum,
result: get_value::OutboundGetValueResult,
@ -554,8 +557,8 @@ impl StorageManager {
&mut inner,
opaque_record_key,
subkey,
None,
get_result_value.clone(),
InboundWatchUpdateMode::UpdateAll,
)
.await?;
}

View file

@ -95,12 +95,7 @@ impl StorageManager {
// See if the requested record is our local record store
let mut local_inspect_result = self
.handle_inspect_local_value_inner(
&mut inner,
opaque_record_key.clone(),
subkeys.clone(),
true,
)
.handle_inspect_local_value_inner(&mut inner, &opaque_record_key, subkeys.clone(), true)
.await?;
// Get the offline subkeys for this record still only returning the ones we're inspecting
@ -145,7 +140,7 @@ impl StorageManager {
// Get the inspect record report from the network
let result = self
.outbound_inspect_value(
opaque_record_key.clone(),
&opaque_record_key,
subkeys,
safety_selection,
if matches!(scope, DHTReportScope::SyncGet | DHTReportScope::SyncSet) {
@ -198,7 +193,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_inspect_value(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
safety_selection: SafetySelection,
local_inspect_result: InspectResult,
@ -229,7 +224,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(opaque_record_key.clone())
self.get_value_nodes(opaque_record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -510,7 +505,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_inspect_value(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<NetworkResult<InboundInspectValueResult>> {
@ -522,14 +517,13 @@ impl StorageManager {
};
// See if the subkey we are getting has a last known remote value
let inspect_result = self
.handle_inspect_remote_value_inner(
&mut inner,
opaque_record_key,
subkeys,
want_descriptor,
)
.await?;
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
let inspect_result = remote_record_store
.inspect_record(opaque_record_key, &subkeys, want_descriptor)
.await?
.unwrap_or_default();
Ok(NetworkResult::value(InboundInspectValueResult::Success(
inspect_result,

View file

@ -0,0 +1,120 @@
use super::*;
impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_local_value_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<GetResult> {
// See if the value is in the offline subkey writes first,
// since it may not have been committed yet to the local record store
if let Some(get_result) = self.get_offline_subkey_writes_subkey(
inner,
opaque_record_key,
subkey,
want_descriptor,
)? {
return Ok(get_result);
}
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(get_result) = local_record_store
.get_subkey(opaque_record_key, subkey, want_descriptor)
.await?
{
return Ok(get_result);
}
Ok(GetResult::default())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_local_value_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
opt_transaction_range: Option<&ValueSubkeyRangeSet>,
signed_value_data: Arc<SignedValueData>,
) -> VeilidAPIResult<()> {
// See if this new data supercedes any offline subkey writes
self.remove_old_offline_subkey_writes_inner(
inner,
opaque_record_key,
subkey,
signed_value_data.clone(),
);
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Write subkey to local store
local_record_store
.set_subkey(
opaque_record_key,
subkey,
opt_transaction_range,
signed_value_data,
InboundWatchUpdateMode::NoUpdate,
)
.await?;
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_inspect_local_value_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: &OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<InspectResult> {
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(inspect_result) = local_record_store
.inspect_record(opaque_record_key, &subkeys, want_descriptor)
.await?
{
return Ok(inspect_result);
}
Ok(InspectResult::default())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn get_value_nodes(
&self,
opaque_record_key: &OpaqueRecordKey,
) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
let inner = self.inner.lock().await;
// Get local record store
let Some(local_record_store) = inner.local_record_store.as_ref() else {
apibail_not_initialized!();
};
// Get routing table to see if we still know about these nodes
let routing_table = self.routing_table();
let opt_value_nodes = local_record_store.peek_record(opaque_record_key, |r| {
let d = r.detail();
d.nodes
.keys()
.cloned()
.filter_map(|nr| routing_table.lookup_node_ref(nr).ok().flatten())
.collect()
});
Ok(opt_value_nodes)
}
}

View file

@ -4,6 +4,7 @@ mod debug;
mod delete_record;
mod get_value;
mod inspect_value;
mod local_record_store_interface;
mod offline_subkey_writes;
mod open_record;
mod outbound_transaction_manager;
@ -12,7 +13,6 @@ mod record_encryption;
mod record_key;
mod record_lock_table;
mod record_store;
mod record_store_interface;
mod rehydrate;
mod schema;
mod set_value;
@ -44,8 +44,8 @@ pub(crate) use get_value::InboundGetValueResult;
pub(crate) use inspect_value::InboundInspectValueResult;
pub(crate) use set_value::InboundSetValueResult;
pub(crate) use transaction::OutboundTransactionHandle;
pub(crate) use transaction_begin::InboundTransactBeginResult;
pub(crate) use transaction_command::InboundTransactCommandResult;
pub(crate) use transaction_begin::{InboundTransactBeginResult, TransactBeginSuccess};
pub(crate) use transaction_command::{InboundTransactCommandResult, TransactCommandSuccess};
pub(crate) use watch_value::{InboundWatchParameters, InboundWatchValueResult};
pub use types::*;
@ -592,7 +592,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all)]
fn check_fanout_set_offline(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
fanout_result: &FanoutResult,
) -> bool {

View file

@ -88,7 +88,7 @@ impl StorageManager {
pub(super) fn remove_old_offline_subkey_writes_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
) {
@ -144,7 +144,7 @@ impl StorageManager {
pub(super) fn finish_offline_subkey_writes_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkeys_written: ValueSubkeyRangeSet,
subkeys_still_offline: ValueSubkeyRangeSet,
) {

View file

@ -14,6 +14,7 @@ impl StorageManager {
};
let mut inner = self.inner.lock().await;
let opaque_record_key = record_key.opaque();
// See if we have a local record already or not
if let Some(res) = self
@ -32,7 +33,7 @@ impl StorageManager {
let set_consensus = self.config().network.dht.set_value_count as usize;
self.add_rehydration_request(
record_key.opaque(),
opaque_record_key,
ValueSubkeyRangeSet::full(),
set_consensus,
)
@ -53,7 +54,7 @@ impl StorageManager {
// Use the safety selection we opened the record with
let result = self
.outbound_inspect_value(
record_key.opaque(),
&opaque_record_key,
ValueSubkeyRangeSet::single(0),
safety_selection.clone(),
InspectResult::default(),
@ -64,7 +65,7 @@ impl StorageManager {
// If we got nothing back, the key wasn't found
if result.inspect_result.opt_descriptor().is_none() {
// No result
apibail_key_not_found!(record_key.opaque());
apibail_key_not_found!(opaque_record_key);
};
// Check again to see if we have a local record already or not

View file

@ -624,6 +624,16 @@ impl OutboundTransactionManager {
));
};
// Check if the subkey is in range
if subkey
> record_info
.schema()
.ok_or_else(|| VeilidAPIError::internal("missing descriptor"))?
.max_subkey()
{
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
let safety_selection = record_info.safety_selection().clone();
let node_transactions = record_info
.get_node_transactions()
@ -790,6 +800,16 @@ impl OutboundTransactionManager {
));
};
// Check if the subkey is in range
if subkey
> record_info
.schema()
.ok_or_else(|| VeilidAPIError::internal("missing descriptor"))?
.max_subkey()
{
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
let safety_selection = record_info.safety_selection().clone();
let node_transactions = record_info
.get_node_transactions()

View file

@ -123,6 +123,10 @@ impl OutboundTransactionRecord {
if self.local_snapshot.is_none() {
stage = OutboundTransactionStage::Failed;
}
// Descriptor was never found
if self.descriptor.is_none() {
stage = OutboundTransactionStage::Failed;
}
}
}

View file

@ -35,14 +35,14 @@ impl StorageManager {
let encryption_key = SharedSecret::new(record_key.kind(), encryption_key.clone());
vcrypto.crypt_in_place_no_auth(&mut data, &nonce, &encryption_key)?;
Ok(EncryptedValueData::new_with_seq(
Ok(EncryptedValueData::new(
value_data.seq(),
data,
value_data.writer(),
Some(nonce),
)?)
} else {
Ok(EncryptedValueData::new_with_seq(
Ok(EncryptedValueData::new(
value_data.seq(),
value_data.data().to_vec(),
value_data.writer(),

View file

@ -14,7 +14,7 @@ pub(crate) struct InboundTransaction {
/// Snapshot of record contents if record exists
pub opt_snapshot: Option<Arc<RecordSnapshot>>,
/// What has changed since snapshot
pub changed_subkeys: BTreeMap<ValueSubkey, Option<Arc<SignedValueData>>>,
pub changed_subkeys: BTreeMap<ValueSubkey, Arc<SignedValueData>>,
}
#[derive(Debug, Default, Clone)]
@ -41,70 +41,16 @@ where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn snapshot_record(
pub async fn begin_inbound_transaction(
&mut self,
record_key: &OpaqueRecordKey,
) -> VeilidAPIResult<Option<Arc<RecordSnapshot>>> {
let Some((subkey_count, stored_subkeys)) = self.with_record(record_key, |record| {
(record.subkey_count(), record.stored_subkeys().clone())
}) else {
// Record not available
return Ok(None);
};
// Snapshot all subkeys
let mut all_value_data = vec![Option::<Arc<SignedValueData>>::None; subkey_count];
for subkey in stored_subkeys.iter() {
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey {
record_key: record_key.clone(),
subkey,
};
let svd = match self.subkey_cache.get(&stk) {
Some(record_data) => record_data.signed_value_data(),
None =>
// If not in cache, try to pull from table store if it is in our stored subkey set
{
match self
.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
{
Some(record_data) => {
let out = record_data.signed_value_data().clone();
// Add to cache, do nothing with lru out
self.add_to_subkey_cache(stk, record_data);
out
}
None => {
apibail_internal!("failed to snapshot subkey that was stored");
}
}
}
};
let subkey = usize::try_from(subkey).map_err(VeilidAPIError::internal)?;
all_value_data[subkey] = Some(svd);
}
let out = Arc::new(RecordSnapshot::new(all_value_data));
Ok(Some(out))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn new_inbound_transaction(
&mut self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
opt_descriptor: Option<SignedValueDescriptor>,
want_descriptor: bool,
signing_member_id: MemberId,
) -> VeilidAPIResult<InboundTransactBeginResult> {
// Get descriptor
let opt_existing_descriptor =
self.with_record(&opaque_record_key, |record| record.descriptor());
self.with_record(opaque_record_key, |record| record.descriptor());
let descriptor = match opt_existing_descriptor {
Some(x) => x,
None => {
@ -138,6 +84,11 @@ where
let is_member = member_check(&signing_member_id);
if let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) {
// If there is a record lock then this transaction can not be started right now
if active_transaction_list.record_lock.is_some() {
return Ok(InboundTransactBeginResult::TransactionUnavailable);
}
// Total up the number of transactions for this key
for t in &mut active_transaction_list.transactions {
existing_ids.insert(t.id);
@ -169,7 +120,7 @@ where
}
// Transaction can be added, so let's get a snapshot if the record exists already
let opt_snapshot = self.snapshot_record(&opaque_record_key).await?;
let opt_snapshot = self.snapshot_record(opaque_record_key).await?;
// Generate a record-unique transaction id > 0
let mut id = 0;
@ -179,7 +130,7 @@ where
// Make sure it doesn't match any other id or zero (unlikely, but lets be certain)
while existing_ids.contains(&id) {
let next_id = id.overflowing_add(1);
id = next_id.0 + next_id.1.then_some(1).unwrap_or_default();
id = next_id.0 + if next_id.1 { 1 } else { 0 };
}
// Make transaction expiration timestamp
@ -209,7 +160,7 @@ where
.push(inbound_transaction);
// Return the result
Ok(InboundTransactBeginResult::Success(TransactBeginResult {
Ok(InboundTransactBeginResult::Success(TransactBeginSuccess {
transaction_id: id,
expiration,
opt_descriptor: want_descriptor.then_some(descriptor),
@ -219,7 +170,7 @@ where
pub async fn end_inbound_transaction(
&mut self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
transaction_id: u64,
) -> VeilidAPIResult<InboundTransactCommandResult> {
// See if this transaction is still valid
@ -253,19 +204,21 @@ where
// If there's no changes, we can just quit early with a zero expiration to indicate no commit or rollback is necessary
if inbound_transaction.changed_subkeys.is_empty() {
return Ok(InboundTransactCommandResult::Success {
expiration: Default::default(),
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
});
return Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration: Default::default(),
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
},
));
}
inbound_transaction.opt_snapshot.clone()
};
let end_snapshot = self.snapshot_record(&opaque_record_key).await?;
let end_snapshot = self.snapshot_record(opaque_record_key).await?;
// If our snapshot still validates, then the changes can be applied
// If the snapshot doesn't validate then the transaction is not valid
if begin_snapshot.as_ref().map(|s| s.seqs()) != end_snapshot.map(|s| s.seqs()) {
let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
@ -276,6 +229,8 @@ where
return Ok(InboundTransactCommandResult::InvalidTransaction);
}
// Everything is valid, we can end the transaction successfully
// Lock the record
let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
@ -285,42 +240,250 @@ where
// Give the user another timeout bump to allow for commit
let expiration = Timestamp::now().later(self.limits.transaction_timeout);
Ok(InboundTransactCommandResult::Success {
expiration,
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
})
Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration,
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
},
))
}
pub async fn commit_inbound_transaction(
&mut self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
transaction_id: u64,
) -> VeilidAPIResult<InboundTransactCommandResult> {
// See if this transaction is still valid
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let transaction = {
let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
// If there is a record lock then it better be ours
// * If there is no record lock, then this commit is out of order and this transaction should be dropped
// * If the lock id is ours, then we can commit
// * If the lock id is not ours, then this commit is out of order and this transaction should be dropped
if active_transaction_list.record_lock != Some(transaction_id) {
// Drop inbound transaction
active_transaction_list.drop_transaction(transaction_id);
return Ok(InboundTransactCommandResult::InvalidTransaction);
}
// Get the inbound transaction if it is still valid
let Some(inbound_transaction) = active_transaction_list
.transactions
.iter_mut()
.find(|x| x.id == transaction_id)
else {
apibail_internal!("inbound transaction missing even though it is locked");
};
// If there's no changes, the transaction should have been dropped by 'end' and not locked and we shouldnt get here
if inbound_transaction.changed_subkeys.is_empty() {
apibail_internal!("no changes in locked transaction");
}
inbound_transaction.clone()
};
// Apply all changes
let transaction_range =
ValueSubkeyRangeSet::from_iter(transaction.changed_subkeys.keys().copied());
for (subkey, changed_signed_value_data) in transaction.changed_subkeys.iter() {
if let Err(e) = self
.set_subkey(
opaque_record_key,
*subkey,
Some(&transaction_range),
changed_signed_value_data.clone(),
InboundWatchUpdateMode::UpdateAll,
)
.await
{
veilid_log!(self error "set_subkey failed in transaction: {}", e);
}
}
// Drop transaction and lock now that we're done
{
let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
active_transaction_list.drop_transaction(transaction_id);
}
Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration: Default::default(),
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
},
))
}
pub async fn rollback_inbound_transaction(
pub fn rollback_inbound_transaction(
&mut self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
transaction_id: u64,
) -> VeilidAPIResult<InboundTransactCommandResult> {
// See if this transaction is still valid
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
// Rollback just needs to drop the transaction wherever it is
{
let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
active_transaction_list.drop_transaction(transaction_id);
}
Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration: Default::default(),
opt_seqs: Default::default(),
opt_subkey: Default::default(),
opt_value: Default::default(),
},
))
}
pub async fn inbound_transaction_get(
pub fn inbound_transaction_get(
&mut self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
transaction_id: u64,
subkey: ValueSubkey,
) -> VeilidAPIResult<InboundTransactCommandResult> {
// See if this transaction is still valid
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
// If the transaction is still active and not ended/locked
let opt_value = {
let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
if active_transaction_list.record_lock == Some(transaction_id) {
active_transaction_list.drop_transaction(transaction_id);
return Ok(InboundTransactCommandResult::InvalidTransaction);
}
// Get the inbound transaction if it is still valid
let Some(inbound_transaction) = active_transaction_list
.transactions
.iter_mut()
.find(|x| x.id == transaction_id)
else {
// Nothing to drop
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
// Ensure subkey is within bounds
if subkey > inbound_transaction.descriptor.schema()?.max_subkey() {
return Ok(InboundTransactCommandResult::InvalidArguments);
}
// Get value to return
if let Some(snapshot) = &inbound_transaction.opt_snapshot {
snapshot.subkey_value_data(subkey)?
} else {
None
}
};
// Give the user another timeout bump to allow for more commands
let expiration = Timestamp::now().later(self.limits.transaction_timeout);
Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration,
opt_seqs: Default::default(),
opt_subkey: Some(subkey),
opt_value,
},
))
}
pub async fn inbound_transaction_set(
pub fn inbound_transaction_set(
&mut self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
transaction_id: u64,
subkey: ValueSubkey,
value: Arc<SignedValueData>,
) -> VeilidAPIResult<InboundTransactCommandResult> {
// See if this transaction is still valid
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
// If the transaction is still active and not ended/locked
let opt_value = {
let Some(active_transaction_list) = self.active_transactions.get_mut(&rtk) else {
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
if active_transaction_list.record_lock == Some(transaction_id) {
active_transaction_list.drop_transaction(transaction_id);
return Ok(InboundTransactCommandResult::InvalidTransaction);
}
// Get the inbound transaction if it is still valid
let Some(inbound_transaction) = active_transaction_list
.transactions
.iter_mut()
.find(|x| x.id == transaction_id)
else {
// Nothing to drop
return Ok(InboundTransactCommandResult::InvalidTransaction);
};
// Ensure subkey is within bounds
if subkey > inbound_transaction.descriptor.schema()?.max_subkey() {
return Ok(InboundTransactCommandResult::InvalidArguments);
}
// Get value to compare against
let opt_existing_value = if let Some(snapshot) = &inbound_transaction.opt_snapshot {
snapshot.subkey_value_data(subkey)?
} else {
None
};
// If the proposed sequence number is newer, then return no value
if value.value_data().seq()
> opt_existing_value
.as_ref()
.map(|x| x.value_data().seq())
.unwrap_or_default()
{
None
} else {
// Otherwise return the existing value
opt_existing_value
}
};
// Give the user another timeout bump to allow for more commands
let expiration = Timestamp::now().later(self.limits.transaction_timeout);
Ok(InboundTransactCommandResult::Success(
TransactCommandSuccess {
expiration,
opt_seqs: Default::default(),
opt_subkey: Some(subkey),
opt_value,
},
))
}
}

View file

@ -7,7 +7,8 @@ pub(crate) struct InboundWatch {
pub params: InboundWatchParameters,
/// A unique id per record assigned at watch creation time. Used to disambiguate a client's version of a watch
pub id: u64,
/// What has changed since the last update
/// What has changed in the watched range since the last update.
/// May include non-watched ranges if they were changed as part of an overlapping transaction
pub changed: ValueSubkeyRangeSet,
}
@ -35,8 +36,9 @@ where
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) async fn update_watched_value(
&mut self,
record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
opt_transaction_range: Option<&ValueSubkeyRangeSet>,
watch_update_mode: InboundWatchUpdateMode,
) {
let (do_update, opt_ignore_target) = match watch_update_mode {
@ -48,25 +50,31 @@ where
return;
}
let rtk = RecordTableKey { record_key };
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let Some(wr) = self.watched_records.get_mut(&rtk) else {
return;
};
// Update all watchers
let mut changed = false;
let mut changed_watched = false;
for w in &mut wr.watches {
// If this watcher is watching the changed subkey then add to the watcher's changed list
// Don't bother marking changes for value sets coming from the same watching node/target because they
// are already going to be aware of the changes in that case
if Some(&w.params.target) != opt_ignore_target.as_ref()
&& w.params.subkeys.contains(subkey)
&& w.changed.insert(subkey)
{
changed = true;
if let Some(transaction_range) = opt_transaction_range {
w.changed = w.changed.union(transaction_range);
} else {
w.changed.insert(subkey);
}
changed_watched = true;
}
}
if changed {
if changed_watched {
self.changed_watched_values.insert(rtk);
}
}
@ -74,13 +82,13 @@ where
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn create_new_watch(
&mut self,
record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
params: InboundWatchParameters,
member_check: Box<dyn Fn(&MemberId) -> bool + Send>,
) -> VeilidAPIResult<InboundWatchValueResult> {
// Generate a record-unique watch id > 0
let rtk = RecordTableKey {
record_key: record_key.clone(),
record_key: opaque_record_key.clone(),
};
// Calculate watch limits
@ -141,7 +149,7 @@ where
// Make sure it doesn't match any other id (unlikely, but lets be certain)
while existing_ids.contains(&id) {
let next_id = id.overflowing_add(1);
id = next_id.0 + next_id.1.then_some(1).unwrap_or_default();
id = next_id.0 + if next_id.1 { 1 } else { 0 };
}
// Ok this is an acceptable new watch, add it
@ -150,7 +158,7 @@ where
watch_list.watches.push(InboundWatch {
params,
id,
changed: ValueSubkeyRangeSet::new(),
changed: Default::default(),
});
Ok(InboundWatchValueResult::Created { id, expiration })
}
@ -158,7 +166,7 @@ where
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn change_existing_watch(
&mut self,
record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
params: InboundWatchParameters,
watch_id: u64,
) -> VeilidAPIResult<InboundWatchValueResult> {
@ -169,7 +177,9 @@ where
apibail_internal!("zero expiration should have been resolved to max by now");
}
// Get the watch list for this record
let rtk = RecordTableKey { record_key };
let rtk = RecordTableKey {
record_key: opaque_record_key.clone(),
};
let Some(watch_list) = self.watched_records.get_mut(&rtk) else {
// No watches, nothing to change
return Ok(InboundWatchValueResult::Rejected);
@ -196,7 +206,7 @@ where
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn watch_record(
&mut self,
record_key: OpaqueRecordKey,
opaque_record_key: OpaqueRecordKey,
mut params: InboundWatchParameters,
opt_watch_id: Option<u64>,
) -> VeilidAPIResult<InboundWatchValueResult> {
@ -204,7 +214,11 @@ where
if params.count == 0 {
if let Some(watch_id) = opt_watch_id {
let cancelled = self
.cancel_watch(record_key.clone(), watch_id, params.watcher_member_id)
.cancel_watch(
opaque_record_key.clone(),
watch_id,
params.watcher_member_id,
)
.await?;
if cancelled {
return Ok(InboundWatchValueResult::Cancelled);
@ -226,7 +240,7 @@ where
// Don't add watches with too low of an expiration time
if let Some(watch_id) = opt_watch_id {
let cancelled = self
.cancel_watch(record_key, watch_id, params.watcher_member_id)
.cancel_watch(opaque_record_key, watch_id, params.watcher_member_id)
.await?;
if cancelled {
return Ok(InboundWatchValueResult::Cancelled);
@ -236,7 +250,7 @@ where
}
// Make a closure to check for member vs anonymous
let Some((schema, owner)) = self.with_record(&record_key, |record| {
let Some((schema, owner)) = self.with_record(&opaque_record_key, |record| {
let schema = record.schema();
let owner = record.owner();
(schema, owner)
@ -251,10 +265,10 @@ where
// Create or update depending on if a watch id is specified or not
if let Some(watch_id) = opt_watch_id {
self.change_existing_watch(record_key, params, watch_id)
self.change_existing_watch(&opaque_record_key, params, watch_id)
.await
} else {
self.create_new_watch(record_key, params, member_check)
self.create_new_watch(&opaque_record_key, params, member_check)
.await
}
}
@ -344,6 +358,7 @@ where
continue;
}
// Clear the change logs
w.changed.clear();
// Reduce the count of changes sent
@ -378,25 +393,30 @@ where
}
for evci in evcis {
// Get the first subkey data
let Some(first_subkey) = evci.subkeys.first() else {
veilid_log!(self error "first subkey should exist for value change notification");
continue;
};
let get_result = match self.get_subkey(evci.key.clone(), first_subkey, false).await {
Ok(Some(skr)) => skr,
Ok(None) => {
veilid_log!(self error "subkey should have data for value change notification");
// Get a single subkey data if we can send it
let value = if evci.subkeys.len() == 1 {
let Some(first_subkey) = evci.subkeys.first() else {
veilid_log!(self error "first subkey should exist for value change notification");
continue;
}
Err(e) => {
veilid_log!(self error "error getting subkey data for value change notification: {}", e);
};
let get_result = match self.get_subkey(&evci.key, first_subkey, false).await {
Ok(Some(skr)) => skr,
Ok(None) => {
veilid_log!(self error "subkey should have data for value change notification");
continue;
}
Err(e) => {
veilid_log!(self error "error getting subkey data for value change notification: {}", e);
continue;
}
};
let Some(value) = get_result.opt_value else {
veilid_log!(self error "first subkey should have had value for value change notification");
continue;
}
};
let Some(value) = get_result.opt_value else {
veilid_log!(self error "first subkey should have had value for value change notification");
continue;
};
Some(value)
} else {
None
};
changes.push(ValueChangedInfo {
@ -405,7 +425,7 @@ where
subkeys: evci.subkeys,
count: evci.count,
watch_id: evci.watch_id,
value: Some(value),
value,
});
}
}

View file

@ -141,19 +141,6 @@ pub struct InspectResult {
opt_descriptor: Option<Arc<SignedValueDescriptor>>,
}
/// The result of a single successful transaction begin
#[derive(Default, Debug, Clone)]
pub struct TransactBeginResult {
/// Transaction id
pub transaction_id: u64,
/// Expiration timestamp
pub expiration: Timestamp,
/// Descriptor
pub opt_descriptor: Option<Arc<SignedValueDescriptor>>,
/// Sequence numbers for record
pub seqs: Vec<ValueSeqNum>,
}
impl InspectResult {
pub fn new(
registry_accessor: &impl VeilidComponentRegistryAccessor,
@ -174,7 +161,7 @@ impl InspectResult {
veilid_log!(registry_accessor error "{}: more subkeys returned than requested: {} not a subset of {}", log_context, subkeys, requested_subkeys);
apibail_internal!("invalid subkeys returned");
}
Ok(InspectResult {
Ok(Self {
subkeys,
seqs,
opt_descriptor,
@ -492,10 +479,12 @@ where
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn new_record(
&mut self,
record_key: OpaqueRecordKey,
opaque_record_key: OpaqueRecordKey,
record: Record<D>,
) -> VeilidAPIResult<()> {
let rtk = RecordTableKey { record_key };
let rtk = RecordTableKey {
record_key: opaque_record_key,
};
if self.record_index.contains_key(&rtk) {
apibail_internal!("record already exists");
}
@ -538,9 +527,14 @@ where
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn delete_record(&mut self, record_key: OpaqueRecordKey) -> VeilidAPIResult<()> {
pub async fn delete_record(
&mut self,
opaque_record_key: OpaqueRecordKey,
) -> VeilidAPIResult<()> {
// Get the record table key
let rtk = RecordTableKey { record_key };
let rtk = RecordTableKey {
record_key: opaque_record_key,
};
// Remove record from the index
let Some(record) = self.record_index.remove(&rtk) else {
@ -564,22 +558,26 @@ where
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn contains_record(&mut self, record_key: &OpaqueRecordKey) -> bool {
pub(super) fn contains_record(&mut self, opaque_record_key: &OpaqueRecordKey) -> bool {
let rtk = RecordTableKey {
record_key: record_key.clone(),
record_key: opaque_record_key.clone(),
};
self.record_index.contains_key(&rtk)
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn with_record<R, F>(&mut self, record_key: &OpaqueRecordKey, f: F) -> Option<R>
pub(super) fn with_record<R, F>(
&mut self,
opaque_record_key: &OpaqueRecordKey,
f: F,
) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey {
record_key: record_key.clone(),
record_key: opaque_record_key.clone(),
};
if let Some(record) = self.record_index.get_mut(&rtk) {
// Callback
@ -598,14 +596,14 @@ where
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn peek_record<R, F>(&self, record_key: &OpaqueRecordKey, f: F) -> Option<R>
pub(super) fn peek_record<R, F>(&self, opaque_record_key: &OpaqueRecordKey, f: F) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey {
record_key: record_key.clone(),
record_key: opaque_record_key.clone(),
};
if let Some(record) = self.record_index.peek(&rtk) {
// Callback
@ -615,14 +613,18 @@ where
}
#[instrument(level = "trace", target = "stor", skip_all)]
pub(super) fn with_record_mut<R, F>(&mut self, record_key: &OpaqueRecordKey, f: F) -> Option<R>
pub(super) fn with_record_mut<R, F>(
&mut self,
opaque_record_key: &OpaqueRecordKey,
f: F,
) -> Option<R>
where
F: FnOnce(&mut Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey {
record_key: record_key.clone(),
record_key: opaque_record_key.clone(),
};
if let Some(record) = self.record_index.get_mut(&rtk) {
// Callback
@ -643,13 +645,13 @@ where
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn get_subkey(
&mut self,
record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<Option<GetResult>> {
// Get record from index
let Some((subkey_count, has_subkey, opt_descriptor)) =
self.with_record(&record_key, |record| {
self.with_record(opaque_record_key, |record| {
(
record.subkey_count(),
record.stored_subkeys().contains(subkey),
@ -680,7 +682,10 @@ where
}
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey { record_key, subkey };
let stk = SubkeyTableKey {
record_key: opaque_record_key.clone(),
subkey,
};
if let Some(record_data) = self.subkey_cache.get(&stk) {
let out = record_data.signed_value_data().clone();
@ -713,13 +718,13 @@ where
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn peek_subkey(
&self,
record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<Option<GetResult>> {
// record from index
let Some((subkey_count, has_subkey, opt_descriptor)) =
self.peek_record(&record_key, |record| {
self.peek_record(opaque_record_key, |record| {
(
record.subkey_count(),
record.stored_subkeys().contains(subkey),
@ -750,7 +755,10 @@ where
}
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey { record_key, subkey };
let stk = SubkeyTableKey {
record_key: opaque_record_key.clone(),
subkey,
};
if let Some(record_data) = self.subkey_cache.peek(&stk) {
let out = record_data.signed_value_data().clone();
@ -780,8 +788,9 @@ where
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn set_subkey(
&mut self,
record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
opt_transaction_range: Option<&ValueSubkeyRangeSet>,
signed_value_data: Arc<SignedValueData>,
watch_update_mode: InboundWatchUpdateMode,
) -> VeilidAPIResult<()> {
@ -796,11 +805,11 @@ where
// Get record subkey count and total size of all record subkey data exclusive of structures
let Some((subkey_count, prior_record_data_size)) = self
.with_record(&record_key, |record| {
.with_record(opaque_record_key, |record| {
(record.subkey_count(), record.record_data_size())
})
else {
apibail_invalid_argument!("no record at this key", "key", record_key);
apibail_invalid_argument!("no record at this key", "key", opaque_record_key);
};
// Check if the subkey is in range
@ -813,7 +822,7 @@ where
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey {
record_key: record_key.clone(),
record_key: opaque_record_key.clone(),
subkey,
};
let stk_bytes = stk.bytes();
@ -861,7 +870,7 @@ where
// Write to inspect cache
self.inspect_cache.replace_subkey_seq(
&stk.record_key,
opaque_record_key,
subkey,
subkey_record_data.signed_value_data().value_data().seq(),
);
@ -870,7 +879,7 @@ where
self.add_to_subkey_cache(stk, subkey_record_data);
// Update record
self.with_record_mut(&record_key, |record| {
self.with_record_mut(opaque_record_key, |record| {
record.store_subkey(subkey);
record.set_record_data_size(new_record_data_size);
})
@ -879,9 +888,14 @@ where
// Update storage space
self.total_storage_space.commit().unwrap();
// Send updates to
self.update_watched_value(record_key, subkey, watch_update_mode)
.await;
// Register change with inbound watches
self.update_watched_value(
opaque_record_key,
subkey,
opt_transaction_range,
watch_update_mode,
)
.await;
Ok(())
}
@ -889,26 +903,28 @@ where
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn inspect_record(
&mut self,
record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkeys: &ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<Option<InspectResult>> {
// Get record from index
let Some((schema_subkeys, opt_descriptor)) = self.with_record(&record_key, |record| {
// Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers betwen that and what we asked for
let schema_subkeys = record
.schema()
.truncate_subkeys(subkeys, Some(DHTSchema::MAX_SUBKEY_COUNT));
(
schema_subkeys,
if want_descriptor {
Some(record.descriptor().clone())
} else {
None
},
)
}) else {
let Some((schema_subkeys, opt_descriptor)) =
self.with_record(opaque_record_key, |record| {
// Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers betwen that and what we asked for
let schema_subkeys = record
.schema()
.truncate_subkeys(subkeys, Some(DHTSchema::MAX_SUBKEY_COUNT));
(
schema_subkeys,
if want_descriptor {
Some(record.descriptor().clone())
} else {
None
},
)
})
else {
// Record not available
return Ok(None);
};
@ -920,7 +936,7 @@ where
}
// See if we have this inspection cached
if let Some(icv) = self.inspect_cache.get(&record_key, &schema_subkeys) {
if let Some(icv) = self.inspect_cache.get(opaque_record_key, &schema_subkeys) {
return Ok(Some(InspectResult::new(
self,
subkeys.clone(),
@ -936,7 +952,7 @@ where
let mut seqs = Vec::with_capacity(schema_subkeys.len() as usize);
for subkey in schema_subkeys.iter() {
let stk = SubkeyTableKey {
record_key: record_key.clone(),
record_key: opaque_record_key.clone(),
subkey,
};
let seq = if let Some(record_data) = self.subkey_cache.peek(&stk) {
@ -956,7 +972,7 @@ where
// Save seqs cache
self.inspect_cache.put(
record_key,
opaque_record_key.clone(),
schema_subkeys.clone(),
InspectCacheL2Value { seqs: seqs.clone() },
);
@ -1023,11 +1039,13 @@ where
out
}
pub fn debug_record_info(&self, record_key: OpaqueRecordKey) -> String {
pub fn debug_record_info(&self, opaque_record_key: &OpaqueRecordKey) -> String {
let record_info = self
.peek_record(&record_key, |r| format!("{:#?}", r))
.peek_record(opaque_record_key, |r| format!("{:#?}", r))
.unwrap_or("Not found".to_owned());
let watched_record = match self.watched_records.get(&RecordTableKey { record_key }) {
let watched_record = match self.watched_records.get(&RecordTableKey {
record_key: opaque_record_key.clone(),
}) {
Some(w) => {
format!("Remote Watches: {:#?}", w)
}
@ -1038,10 +1056,10 @@ where
pub async fn debug_record_subkey_info(
&self,
record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
) -> String {
match self.peek_subkey(record_key, subkey, true).await {
match self.peek_subkey(opaque_record_key, subkey, true).await {
Ok(Some(v)) => {
format!("{:#?}", v)
}

View file

@ -9,10 +9,6 @@ impl RecordSnapshot {
pub fn new(all_value_data: Vec<Option<Arc<SignedValueData>>>) -> Self {
Self { all_value_data }
}
pub fn all_value_data(&self) -> &[Option<Arc<SignedValueData>>] {
&self.all_value_data
}
pub fn subkey_value_data(
&self,
subkey: ValueSubkey,
@ -47,3 +43,62 @@ impl RecordSnapshot {
.collect()
}
}
impl<D> RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub async fn snapshot_record(
&mut self,
record_key: &OpaqueRecordKey,
) -> VeilidAPIResult<Option<Arc<RecordSnapshot>>> {
let Some((subkey_count, stored_subkeys)) = self.with_record(record_key, |record| {
(record.subkey_count(), record.stored_subkeys().clone())
}) else {
// Record not available
return Ok(None);
};
// Snapshot all subkeys
let mut all_value_data = vec![Option::<Arc<SignedValueData>>::None; subkey_count];
for subkey in stored_subkeys.iter() {
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey {
record_key: record_key.clone(),
subkey,
};
let svd = match self.subkey_cache.get(&stk) {
Some(record_data) => record_data.signed_value_data(),
None =>
// If not in cache, try to pull from table store if it is in our stored subkey set
{
match self
.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
{
Some(record_data) => {
let out = record_data.signed_value_data().clone();
// Add to cache, do nothing with lru out
self.add_to_subkey_cache(stk, record_data);
out
}
None => {
apibail_internal!("failed to snapshot subkey that was stored");
}
}
}
};
let subkey = usize::try_from(subkey).map_err(VeilidAPIError::internal)?;
all_value_data[subkey] = Some(svd);
}
let out = Arc::new(RecordSnapshot::new(all_value_data));
Ok(Some(out))
}
}

View file

@ -1,305 +0,0 @@
use super::*;
impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_local_value_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<GetResult> {
// See if the value is in the offline subkey writes first,
// since it may not have been committed yet to the local record store
if let Some(get_result) = self.get_offline_subkey_writes_subkey(
inner,
&opaque_record_key,
subkey,
want_descriptor,
)? {
return Ok(get_result);
}
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(get_result) = local_record_store
.get_subkey(opaque_record_key, subkey, want_descriptor)
.await?
{
return Ok(get_result);
}
Ok(GetResult {
opt_value: None,
opt_descriptor: None,
})
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_local_value_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
watch_update_mode: InboundWatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if this new data supercedes any offline subkey writes
self.remove_old_offline_subkey_writes_inner(
inner,
opaque_record_key.clone(),
subkey,
signed_value_data.clone(),
);
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Write subkey to local store
local_record_store
.set_subkey(
opaque_record_key,
subkey,
signed_value_data,
watch_update_mode,
)
.await?;
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_inspect_local_value_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<InspectResult> {
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(inspect_result) = local_record_store
.inspect_record(opaque_record_key, &subkeys, want_descriptor)
.await?
{
return Ok(inspect_result);
}
InspectResult::new(
self,
subkeys,
"handle_inspect_local_value_inner",
ValueSubkeyRangeSet::new(),
vec![],
None,
)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_remote_value_inner(
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<GetResult> {
// See if it's in the remote record store
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(get_result) = remote_record_store
.get_subkey(opaque_record_key, subkey, want_descriptor)
.await?
{
return Ok(get_result);
}
Ok(GetResult {
opt_value: None,
opt_descriptor: None,
})
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_set_remote_value_inner(
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
subkey: ValueSubkey,
signed_value_data: Arc<SignedValueData>,
signed_value_descriptor: Arc<SignedValueDescriptor>,
watch_update_mode: InboundWatchUpdateMode,
) -> VeilidAPIResult<()> {
// See if it's in the remote record store
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
// See if we have a remote record already or not
if remote_record_store
.with_record(&opaque_record_key, |_| {})
.is_none()
{
// record didn't exist, make it
let cur_ts = Timestamp::now();
let remote_record_detail = RemoteRecordDetail {};
let record = Record::<RemoteRecordDetail>::new(
cur_ts,
signed_value_descriptor,
remote_record_detail,
)?;
remote_record_store
.new_record(opaque_record_key.clone(), record)
.await?
};
// Write subkey to remote store
remote_record_store
.set_subkey(
opaque_record_key,
subkey,
signed_value_data,
watch_update_mode,
)
.await?;
Ok(())
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_inspect_remote_value_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<InspectResult> {
// See if it's in the local record store
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
if let Some(inspect_result) = remote_record_store
.inspect_record(opaque_record_key, &subkeys, want_descriptor)
.await?
{
return Ok(inspect_result);
}
InspectResult::new(
self,
subkeys,
"handle_inspect_remote_value_inner",
ValueSubkeyRangeSet::new(),
vec![],
None,
)
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn get_value_nodes(
&self,
opaque_record_key: OpaqueRecordKey,
) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
let inner = self.inner.lock().await;
// Get local record store
let Some(local_record_store) = inner.local_record_store.as_ref() else {
apibail_not_initialized!();
};
// Get routing table to see if we still know about these nodes
let routing_table = self.routing_table();
let opt_value_nodes = local_record_store.peek_record(&opaque_record_key, |r| {
let d = r.detail();
d.nodes
.keys()
.cloned()
.filter_map(|nr| routing_table.lookup_node_ref(nr).ok().flatten())
.collect()
});
Ok(opt_value_nodes)
}
// #[instrument(level = "trace", target = "stor", skip_all, err)]
// async fn move_remote_record_to_local_inner(
// &self,
// inner: &mut StorageManagerInner,
// record_key: RecordKey,
// safety_selection: SafetySelection,
// ) -> VeilidAPIResult<Option<(PublicKey, DHTSchema)>> {
// // Get local record store
// let Some(local_record_store) = inner.local_record_store.as_mut() else {
// apibail_not_initialized!();
// };
// // Get remote record store
// let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
// apibail_not_initialized!();
// };
// let rcb = |r: &Record<RemoteRecordDetail>| {
// // Return record details
// r.clone()
// };
// let opaque_record_key = record_key.opaque();
// let Some(remote_record) = remote_record_store.with_record(&opaque_record_key, rcb) else {
// // No local or remote record found, return None
// return Ok(None);
// };
// // Make local record
// let cur_ts = Timestamp::now();
// let local_record = Record::new(
// cur_ts,
// remote_record.descriptor().clone(),
// LocalRecordDetail::new(safety_selection),
// )?;
// local_record_store
// .new_record(opaque_record_key.clone(), local_record)
// .await?;
// // Move copy subkey data from remote to local store
// for subkey in remote_record.stored_subkeys().iter() {
// let Some(get_result) = remote_record_store
// .get_subkey(opaque_record_key.clone(), subkey, false)
// .await?
// else {
// // Subkey was missing
// veilid_log!(self warn "Subkey was missing: {} #{}", record_key, subkey);
// continue;
// };
// let Some(subkey_data) = get_result.opt_value else {
// // Subkey was missing
// veilid_log!(self warn "Subkey data was missing: {} #{}", record_key, subkey);
// continue;
// };
// local_record_store
// .set_subkey(
// opaque_record_key.clone(),
// subkey,
// subkey_data,
// InboundWatchUpdateMode::NoUpdate,
// )
// .await?;
// }
// // Move watches
// local_record_store.move_watches(
// opaque_record_key.clone(),
// remote_record_store.move_watches(opaque_record_key.clone(), None),
// );
// // Delete remote record from store
// remote_record_store
// .delete_record(opaque_record_key.clone())
// .await?;
// // Return record information as transferred to local record
// Ok(Some((remote_record.owner(), remote_record.schema())))
// }
}

View file

@ -90,12 +90,7 @@ impl StorageManager {
// See if the requested record is our local record store
let local_inspect_result = self
.handle_inspect_local_value_inner(
&mut inner,
opaque_record_key.clone(),
subkeys.clone(),
true,
)
.handle_inspect_local_value_inner(&mut inner, &opaque_record_key, subkeys.clone(), true)
.await?;
// Get rpc processor and drop mutex so we don't block while getting the value from the network
@ -112,7 +107,7 @@ impl StorageManager {
// Get the inspect record report from the network
let result = self
.outbound_inspect_value(
opaque_record_key.clone(),
&opaque_record_key,
local_inspect_result.subkeys().clone(),
safety_selection.clone(),
InspectResult::default(),
@ -150,13 +145,13 @@ impl StorageManager {
async fn rehydrate_single_subkey_inner(
&self,
inner: &mut StorageManagerInner,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
) -> bool {
// Get value to rehydrate with
let get_result = match self
.handle_get_local_value_inner(inner, opaque_record_key.clone(), subkey, false)
.handle_get_local_value_inner(inner, opaque_record_key, subkey, false)
.await
{
Ok(v) => v,
@ -178,7 +173,7 @@ impl StorageManager {
veilid_log!(self debug "Rehydrating: record={} subkey={}", opaque_record_key, subkey);
self.add_offline_subkey_write_inner(
inner,
opaque_record_key,
opaque_record_key.clone(),
subkey,
safety_selection,
data,
@ -207,7 +202,7 @@ impl StorageManager {
if self
.rehydrate_single_subkey_inner(
&mut inner,
opaque_record_key.clone(),
&opaque_record_key,
subkey,
safety_selection.clone(),
)
@ -262,7 +257,7 @@ impl StorageManager {
if self
.rehydrate_single_subkey_inner(
&mut inner,
opaque_record_key.clone(),
&opaque_record_key,
subkey,
safety_selection.clone(),
)

View file

@ -82,7 +82,7 @@ impl StorageManager {
// Make signed value data (encrypted) and value data (unencrypted) and get descriptor for this value
let last_get_result = self
.handle_get_local_value_inner(&mut inner, opaque_record_key.clone(), subkey, true)
.handle_get_local_value_inner(&mut inner, &opaque_record_key, subkey, true)
.await?;
let (signed_value_data, value_data, descriptor) =
@ -114,7 +114,7 @@ impl StorageManager {
// Use the safety selection we opened the record with
let res_rx = match self
.outbound_set_value(
opaque_record_key.clone(),
&opaque_record_key,
subkey,
safety_selection.clone(),
signed_value_data.clone(),
@ -343,7 +343,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_set_value(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
value: Arc<SignedValueData>,
@ -360,7 +360,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(opaque_record_key.clone())
self.get_value_nodes(opaque_record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -567,6 +567,7 @@ impl StorageManager {
// Call the fanout in a spawned task
let registry = self.registry();
let fanout_hash_coordinate = opaque_record_key.to_hash_coordinate();
spawn(
"outbound_set_value fanout",
Box::pin(
@ -574,7 +575,7 @@ impl StorageManager {
let routing_table = registry.routing_table();
let fanout_call = FanoutCall::new(
&routing_table,
opaque_record_key.to_hash_coordinate(),
fanout_hash_coordinate,
key_count,
fanout,
consensus_count,
@ -704,15 +705,16 @@ impl StorageManager {
) -> Result<Option<ValueData>, VeilidAPIError> {
// Regain the lock after network access
let mut inner = self.inner.lock().await;
let opaque_record_key = record_key.opaque();
// Report on fanout result offline
let was_offline =
self.check_fanout_set_offline(record_key.opaque(), subkey, &result.fanout_result);
self.check_fanout_set_offline(&opaque_record_key, subkey, &result.fanout_result);
if was_offline {
// Failed to write, try again later
self.add_offline_subkey_write_inner(
&mut inner,
record_key.opaque(),
opaque_record_key.clone(),
subkey,
safety_selection,
result.signed_value_data.clone(),
@ -722,7 +724,7 @@ impl StorageManager {
// Keep the list of nodes that returned a value for later reference
Self::process_fanout_results_inner(
&mut inner,
record_key.opaque(),
opaque_record_key.clone(),
core::iter::once((ValueSubkeyRangeSet::single(subkey), result.fanout_result)),
true,
self.config().network.dht.consensus_width as usize,
@ -731,10 +733,10 @@ impl StorageManager {
// Record the set value locally since it was successfully set online
self.handle_set_local_value_inner(
&mut inner,
record_key.opaque(),
&opaque_record_key,
subkey,
None,
result.signed_value_data.clone(),
InboundWatchUpdateMode::UpdateAll,
)
.await?;
@ -756,7 +758,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_set_value(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
value: Arc<SignedValueData>,
descriptor: Option<Arc<SignedValueDescriptor>>,
@ -765,13 +767,15 @@ impl StorageManager {
let mut inner = self.inner.lock().await;
// See if the subkey we are modifying has a last known remote value
let last_get_result = Self::handle_get_remote_value_inner(
&mut inner,
opaque_record_key.clone(),
subkey,
true,
)
.await?;
let last_get_result = {
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
remote_record_store
.get_subkey(opaque_record_key, subkey, true)
.await?
.unwrap_or_default()
};
// Make sure this value would actually be newer
if let Some(last_value) = &last_get_result.opt_value {
@ -837,24 +841,42 @@ impl StorageManager {
}
// Do the set and return no new value
let res = Self::handle_set_remote_value_inner(
&mut inner,
opaque_record_key.clone(),
subkey,
value,
actual_descriptor,
InboundWatchUpdateMode::ExcludeTarget(target),
)
.await;
// See if it's in the remote record store
let Some(remote_record_store) = inner.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
// See if we have a remote record already or not
if remote_record_store
.with_record(opaque_record_key, |_| {})
.is_none()
{
// record didn't exist, make it
let cur_ts = Timestamp::now();
let remote_record_detail = RemoteRecordDetail {};
let record =
Record::<RemoteRecordDetail>::new(cur_ts, actual_descriptor, remote_record_detail)?;
remote_record_store
.new_record(opaque_record_key.clone(), record)
.await?
};
// Write subkey to remote store
let res = remote_record_store
.set_subkey(
opaque_record_key,
subkey,
None,
value,
InboundWatchUpdateMode::ExcludeTarget(target),
)
.await;
match res {
Ok(()) => {}
Err(VeilidAPIError::Internal { message }) => {
apibail_internal!(message);
}
Err(e) => {
return Ok(NetworkResult::invalid_message(e));
}
Ok(()) => Ok(NetworkResult::value(InboundSetValueResult::Success)),
Err(VeilidAPIError::Internal { message }) => Err(VeilidAPIError::Internal { message }),
Err(e) => Ok(NetworkResult::invalid_message(e)),
}
Ok(NetworkResult::value(InboundSetValueResult::Success))
}
}

View file

@ -31,7 +31,7 @@ impl StorageManager {
async fn write_single_offline_subkey(
&self,
stop_token: StopToken,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
) -> EyreResult<OfflineSubkeyWriteResult> {
@ -47,7 +47,7 @@ impl StorageManager {
let get_result = {
let mut inner = self.inner.lock().await;
self.handle_get_local_value_inner(&mut inner, opaque_record_key.clone(), subkey, true)
self.handle_get_local_value_inner(&mut inner, opaque_record_key, subkey, true)
.await
};
let Ok(get_result) = get_result else {
@ -67,7 +67,7 @@ impl StorageManager {
veilid_log!(self debug "Offline subkey write: {}:{} len={}", opaque_record_key, subkey, value.value_data().data().len());
let osvres = self
.outbound_set_value(
opaque_record_key.clone(),
opaque_record_key,
subkey,
safety_selection,
value.clone(),
@ -92,10 +92,10 @@ impl StorageManager {
self.handle_set_local_value_inner(
&mut inner,
opaque_record_key.clone(),
opaque_record_key,
subkey,
None,
result.signed_value_data.clone(),
InboundWatchUpdateMode::UpdateAll,
)
.await?;
}
@ -136,7 +136,7 @@ impl StorageManager {
let result = match self
.write_single_offline_subkey(
stop_token.clone(),
work_item.opaque_record_key.clone(),
&work_item.opaque_record_key,
subkey,
work_item.safety_selection.clone(),
)
@ -156,7 +156,7 @@ impl StorageManager {
// Process non-partial setvalue result
let was_offline = self.check_fanout_set_offline(
work_item.opaque_record_key.clone(),
&work_item.opaque_record_key,
subkey,
&result.fanout_result,
);
@ -187,7 +187,7 @@ impl StorageManager {
let subkeys_still_offline = result.work_item.subkeys.difference(&result.written_subkeys);
self.finish_offline_subkey_writes_inner(
&mut inner,
result.work_item.opaque_record_key.clone(),
&result.work_item.opaque_record_key,
result.written_subkeys,
subkeys_still_offline,
);

View file

@ -17,11 +17,6 @@ impl StorageManager {
{
let mut inner = self.inner.lock().await;
if let Some(local_record_store) = &mut inner.local_record_store {
local_record_store
.take_value_changes(&mut value_changes)
.await;
}
if let Some(remote_record_store) = &mut inner.remote_record_store {
remote_record_store
.take_value_changes(&mut value_changes)

View file

@ -5,7 +5,7 @@ use super::*;
// encrypted_value_data
pub fn test_encrypted_value_data() {
let orig = EncryptedValueData::new_with_seq(
let orig = EncryptedValueData::new(
42.into(),
b"Brent Spiner".to_vec(),
fix_fake_public_key(),

View file

@ -391,15 +391,18 @@ impl StorageManager {
for record_info in transaction_state.get_record_infos() {
let opaque_record_key = record_info.record_key().opaque();
let transaction_range = ValueSubkeyRangeSet::from_iter(
record_info.commit_subkey_states().iter().map(|x| *x.0),
);
for (subkey, subkey_state) in record_info.commit_subkey_states().iter() {
// Record the set value locally since it was successfully set online
if let Some(signed_value_data) = subkey_state.opt_value.clone() {
self.handle_set_local_value_inner(
inner,
opaque_record_key.clone(),
&opaque_record_key,
*subkey,
Some(&transaction_range),
signed_value_data,
InboundWatchUpdateMode::UpdateAll,
)
.await?;
}

View file

@ -53,13 +53,26 @@ pub(super) struct OutboundTransactBeginResult {
#[derive(Clone, Debug)]
pub(crate) enum InboundTransactBeginResult {
/// Value transacted successfully
Success(TransactBeginResult),
Success(TransactBeginSuccess),
/// Transaction unavailable due to limits
TransactionUnavailable,
/// Descriptor required but not provided,
NeedDescriptor,
}
/// The result of a single successful transaction begin
#[derive(Default, Debug, Clone)]
pub(crate) struct TransactBeginSuccess {
/// Transaction id
pub transaction_id: u64,
/// Expiration timestamp
pub expiration: Timestamp,
/// Descriptor
pub opt_descriptor: Option<Arc<SignedValueDescriptor>>,
/// Sequence numbers for record
pub seqs: Vec<ValueSeqNum>,
}
impl StorageManager {
////////////////////////////////////////////////////////////////////////
@ -89,7 +102,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(opaque_record_key.clone())
self.get_value_nodes(&opaque_record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -107,7 +120,7 @@ impl StorageManager {
let local_inspect_result = self
.handle_inspect_local_value_inner(
&mut inner,
opaque_record_key.clone(),
&opaque_record_key,
ValueSubkeyRangeSet::full(),
true,
)
@ -394,8 +407,8 @@ impl StorageManager {
};
return remote_record_store
.new_inbound_transaction(
opaque_record_key,
.begin_inbound_transaction(
&opaque_record_key,
opt_descriptor,
want_descriptor,
signing_member_id,

View file

@ -31,6 +31,7 @@ pub(super) struct OutboundTransactCommandPerNodeResult {
/// True if the transaction is still valid
pub transaction_valid: bool,
/// Return from the command (sequence numbers)
#[expect(dead_code)]
pub opt_seqs: Option<Vec<ValueSeqNum>>,
/// Return from the command (subkey number)
pub opt_subkey: Option<ValueSubkey>,
@ -51,18 +52,24 @@ pub(super) struct OutboundTransactCommandResult {
#[derive(Clone, Debug)]
pub(crate) enum InboundTransactCommandResult {
/// Value transacted successfully
Success {
/// Expiration timestamp
expiration: Timestamp,
/// Sequence numbers
opt_seqs: Option<Vec<ValueSeqNum>>,
/// Subkey
opt_subkey: Option<ValueSubkey>,
/// Value
opt_value: Option<Arc<SignedValueData>>,
},
Success(TransactCommandSuccess),
/// Transaction not valid
InvalidTransaction,
/// Invalid arguments
InvalidArguments,
}
/// The result of a single successful transaction command
#[derive(Default, Debug, Clone)]
pub(crate) struct TransactCommandSuccess {
/// Expiration timestamp
pub expiration: Timestamp,
/// Sequence numbers
pub opt_seqs: Option<Vec<ValueSeqNum>>,
/// Subkey
pub opt_subkey: Option<ValueSubkey>,
/// Value
pub opt_value: Option<Arc<SignedValueData>>,
}
impl StorageManager {
@ -92,7 +99,7 @@ impl StorageManager {
let local_inspect_result = self
.handle_inspect_local_value_inner(
&mut inner,
opaque_record_key.clone(),
&opaque_record_key,
ValueSubkeyRangeSet::full(),
true,
)
@ -197,7 +204,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_transact_command(
&self,
opaque_record_key: OpaqueRecordKey,
opaque_record_key: &OpaqueRecordKey,
transaction_id: u64,
command: TransactCommand,
_opt_seqs: Option<Vec<ValueSeqNum>>,
@ -227,18 +234,17 @@ impl StorageManager {
.commit_inbound_transaction(opaque_record_key, transaction_id)
.await?
}
TransactCommand::Rollback => {
remote_record_store
.rollback_inbound_transaction(opaque_record_key, transaction_id)
.await?
}
TransactCommand::Rollback => remote_record_store
.rollback_inbound_transaction(opaque_record_key, transaction_id)?,
TransactCommand::Get => {
let Some(subkey) = opt_subkey else {
return Ok(NetworkResult::invalid_message("missing subkey"));
};
remote_record_store
.inbound_transaction_get(opaque_record_key, transaction_id, subkey)
.await?
remote_record_store.inbound_transaction_get(
opaque_record_key,
transaction_id,
subkey,
)?
}
TransactCommand::Set => {
let Some(subkey) = opt_subkey else {
@ -247,9 +253,12 @@ impl StorageManager {
let Some(value) = opt_value else {
return Ok(NetworkResult::invalid_message("missing value"));
};
remote_record_store
.inbound_transaction_set(opaque_record_key, transaction_id, subkey, value)
.await?
remote_record_store.inbound_transaction_set(
opaque_record_key,
transaction_id,
subkey,
value,
)?
}
};

View file

@ -11,11 +11,7 @@ pub struct EncryptedValueData {
impl EncryptedValueData {
pub const MAX_LEN: usize = 32768;
pub fn new(data: Vec<u8>, writer: PublicKey, nonce: Option<Nonce>) -> VeilidAPIResult<Self> {
Self::new_with_seq(ValueSeqNum::ZERO, data, writer, nonce)
}
pub fn new_with_seq(
pub fn new(
seq: ValueSeqNum,
data: Vec<u8>,
writer: PublicKey,
@ -234,7 +230,7 @@ impl<'de> serde::Deserialize<'de> for EncryptedValueData {
Ok(EncryptedValueData { blob })
}
Helper::Legacy(legacy) => {
EncryptedValueData::new_with_seq(legacy.seq, legacy.data, legacy.writer, None)
EncryptedValueData::new(legacy.seq, legacy.data, legacy.writer, None)
.map_err(serde::de::Error::custom)
}
}
@ -249,12 +245,6 @@ mod tests {
#[test]
fn value_data_ok() {
assert!(EncryptedValueData::new(
vec![0; EncryptedValueData::MAX_LEN],
fix_fake_public_key(),
None,
)
.is_ok());
assert!(EncryptedValueData::new_with_seq(
ValueSeqNum::ZERO,
vec![0; EncryptedValueData::MAX_LEN],
fix_fake_public_key(),
@ -266,12 +256,6 @@ mod tests {
#[test]
fn value_data_too_long() {
assert!(EncryptedValueData::new(
vec![0; EncryptedValueData::MAX_LEN + 1],
fix_fake_public_key(),
None,
)
.is_err());
assert!(EncryptedValueData::new_with_seq(
ValueSeqNum::ZERO,
vec![0; EncryptedValueData::MAX_LEN + 1],
fix_fake_public_key(),

View file

@ -438,7 +438,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(opaque_record_key)
self.get_value_nodes(&opaque_record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -1437,12 +1437,7 @@ impl StorageManager {
let mut report_value_change = false;
if let Some(value) = &value {
let last_get_result = self
.handle_get_local_value_inner(
inner,
opaque_record_key.clone(),
first_subkey,
true,
)
.handle_get_local_value_inner(inner, &opaque_record_key, first_subkey, true)
.await?;
let descriptor = last_get_result.opt_descriptor.unwrap();
@ -1488,10 +1483,10 @@ impl StorageManager {
if report_value_change {
self.handle_set_local_value_inner(
inner,
opaque_record_key.clone(),
&opaque_record_key,
first_subkey,
None,
value.clone(),
InboundWatchUpdateMode::NoUpdate,
)
.await?;
}
@ -1546,9 +1541,16 @@ impl StorageManager {
// Announce ValueChanged VeilidUpdate
// Cancellations (count=0) are sent by process_outbound_watch_dead(), not here
if report_value_change {
let value = self.maybe_decrypt_value_data(&record_key, value.unwrap().value_data())?;
let value = self.maybe_decrypt_value_data(
&record_key,
value
.ok_or_else(|| {
VeilidAPIError::internal("value must be present to report value change")
})?
.value_data(),
)?;
// We have a value with a newer sequence number to report
// We have a single value with a newer sequence number to report
self.update_callback_value_change(
record_key,
reportable_subkeys,

View file

@ -112,12 +112,6 @@ impl ValueSubkeyRangeSet {
}
}
impl FromIterator<ValueSubkey> for ValueSubkeyRangeSet {
fn from_iter<T: IntoIterator<Item = ValueSubkey>>(iter: T) -> Self {
Self::new_with_data(RangeSetBlaze::from_iter(iter))
}
}
// impl TryFrom<Box<[Box<[ValueSubkey]>]>> for ValueSubkeyRangeSet {
// type Error = VeilidAPIError;
@ -182,6 +176,13 @@ impl FromStr for ValueSubkeyRangeSet {
}
}
impl FromIterator<ValueSubkey> for ValueSubkeyRangeSet {
fn from_iter<T: IntoIterator<Item = ValueSubkey>>(iter: T) -> Self {
let data = RangeSetBlaze::<ValueSubkey>::from_iter(iter);
Self::new_with_data(data)
}
}
impl Deref for ValueSubkeyRangeSet {
type Target = RangeSetBlaze<ValueSubkey>;