checkpoint

Christien Rioux 2025-10-03 14:56:35 -04:00
parent 610c61a082
commit b16d0a5287
11 changed files with 320 additions and 201 deletions

View file

@@ -205,7 +205,7 @@ impl StorageManager {
         // Make the return channel
         let (out_tx, out_rx) = flume::unbounded::<VeilidAPIResult<OutboundGetValueResult>>();
 
-        // Make do-get-value answer context
+        // Make operation context
         let context = Arc::new(Mutex::new(OutboundGetValueContext {
             value: last_get_result.opt_value,
             descriptor: last_get_result.opt_descriptor.clone(),

View file

@@ -251,7 +251,7 @@ impl StorageManager {
                 .collect()
         };
 
-        // Make do-inspect-value answer context
+        // Make operation context
         let opt_descriptor_info = if let Some(descriptor) = local_inspect_result.opt_descriptor() {
             // Get the descriptor info. This also truncates the subkeys list to what can be returned from the network.
            Some(DescriptorInfo::new(descriptor, &subkeys)?)

View file

@@ -39,7 +39,7 @@ pub(crate) use get_value::InboundGetValueResult;
 pub(crate) use inspect_value::InboundInspectValueResult;
 pub(crate) use record_store::{InboundWatchParameters, InboundWatchValueResult};
 pub(crate) use set_value::InboundSetValueResult;
-pub(crate) use transact_value::InboundTransactValueResult;
+pub(crate) use transact_value::{InboundTransactValueResult, OutboundTransactionHandle};
 pub use types::*;
 
 impl_veilid_log_facility!("stor");
@@ -76,8 +76,8 @@ const REHYDRATE_RECORDS_INTERVAL_SECS: u32 = 1;
 const REHYDRATE_BATCH_SIZE: usize = 16;
 /// Maximum 'offline lag' before we decide to poll for changed watches
 const CHANGE_INSPECT_LAG_SECS: u32 = 2;
-/// Length of set value descriptor cache (512 records and 5 nodes per record, roughly 184320 bytes)
-const SET_VALUE_DESCRIPTOR_CACHE_SIZE: usize = 2560;
+/// Length of descriptor cache (512 records and 5 nodes per record, roughly 184320 bytes)
+const DESCRIPTOR_CACHE_SIZE: usize = 2560;
 /// Table store table for storage manager metadata
 const STORAGE_MANAGER_METADATA: &str = "storage_manager_metadata";
 /// Storage manager metadata key name for offline subkey write persistence
@@ -104,7 +104,7 @@ struct ValueChangedInfo {
 #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
 /// A single 'value changed' message to send
-struct SetValueDescriptorCacheKey {
+struct DescriptorCacheKey {
     opaque_record_key: OpaqueRecordKey,
     node_id: NodeId,
 }
@@ -188,8 +188,8 @@ pub(crate) struct StorageManager {
     // background operations the storage manager wants to perform
     background_operation_processor: DeferredStreamProcessor,
-    /// Cache of which nodes have seen descriptors for which records to optimize outbound_set_value
-    set_value_descriptor_cache: Arc<Mutex<LruCache<SetValueDescriptorCacheKey, ()>>>,
+    /// Cache of which nodes have seen descriptors for which records to optimize outbound set_value and transact_value operations
+    descriptor_cache: Arc<Mutex<LruCache<DescriptorCacheKey, ()>>>,
 
     // Online check
     is_online: AtomicBool,
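The byte estimate in the cache-size comment above checks out under a rough assumption about key sizes; a minimal sketch, where the 40-byte opaque record key and 32-byte node id figures are assumptions not stated in this commit:

```rust
// Hypothetical back-of-the-envelope check for the constants above.
const DESCRIPTOR_CACHE_SIZE: usize = 512 * 5; // 512 records, 5 nodes per record = 2560
const APPROX_KEY_BYTES: usize = 40 + 32; // assumed opaque record key + node id sizes
const APPROX_CACHE_BYTES: usize = DESCRIPTOR_CACHE_SIZE * APPROX_KEY_BYTES; // 184320
```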
@@ -218,10 +218,7 @@ impl fmt::Debug for StorageManager {
             )
             .field("anonymous_watch_keys", &self.anonymous_watch_keys)
             .field("is_online", &self.is_online)
-            .field(
-                "set_value_descriptor_cache",
-                &self.set_value_descriptor_cache,
-            )
+            .field("descriptor_cache", &self.descriptor_cache)
             .finish()
     }
 }
@@ -279,9 +276,7 @@ impl StorageManager {
             anonymous_watch_keys,
             background_operation_processor: DeferredStreamProcessor::new(),
             is_online: AtomicBool::new(false),
-            set_value_descriptor_cache: Arc::new(Mutex::new(LruCache::new(
-                SET_VALUE_DESCRIPTOR_CACHE_SIZE,
-            ))),
+            descriptor_cache: Arc::new(Mutex::new(LruCache::new(DESCRIPTOR_CACHE_SIZE))),
         };
 
         this.setup_tasks();
@@ -450,11 +445,11 @@ impl StorageManager {
         if let Some(metadata_db) = &inner.metadata_db {
             let tx = metadata_db.transact();
             let set_value_descriptor_cache = self
-                .set_value_descriptor_cache
+                .descriptor_cache
                 .lock()
                 .iter()
                 .map(|x| x.0.clone())
-                .collect::<Vec<SetValueDescriptorCacheKey>>();
+                .collect::<Vec<DescriptorCacheKey>>();
             tx.store_json(0, OFFLINE_SUBKEY_WRITES, &inner.offline_subkey_writes)?;
             tx.store_json(0, OUTBOUND_WATCH_MANAGER, &inner.outbound_watch_manager)?;
@@ -521,7 +516,7 @@ impl StorageManager {
                 }
             };
             let set_value_descriptor_cache_keys = match metadata_db
-                .load_json::<Vec<SetValueDescriptorCacheKey>>(0, SET_VALUE_DESCRIPTOR_CACHE)
+                .load_json::<Vec<DescriptorCacheKey>>(0, SET_VALUE_DESCRIPTOR_CACHE)
                 .await
             {
                 Ok(v) => v.unwrap_or_default(),
@@ -533,7 +528,7 @@ impl StorageManager {
                 }
             };
             {
-                let mut set_value_descriptor_cache = self.set_value_descriptor_cache.lock();
+                let mut set_value_descriptor_cache = self.descriptor_cache.lock();
                 set_value_descriptor_cache.clear();
                 for k in set_value_descriptor_cache_keys {
                     set_value_descriptor_cache.insert(k, ());

View file

@@ -1,20 +1,26 @@
 mod outbound_transaction_per_node_state;
 mod outbound_transaction_state;
 
+use crate::storage_manager::transact_value::OutboundTransactionHandle;
+
 use super::*;
 use outbound_transaction_per_node_state::*;
 use outbound_transaction_state::*;
 use serde_with::serde_as;
 
+pub(in crate::storage_manager) use outbound_transaction_state::OutboundTransactionRecordInfo;
+
 impl_veilid_log_facility!("stor");
 
 #[serde_as]
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub(in crate::storage_manager) struct OutboundTransactionManager {
+    /// Record key to handle map
+    pub handles_by_key: HashMap<OpaqueRecordKey, OutboundTransactionHandle>,
     /// Each transaction per record key
     #[serde(skip)]
-    pub transactions: HashMap<RecordKey, Arc<OutboundTransactionState>>,
+    pub transactions: HashMap<OutboundTransactionHandle, OutboundTransactionState>,
     /// Last known transaction per node+record
     #[serde_as(as = "Vec<(_, _)>")]
     pub per_node_states: HashMap<PerNodeKey, OutboundTransactionPerNodeState>,
@@ -29,7 +35,7 @@ impl fmt::Display for OutboundTransactionManager {
             for k in keys {
                 let v = self.transactions.get(&k).unwrap();
-                out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.to_string()));
+                out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.lock().to_string()));
             }
         }
         out += "]\n";
@@ -58,6 +64,7 @@ impl Default for OutboundTransactionManager {
 impl OutboundTransactionManager {
     pub fn new() -> Self {
         Self {
+            handles_by_key: HashMap::new(),
             transactions: HashMap::new(),
             per_node_states: HashMap::new(),
         }
@@ -85,4 +92,48 @@ impl OutboundTransactionManager {
     // })
     // }
     }
 
+    pub fn new_transaction(
+        &mut self,
+        record_infos: &[OutboundTransactionRecordInfo],
+        member: KeyPair,
+        safety_selection: SafetySelection,
+    ) -> VeilidAPIResult<OutboundTransactionHandle> {
+        // Ensure no other transactions are using any of these record keys and make handle
+        let mut opaque_record_keys = vec![];
+        for ri in record_infos {
+            let opaque_record_key = ri.record_key.opaque();
+            if self.handles_by_key.contains_key(&opaque_record_key) {
+                apibail_generic!(format!(
+                    "Record {} already has a transaction open",
+                    opaque_record_key
+                ));
+            }
+            opaque_record_keys.push(opaque_record_key);
+        }
+        let transaction_handle = OutboundTransactionHandle {
+            keys: Arc::new(opaque_record_keys.clone()),
+        };
+
+        // Create a new outbound transaction state
+        let outbound_transaction_state =
+            OutboundTransactionState::new(record_infos.to_vec(), member, safety_selection);
+
+        // Add to transaction list
+        for opaque_record_key in opaque_record_keys {
+            self.handles_by_key
+                .insert(opaque_record_key, transaction_handle.clone());
+        }
+        self.transactions
+            .insert(transaction_handle.clone(), outbound_transaction_state);
+
+        // Success, return the transaction handle
+        Ok(transaction_handle)
+    }
+
+    pub fn get_begin_params(
+        &self,
+        transaction_handle: OutboundTransactionHandle,
+    ) -> OutboundTransactionBeginParams {
+    }
 }
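The two maps above imply a record-key → handle → state lookup path: every opaque record key in a transaction maps to one shared handle, and the shared state is keyed by that handle. A minimal illustrative helper (hypothetical, not part of this commit):

```rust
// Hypothetical helper showing the lookup path implied by the maps above.
fn find_transaction<'a>(
    otm: &'a OutboundTransactionManager,
    key: &OpaqueRecordKey,
) -> Option<&'a OutboundTransactionState> {
    // record key -> shared handle
    let handle = otm.handles_by_key.get(key)?;
    // shared handle -> transaction state
    otm.transactions.get(handle)
}
```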

View file

@@ -1,9 +1,17 @@
 use super::*;
 
+#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
+pub(in crate::storage_manager) struct OutboundTransactionRecordInfo {
+    pub record_key: RecordKey,
+    pub writer: KeyPair,
+}
+
 //#[serde_as]
 #[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
 pub(in crate::storage_manager) struct OutboundTransactionState {
+    record_infos: Vec<OutboundTransactionRecordInfo>,
+    member: KeyPair,
+    safety_selection: SafetySelection,
 }
 
 impl fmt::Display for OutboundTransactionState {
@@ -47,3 +55,17 @@ impl fmt::Display for OutboundTransactionState {
         // )
     }
 }
 
+impl OutboundTransactionState {
+    pub fn new(
+        record_infos: Vec<OutboundTransactionRecordInfo>,
+        member: KeyPair,
+        safety_selection: SafetySelection,
+    ) -> Self {
+        Self {
+            record_infos,
+            member,
+            safety_selection,
+        }
+    }
+}

View file

@@ -380,7 +380,7 @@ impl StorageManager {
         // Make the return channel
         let (out_tx, out_rx) = flume::unbounded::<VeilidAPIResult<OutboundSetValueResult>>();
 
-        // Make do-set-value answer context
+        // Make operation context
         let schema = descriptor.schema()?;
         let context = Arc::new(Mutex::new(OutboundSetValueContext {
             value,
@@ -388,7 +388,7 @@ impl StorageManager {
             schema,
             send_partial_update: true,
         }));
-        let set_value_descriptor_cache = self.set_value_descriptor_cache.clone();
+        let descriptor_cache = self.descriptor_cache.clone();
 
         // Routine to call to generate fanout
         let call_routine = {
@@ -396,7 +396,7 @@ impl StorageManager {
             let registry = self.registry();
             let opaque_record_key = opaque_record_key.clone();
             let safety_selection = safety_selection.clone();
-            let set_value_descriptor_cache = set_value_descriptor_cache.clone();
+            let descriptor_cache = descriptor_cache.clone();
 
             Arc::new(
                 move |next_node: NodeRef| -> PinBoxFutureStatic<FanoutCallResult> {
@@ -405,14 +405,14 @@ impl StorageManager {
                     let descriptor = descriptor.clone();
                     let opaque_record_key = opaque_record_key.clone();
                     let safety_selection = safety_selection.clone();
-                    let set_value_descriptor_cache = set_value_descriptor_cache.clone();
+                    let descriptor_cache = descriptor_cache.clone();
 
                     Box::pin(async move {
                         let rpc_processor = registry.rpc_processor();
 
                         // check the cache to see if we should send the descriptor
                         let node_id = next_node.node_ids().get(opaque_record_key.kind()).unwrap();
-                        let svdc_key = SetValueDescriptorCacheKey{ opaque_record_key: opaque_record_key.clone(), node_id };
-                        let mut send_descriptor = set_value_descriptor_cache.lock().get(&svdc_key).is_none();
+                        let dc_key = DescriptorCacheKey{ opaque_record_key: opaque_record_key.clone(), node_id };
+                        let mut send_descriptor = descriptor_cache.lock().get(&dc_key).is_none();
 
                         // get most recent value to send
                         let value = {
@@ -478,7 +478,7 @@ impl StorageManager {
                         // Cache if we sent the descriptor
                         if send_descriptor {
-                            set_value_descriptor_cache.lock().insert(svdc_key,());
+                            descriptor_cache.lock().insert(dc_key,());
                         }
 
                         // See if we got a newer value back

View file

@@ -2,55 +2,40 @@ use super::*;
 
 impl_veilid_log_facility!("stor");
 
-// /// The fully parsed descriptor
-// struct DescriptorInfo {
-//     /// The descriptor itself
-//     descriptor: Arc<SignedValueDescriptor>,
-
-//     /// The in-schema subkeys that overlap the inspected range
-//     subkeys: ValueSubkeyRangeSet,
-// }
-
-// impl DescriptorInfo {
-//     pub fn new(
-//         descriptor: Arc<SignedValueDescriptor>,
-//         subkeys: &ValueSubkeyRangeSet,
-//     ) -> VeilidAPIResult<Self> {
-//         let schema = descriptor.schema().map_err(RPCError::invalid_format)?;
-//         let subkeys = schema.truncate_subkeys(subkeys, Some(MAX_INSPECT_VALUE_A_SEQS_LEN));
-//         Ok(Self {
-//             descriptor,
-//             subkeys,
-//         })
-//     }
-// }
-
-// /// Info tracked per subkey
-// struct SubkeySeqCount {
-//     /// The newest sequence number found for a subkey
-//     pub seq: Option<ValueSeqNum>,
-//     /// The set of nodes that had the most recent value for this subkey
-//     pub consensus_nodes: Vec<NodeRef>,
-//     /// The set of nodes that had any value for this subkey
-//     pub value_nodes: Vec<NodeRef>,
-// }
-
-/// The context of the outbound_transact_value operation
-struct OutboundTransactValueContext {
-    // /// The combined sequence numbers and result counts so far
-    // pub seqcounts: Vec<SubkeySeqCount>,
-    // /// The descriptor if we got a fresh one or empty if no descriptor was needed
-    // pub opt_descriptor_info: Option<DescriptorInfo>,
-}
-
-#[derive(Clone, Debug, Hash, PartialEq, Eq)]
+/// Info tracked per subkey
+struct SubkeySeqCount {
+    /// The newest sequence number found for a subkey
+    pub seq: Option<ValueSeqNum>,
+    /// The set of nodes that had the most recent value for this subkey
+    pub consensus_nodes: Vec<NodeRef>,
+}
+
+/// Transaction ids and nodes
+struct NodeTransactions {
+    pub node_ref: NodeRef,
+    pub xid: u64,
+}
+
+/// The context of the outbound_begin_transact_value operation
+struct OutboundBeginTransactValueContext {
+    /// The combined sequence numbers and result counts so far
+    pub seqcounts: Vec<SubkeySeqCount>,
+    /// The descriptor for this record
+    pub descriptor: Arc<SignedValueDescriptor>,
+    /// The number of non-accepts since the last accept we have received
+    pub missed_since_last_accept: usize,
+    /// The set of nodes that returned a transaction id
+    pub xid_nodes: Vec<NodeTransactions>,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
 pub struct OutboundTransactionHandle {
-    keys: Arc<Vec<OpaqueRecordKey>>
+    pub keys: Arc<Vec<OpaqueRecordKey>>,
 }
 
 /// The result of the outbound_transact_value operation
 #[derive(Debug, Clone)]
-pub(super) struct OutboundTransactValueBeginResult {
+pub(super) struct OutboundBeginTransactValueResult {
     /// Fanout results for each subkey
     pub fanout_result: FanoutResult,
     /// The transactions that were retrieved
@@ -67,8 +52,6 @@ pub(crate) enum InboundTransactValueResult {
 }
 
 impl StorageManager {
     /// Create a new transaction over a set of records
     /// If an existing transaction exists over these records
     /// or a transaction can not be performed at this time, this will fail.
@@ -78,20 +61,80 @@ impl StorageManager {
     pub async fn transact_records(
         &self,
         record_keys: Vec<RecordKey>,
+        safety_selection: SafetySelection,
         options: Option<TransactDHTRecordsOptions>,
     ) -> VeilidAPIResult<OutboundTransactionHandle> {
         let Ok(_guard) = self.startup_lock.enter() else {
             apibail_not_initialized!();
         };
 
-        // Obtain the outbound transaction manager
-        let otm = self.inner.lock().await;
-        otm.
-
-        xxx write out document about offline-first dht, staged writes, and how to deal with queued transactions
-
-        self.watch_values_inner(watch_lock, subkeys, expiration, count)
-            .await
+        // Early rejection if dht is not online
+        if !self.dht_is_online() {
+            apibail_try_again!("dht is not online");
+        }
+
+        // Resolve options
+        let options = options.unwrap_or_default();
+
+        // Get opened records and construct record infos
+        let (transaction_handle, begin_params_list) = {
+            let mut inner = self.inner.lock().await;
+            let mut member = options.member.clone();
+            let mut record_infos = vec![];
+            for record_key in record_keys {
+                let opaque_record_key = record_key.opaque();
+                let Some(opened_record) = inner.opened_records.get(&opaque_record_key) else {
+                    apibail_generic!(format!("record key not open: {}", opaque_record_key));
+                };
+                if record_key.encryption_key().map(|x| x.ref_value())
+                    != opened_record.encryption_key()
+                {
+                    apibail_generic!(format!(
+                        "record encryption key does not match opened record encryption key: {}",
+                        opaque_record_key
+                    ));
+                }
+
+                // Add to record infos
+                let Some(writer) = opened_record.writer().cloned() else {
+                    apibail_generic!(format!(
+                        "record not opened for writing: {}",
+                        opaque_record_key
+                    ));
+                };
+                record_infos.push(OutboundTransactionRecordInfo { record_key, writer });
+
+                // Choose first opened record writer as 'member' if we don't have one yet
+                if member.is_none() {
+                    if let Some(writer) = opened_record.writer() {
+                        member = Some(writer.clone());
+                    }
+                }
+            }
+
+            // If we have no transaction member to verify the transaction, fail out
+            let Some(member) = member else {
+                apibail_generic!(
+                    "no record keys opened for writing and no transaction member specified"
+                );
+            };
+
+            // Obtain the outbound transaction manager
+            let otm = &mut inner.outbound_transaction_manager;
+
+            // Create a new transaction if possible
+            let transaction_handle =
+                otm.new_transaction(&record_infos, member, safety_selection)?;
+
+            // Get parameters for beginning a transaction
+            let begin_params_list = otm.get_begin_params(transaction_handle.clone());
+
+            (transaction_handle, begin_params_list)
+        };
+
+        // Begin transactions on all records
+        for begin_params in begin_params_list {
+            //
+        }
     }
 
     ////////////////////////////////////////////////////////////////////////
@@ -102,11 +145,10 @@ impl StorageManager {
     pub(super) async fn outbound_begin_transact_value(
         &self,
         opaque_record_key: OpaqueRecordKey,
-        descriptor: Option<SignedValueDescriptor,
         safety_selection: SafetySelection,
-    ) -> VeilidAPIResult<OutboundTransactValueResult> {
+        writer: KeyPair,
+    ) -> VeilidAPIResult<OutboundBeginTransactValueResult> {
         let routing_domain = RoutingDomain::PublicInternet;
-        let requested_subkeys = subkeys.clone();
 
         // Get the DHT parameters for 'TransactValue'
         let config = self.config();
@@ -131,26 +173,32 @@ impl StorageManager {
                 .collect()
         };
 
-        // Make do-inspect-value answer context
-        let opt_descriptor_info = if let Some(descriptor) = local_inspect_result.opt_descriptor() {
-            // Get the descriptor info. This also truncates the subkeys list to what can be returned from the network.
-            Some(DescriptorInfo::new(descriptor, &subkeys)?)
-        } else {
-            None
+        // Pull the descriptor for this record
+        let descriptor = {
+            let mut inner = self.inner.lock().await;
+            let local_inspect_result = self
+                .handle_inspect_local_value_inner(
+                    &mut inner,
+                    opaque_record_key.clone(),
+                    ValueSubkeyRangeSet::full(),
+                    true,
+                )
+                .await?;
+            local_inspect_result.opt_descriptor().unwrap()
         };
+        let schema = descriptor.schema()?;
+        let subkey_count =
+            usize::try_from(schema.max_subkey() + 1).map_err(VeilidAPIError::internal)?;
 
-        let context = Arc::new(Mutex::new(OutboundInspectValueContext {
-            seqcounts: local_inspect_result
-                .seqs()
-                .iter()
-                .map(|s| SubkeySeqCount {
-                    seq: *s,
-                    consensus_nodes: vec![],
-                    value_nodes: vec![],
-                })
-                .collect(),
-            opt_descriptor_info,
+        // Make operation context
+        let context = Arc::new(Mutex::new(OutboundBeginTransactValueContext {
+            descriptor,
+            missed_since_last_accept: 0,
+            seqcounts: vec![],
+            xid_nodes: vec![],
         }));
+        let descriptor_cache = self.descriptor_cache.clone();
 
         // Routine to call to generate fanout
         let call_routine = {
@@ -158,87 +206,103 @@ impl StorageManager {
             let registry = self.registry();
             let opaque_record_key = opaque_record_key.clone();
             let safety_selection = safety_selection.clone();
+            let descriptor_cache = descriptor_cache.clone();
 
             Arc::new(
                 move |next_node: NodeRef| -> PinBoxFutureStatic<FanoutCallResult> {
                     let context = context.clone();
                     let registry = registry.clone();
-                    let opt_descriptor = local_inspect_result.opt_descriptor();
-                    let subkeys = subkeys.clone();
                     let opaque_record_key = opaque_record_key.clone();
                     let safety_selection = safety_selection.clone();
+                    let descriptor_cache = descriptor_cache.clone();
 
                     Box::pin(async move {
                         let rpc_processor = registry.rpc_processor();
 
-                        let iva = match
-                            rpc_processor
-                                .rpc_call_inspect_value(
-                                    Destination::direct(next_node.routing_domain_filtered(routing_domain)).with_safety(safety_selection),
-                                    opaque_record_key.clone(),
-                                    subkeys.clone(),
-                                    opt_descriptor.map(|x| (*x).clone()),
-                                )
-                                .await? {
-                            NetworkResult::Timeout => {
-                                return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Timeout});
-                            }
-                            NetworkResult::ServiceUnavailable(_) |
-                            NetworkResult::NoConnection(_) |
-                            NetworkResult::AlreadyExists(_) |
-                            NetworkResult::InvalidMessage(_) => {
-                                return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid});
-                            }
-                            NetworkResult::Value(v) => v
-                        };
-
-                        let answer = iva.answer;
-
-                        // Keep the descriptor if we got one. If we had a last_descriptor it will
-                        // already be validated by rpc_call_inspect_value
-                        if let Some(descriptor) = answer.descriptor {
-                            let mut ctx = context.lock();
-                            if ctx.opt_descriptor_info.is_none() {
-                                // Get the descriptor info. This also truncates the subkeys list to what can be returned from the network.
-                                let descriptor_info =
-                                    match DescriptorInfo::new(Arc::new(descriptor.clone()), &subkeys) {
-                                        Ok(v) => v,
-                                        Err(e) => {
-                                            veilid_log!(registry debug target:"network_result", "InspectValue returned an invalid descriptor: {}", e);
-                                            return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid});
-                                        }
-                                    };
-                                ctx.opt_descriptor_info = Some(descriptor_info);
-                            }
-                        }
-
-                        // Keep the value if we got one and it is newer and it passes schema validation
-                        if answer.seqs.is_empty() {
-                            veilid_log!(registry debug target:"network_result", "InspectValue returned no seq, fanout call returned peers {}", answer.peers.len());
-                            return Ok(FanoutCallOutput{peer_info_list: answer.peers, disposition: FanoutCallDisposition::Rejected});
-                        }
-                        veilid_log!(registry debug target:"network_result", "Got seqs back: len={}", answer.seqs.len());
+                        // check the cache to see if we should send the descriptor
+                        let node_id = next_node.node_ids().get(opaque_record_key.kind()).unwrap();
+                        let dc_key = DescriptorCacheKey{ opaque_record_key: opaque_record_key.clone(), node_id };
+                        let mut send_descriptor = descriptor_cache.lock().get(&dc_key).is_none();
+
+                        // send across the wire, with a retry if the remote needed the descriptor
+                        let tva = loop {
+                            // send across the wire
+                            let tva = match
+                                rpc_processor
+                                    .rpc_call_transact_value(
+                                        Destination::direct(next_node.routing_domain_filtered(routing_domain)).with_safety(safety_selection),
+                                        opaque_record_key.clone(),
+                                        None,
+                                        TransactValueCommand::Begin,
+                                        descriptor.as_ref().clone(),
+                                        send_descriptor,
+                                        writer,
+                                    )
+                                    .await? {
+                                NetworkResult::Timeout => {
+                                    return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Timeout});
+                                }
+                                NetworkResult::ServiceUnavailable(_) |
+                                NetworkResult::NoConnection(_) |
+                                NetworkResult::AlreadyExists(_) |
+                                NetworkResult::InvalidMessage(_) => {
+                                    return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid});
+                                }
+                                NetworkResult::Value(v) => v
+                            };
+
+                            // Do a retry if we needed to send the descriptor
+                            // (if the cache was wrong)
+                            if tva.answer.accepted {
+                                if tva.answer.needs_descriptor {
+                                    if !send_descriptor {
+                                        send_descriptor = true;
+                                        continue;
+                                    } else {
+                                        veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' when descriptor was already sent: node={} record_key={}", next_node, opaque_record_key);
+                                    }
+                                }
+                            } else if tva.answer.needs_descriptor {
+                                veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' from node that did not accept: node={} record_key={}", next_node, opaque_record_key);
+                            }
+
+                            break tva;
+                        };
+
+                        let answer = tva.answer;
 
+                        // If the node was close enough to accept the value
                         let mut ctx = context.lock();
+                        if !tva.answer.accepted {
+                            ctx.missed_since_last_accept += 1;
 
-                        // Ensure we have a schema and descriptor etc
-                        let Some(descriptor_info) = &ctx.opt_descriptor_info else {
-                            // Got a value but no descriptor for it
-                            // Move to the next node
-                            veilid_log!(registry debug target:"network_result", "InspectValue returned a value with no descriptor invalid descriptor");
-                            return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid});
-                        };
+                            // Return peers if we have some
+                            veilid_log!(registry debug target:"network_result", "BeginTransactValue missed: {}, fanout call returned peers {}", ctx.missed_since_last_accept, tva.answer.peers.len());
+                            return Ok(FanoutCallOutput{peer_info_list:tva.answer.peers, disposition: FanoutCallDisposition::Rejected});
+                        }
+
+                        // Cache if we sent the descriptor
+                        if send_descriptor {
+                            descriptor_cache.lock().insert(dc_key,());
+                        }
+
+                        // Get the transaction id
+                        let Some(xid) = answer.transaction_id else {
+                            veilid_log!(registry debug target:"network_result", "BeginTransactValue returned no transaction id, fanout call returned peers {}", answer.peers.len());
+                            return Ok(FanoutCallOutput{peer_info_list: answer.peers, disposition: FanoutCallDisposition::Rejected});
+                        };
 
-                        // Get number of subkeys from schema and ensure we are getting the
-                        // right number of sequence numbers betwen that and what we asked for
-                        #[allow(clippy::unnecessary_cast)]
-                        if answer.seqs.len() as u64 != descriptor_info.subkeys.len() as u64 {
-                            // Not the right number of sequence numbers
-                            // Move to the next node
+                        // Get the sequence number state at the point of the transaction
+                        if answer.seqs.len() != subkey_count {
                             veilid_log!(registry debug target:"network_result", "wrong number of seqs returned {} (wanted {})",
                                 answer.seqs.len(),
-                                descriptor_info.subkeys.len());
-                            return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid});
+                                subkey_count);
+                            return Ok(FanoutCallOutput{peer_info_list: answer.peers, disposition: FanoutCallDisposition::Invalid});
                         }
+                        veilid_log!(registry debug target:"network_result", "Got transaction id and seqs back: xid={}, len={}", xid, answer.seqs.len());
+
+                        // Add transaction id node to list
+                        ctx.xid_nodes.push(NodeTransactions { node_ref: next_node.clone(), xid });
 
                         // If we have a prior seqs list, merge in the new seqs
                         if ctx.seqcounts.is_empty() {
                             ctx.seqcounts = answer
@@ -248,17 +312,9 @@ impl StorageManager {
                                     seq: *s,
                                     // One node has shown us the newest sequence numbers so far
                                     consensus_nodes: vec![next_node.clone()],
-                                    value_nodes: vec![next_node.clone()],
                                 })
                                 .collect();
                         } else {
-                            if ctx.seqcounts.len() != answer.seqs.len() {
-                                veilid_log!(registry debug target:"network_result", "seqs list length should always be equal by now: {} (wanted {})",
-                                    answer.seqs.len(),
-                                    ctx.seqcounts.len());
-                                return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid});
-                            }
                             for pair in ctx.seqcounts.iter_mut().zip(answer.seqs.iter()) {
                                 let ctx_seqcnt = pair.0;
                                 let answer_seq = *pair.1;
@@ -285,23 +341,21 @@ impl StorageManager {
                                         ctx_seqcnt.consensus_nodes.push(next_node.clone());
                                     }
                                 }
-                                ctx_seqcnt.value_nodes.push(next_node.clone());
                             }
                         }
 
                         // Return peers if we have some
-                        veilid_log!(registry debug target:"network_result", "InspectValue fanout call returned peers {}", answer.peers.len());
+                        veilid_log!(registry debug target:"network_result", "BeginTransactValue fanout call returned peers {}", answer.peers.len());
 
-                        // Inspect doesn't actually use the fanout queue consensus tracker
+                        // Transact doesn't actually use the fanout queue consensus tracker
                         Ok(FanoutCallOutput { peer_info_list: answer.peers, disposition: FanoutCallDisposition::Accepted})
-                    }.instrument(tracing::trace_span!("outbound_inspect_value fanout call"))) as PinBoxFuture<FanoutCallResult>
+                    }.instrument(tracing::trace_span!("outbound_begin_transact_value fanout call"))) as PinBoxFuture<FanoutCallResult>
                 },
             )
         };
 
         // Routine to call to check if we're done at each step
-        // For inspect, we are tracking consensus externally from the FanoutCall,
+        // For transact, we are tracking consensus externally from the FanoutCall,
         // for each subkey, rather than a single consensus, so the single fanoutresult
         // that is passed in here is ignored in favor of our own per-subkey tracking
         let check_done = {
@@ -317,7 +371,7 @@ impl StorageManager {
                 }
             }
 
-            !ctx.seqcounts.is_empty() && ctx.opt_descriptor_info.is_some() && has_consensus
+            !ctx.seqcounts.is_empty() && has_consensus
             })
         };
@@ -354,14 +408,14 @@ impl StorageManager {
         }
 
         if subkey_fanout_results.len() == 1 {
-            veilid_log!(self debug "InspectValue Fanout: {:#}\n{:#}", fanout_result, subkey_fanout_results.first().unwrap());
+            veilid_log!(self debug "BeginTransactValue Fanout: {:#}\n{:#}", fanout_result, subkey_fanout_results.first().unwrap());
         } else {
-            veilid_log!(self debug "InspectValue Fanout: {:#}:\n{}", fanout_result, debug_fanout_results(&subkey_fanout_results));
+            veilid_log!(self debug "BeginTransactValue Fanout: {:#}:\n{}", fanout_result, debug_fanout_results(&subkey_fanout_results));
         }
 
-        let result = OutboundInspectValueResult {
+        let result = OutboundBeginTransactValueResult {
             subkey_fanout_results,
-            inspect_result: InspectResult::new(
+            transact_result: BeginTransactResult::new(
                 self,
                 requested_subkeys,
                 "outbound_inspect_value",
@@ -389,14 +443,12 @@ impl StorageManager {
         Ok(result)
     }
 
     /// Perform end transaction queries on the network for a single record
     #[instrument(level = "trace", target = "dht", skip_all, err)]
     pub(super) async fn outbound_end_transact_value(
         &self,
         opaque_record_key: OpaqueRecordKey,
     ) -> VeilidAPIResult<OutboundTransactValueResult> {
     }
 
     /// Perform commit transaction queries on the network for a single record
@@ -405,7 +457,6 @@ impl StorageManager {
         &self,
         opaque_record_key: OpaqueRecordKey,
     ) -> VeilidAPIResult<OutboundTransactValueResult> {
     }
 
     /// Perform rollback transaction queries on the network for a single record
@@ -414,7 +465,6 @@ impl StorageManager {
         &self,
         opaque_record_key: OpaqueRecordKey,
     ) -> VeilidAPIResult<OutboundTransactValueResult> {
     }
 
     /// Handle a received 'TransactValue' query
@@ -429,7 +479,8 @@ impl StorageManager {
     ) -> VeilidAPIResult<NetworkResult<InboundTransactValueResult>> {
         let mut inner = self.inner.lock().await;
 
-        Ok(NetworkResult::value(InboundTransactValueResult::Success(transact_result)))
+        Ok(NetworkResult::value(InboundTransactValueResult::Success(
+            transact_result,
+        )))
     }
 }

View file

@@ -403,7 +403,7 @@ impl StorageManager {
                 .collect()
         };
 
-        // Make do-watch-value answer context
+        // Make operation context
        let context = Arc::new(Mutex::new(OutboundWatchValueContext::default()));
 
         // Routine to call to generate fanout

View file

@@ -1,4 +1,5 @@
 use super::*;
+use crate::storage_manager::OutboundTransactionHandle;
 
 impl_veilid_log_facility!("veilid_api");
@@ -11,6 +12,8 @@ pub(crate) struct DHTTransactionUnlockedInner {}
 /// DHT operations performed out of a transaction may be processed in any order, and only operate on one subkey at a time
 /// for a given record. Transactions allow you to bind a set of operations so they all succeed, or fail together, and at the same time.
 ///
+/// Transactional DHT operations can only be performed when the node is online, and will error with [VeilidAPIError::TryAgain] if offline.
+///
 /// Transactions must be committed when all of their operations are registered, or rolled back if the group of operations is to be cancelled.
 #[derive(Clone)]
 #[must_use]
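To make the commit-or-rollback contract above concrete, here is a minimal usage sketch. The method names `transact_dht_records`, `commit`, and `rollback`, and the placeholders `record_key_a`, `record_key_b`, and `all_operations_registered`, are assumptions for illustration; this checkpoint does not finalize the API:

```rust
// Minimal usage sketch; names are assumed, not a finalized API.
let tx = routing_context
    .transact_dht_records(vec![record_key_a, record_key_b], None)
    .await?; // errors with VeilidAPIError::TryAgain if the node is offline

// ... stage DHT operations against the transacted records here ...

if all_operations_registered {
    // apply every staged operation together
    tx.commit().await?;
} else {
    // cancel the whole group of operations
    tx.rollback().await?;
}
```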
@@ -18,7 +21,7 @@ pub struct DHTTransaction {
     /// Routing context in use
     routing_context: RoutingContext,
     /// Immutable State
-    unlocked_inner: Arc<RoutingContextUnlockedInner>,
+    unlocked_inner: Arc<DHTTransactionUnlockedInner>,
 }
 
 impl fmt::Debug for DHTTransaction {
@@ -37,7 +40,7 @@ impl DHTTransaction {
         routing_context: RoutingContext,
         handle: OutboundTransactionHandle,
     ) -> VeilidAPIResult<Self> {
-        let config = routing_context.api().api.config()?;
+        let config = routing_context.api().config()?;
         Ok(Self {
             routing_context,

View file

@@ -674,7 +674,12 @@ impl RoutingContext {
         }
 
         let storage_manager = self.api.core_context()?.storage_manager();
-        let handle = Box::pin(storage_manager.transact_records(record_keys, options)).await;
+        let handle = Box::pin(storage_manager.transact_records(
+            record_keys,
+            self.unlocked_inner.safety_selection.clone(),
+            options,
+        ))
+        .await?;
         match DHTTransaction::try_new(self.clone(), handle) {
             Ok(v) => Ok(v),

View file

@@ -1,20 +1,19 @@
 use super::*;
 
-/// Options controlling the
 #[derive(Debug, JsonSchema, Serialize, Deserialize, Clone)]
 pub struct TransactDHTRecordsOptions {
+    /// The schema member public key to use when opening the transaction.
+    /// Setting this does not override any writer keys used by transaction operations
+    /// and is only used to determine access to the transaction by validating the member
+    /// is in the schema.
     #[schemars(with = "Option<String>")]
-    pub writer: Option<KeyPair>,
-    /// Defaults to true. If false, the transaction will not be created or committed if the node is offline,
-    /// and a TryAgain error will be returned.
-    pub allow_offline: Option<AllowOffline>,
+    pub member: Option<KeyPair>,
 }
 
 impl Default for TransactDHTRecordsOptions {
     fn default() -> Self {
-        Self {
-            writer: None,
-            allow_offline: Some(AllowOffline(true)),
-        }
+        Self { member: None }
     }
 }
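A quick sketch of overriding the default member selection; `record_writer_keypair` is a placeholder for a keypair that is a member of each record's schema (if `member` is left `None`, `transact_records` falls back to the first opened record's writer, per the storage manager change above):

```rust
// Sketch: explicitly select the transaction member.
let options = TransactDHTRecordsOptions {
    member: Some(record_writer_keypair), // placeholder keypair; must be in each record's schema
};
```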
@@ -27,10 +26,7 @@ pub mod ts {
     pub struct TransactDHTRecordsOptions {
         #[tsify(type = "KeyPair", optional)]
         #[serde(with = "serde_wasm_bindgen::preserve")]
-        pub writer: JsValue,
-        /// Defaults to true. If false, the transaction will not be created or committed if the node is offline,
-        /// and a TryAgain error will be returned.
-        pub allow_offline: Option<AllowOffline>,
+        pub member: JsValue,
     }
 }
@@ -39,12 +35,8 @@ impl TryFrom<ts::TransactDHTRecordsOptions> for TransactDHTRecordsOptions {
     type Error = VeilidAPIError;
 
     fn try_from(value: ts::TransactDHTRecordsOptions) -> Result<Self, Self::Error> {
-        let writer = wasm_bindgen_derive::try_from_js_option::<KeyPair>(value.writer)
+        let member = wasm_bindgen_derive::try_from_js_option::<KeyPair>(value.member)
             .map_err(VeilidAPIError::generic)?;
-        let allow_offline = value.allow_offline.clone();
-        Ok(TransactDHTRecordsOptions {
-            writer,
-            allow_offline,
-        })
+        Ok(TransactDHTRecordsOptions { member })
     }
 }