From b16d0a528719326cf729f47ccc3c07c9eed42cf6 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 3 Oct 2025 14:56:35 -0400 Subject: [PATCH] checkpoint --- veilid-core/src/storage_manager/get_value.rs | 2 +- .../src/storage_manager/inspect_value.rs | 2 +- veilid-core/src/storage_manager/mod.rs | 29 +- .../outbound_transaction_manager/mod.rs | 55 ++- .../outbound_transaction_state.rs | 24 +- veilid-core/src/storage_manager/set_value.rs | 14 +- .../src/storage_manager/transact_value.rs | 351 ++++++++++-------- .../src/storage_manager/watch_value.rs | 2 +- veilid-core/src/veilid_api/dht_transaction.rs | 7 +- veilid-core/src/veilid_api/routing_context.rs | 7 +- .../types/dht/transact_dht_records_options.rs | 28 +- 11 files changed, 320 insertions(+), 201 deletions(-) diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 6783c1fe..32b8f447 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -205,7 +205,7 @@ impl StorageManager { // Make the return channel let (out_tx, out_rx) = flume::unbounded::>(); - // Make do-get-value answer context + // Make operation context let context = Arc::new(Mutex::new(OutboundGetValueContext { value: last_get_result.opt_value, descriptor: last_get_result.opt_descriptor.clone(), diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index 006b7abd..8e422f83 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -251,7 +251,7 @@ impl StorageManager { .collect() }; - // Make do-inspect-value answer context + // Make operation context let opt_descriptor_info = if let Some(descriptor) = local_inspect_result.opt_descriptor() { // Get the descriptor info. This also truncates the subkeys list to what can be returned from the network. Some(DescriptorInfo::new(descriptor, &subkeys)?) diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 1ac01957..fd9a6ddd 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -39,7 +39,7 @@ pub(crate) use get_value::InboundGetValueResult; pub(crate) use inspect_value::InboundInspectValueResult; pub(crate) use record_store::{InboundWatchParameters, InboundWatchValueResult}; pub(crate) use set_value::InboundSetValueResult; -pub(crate) use transact_value::InboundTransactValueResult; +pub(crate) use transact_value::{InboundTransactValueResult, OutboundTransactionHandle}; pub use types::*; impl_veilid_log_facility!("stor"); @@ -76,8 +76,8 @@ const REHYDRATE_RECORDS_INTERVAL_SECS: u32 = 1; const REHYDRATE_BATCH_SIZE: usize = 16; /// Maximum 'offline lag' before we decide to poll for changed watches const CHANGE_INSPECT_LAG_SECS: u32 = 2; -/// Length of set value descriptor cache (512 records and 5 nodes per record, roughly 184320 bytes) -const SET_VALUE_DESCRIPTOR_CACHE_SIZE: usize = 2560; +/// Length of descriptor cache (512 records and 5 nodes per record, roughly 184320 bytes) +const DESCRIPTOR_CACHE_SIZE: usize = 2560; /// Table store table for storage manager metadata const STORAGE_MANAGER_METADATA: &str = "storage_manager_metadata"; /// Storage manager metadata key name for offline subkey write persistence @@ -104,7 +104,7 @@ struct ValueChangedInfo { #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] /// A single 'value changed' message to send -struct SetValueDescriptorCacheKey { +struct DescriptorCacheKey { opaque_record_key: OpaqueRecordKey, node_id: NodeId, } @@ -188,8 +188,8 @@ pub(crate) struct StorageManager { // background operations the storage manager wants to perform background_operation_processor: DeferredStreamProcessor, - /// Cache of which nodes have seen descriptors for which records to optimize outbound_set_value - set_value_descriptor_cache: Arc>>, + /// Cache of which nodes have seen descriptors for which records to optimize outbound set_value and transact_value operations + descriptor_cache: Arc>>, // Online check is_online: AtomicBool, @@ -218,10 +218,7 @@ impl fmt::Debug for StorageManager { ) .field("anonymous_watch_keys", &self.anonymous_watch_keys) .field("is_online", &self.is_online) - .field( - "set_value_descriptor_cache", - &self.set_value_descriptor_cache, - ) + .field("descriptor_cache", &self.descriptor_cache) .finish() } } @@ -279,9 +276,7 @@ impl StorageManager { anonymous_watch_keys, background_operation_processor: DeferredStreamProcessor::new(), is_online: AtomicBool::new(false), - set_value_descriptor_cache: Arc::new(Mutex::new(LruCache::new( - SET_VALUE_DESCRIPTOR_CACHE_SIZE, - ))), + descriptor_cache: Arc::new(Mutex::new(LruCache::new(DESCRIPTOR_CACHE_SIZE))), }; this.setup_tasks(); @@ -450,11 +445,11 @@ impl StorageManager { if let Some(metadata_db) = &inner.metadata_db { let tx = metadata_db.transact(); let set_value_descriptor_cache = self - .set_value_descriptor_cache + .descriptor_cache .lock() .iter() .map(|x| x.0.clone()) - .collect::>(); + .collect::>(); tx.store_json(0, OFFLINE_SUBKEY_WRITES, &inner.offline_subkey_writes)?; tx.store_json(0, OUTBOUND_WATCH_MANAGER, &inner.outbound_watch_manager)?; @@ -521,7 +516,7 @@ impl StorageManager { } }; let set_value_descriptor_cache_keys = match metadata_db - .load_json::>(0, SET_VALUE_DESCRIPTOR_CACHE) + .load_json::>(0, SET_VALUE_DESCRIPTOR_CACHE) .await { Ok(v) => v.unwrap_or_default(), @@ -533,7 +528,7 @@ impl StorageManager { } }; { - let mut set_value_descriptor_cache = self.set_value_descriptor_cache.lock(); + let mut set_value_descriptor_cache = self.descriptor_cache.lock(); set_value_descriptor_cache.clear(); for k in set_value_descriptor_cache_keys { set_value_descriptor_cache.insert(k, ()); diff --git a/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs b/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs index ecedf5ef..50ab4a53 100644 --- a/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs +++ b/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs @@ -1,20 +1,26 @@ mod outbound_transaction_per_node_state; mod outbound_transaction_state; +use crate::storage_manager::transact_value::OutboundTransactionHandle; + use super::*; use outbound_transaction_per_node_state::*; use outbound_transaction_state::*; use serde_with::serde_as; +pub(in crate::storage_manager) use outbound_transaction_state::OutboundTransactionRecordInfo; + impl_veilid_log_facility!("stor"); #[serde_as] #[derive(Clone, Debug, Serialize, Deserialize)] pub(in crate::storage_manager) struct OutboundTransactionManager { + /// Record key to handle map + pub handles_by_key: HashMap, /// Each transaction per record key #[serde(skip)] - pub transactions: HashMap>, + pub transactions: HashMap, /// Last known transaction per node+record #[serde_as(as = "Vec<(_, _)>")] pub per_node_states: HashMap, @@ -29,7 +35,7 @@ impl fmt::Display for OutboundTransactionManager { for k in keys { let v = self.transactions.get(&k).unwrap(); - out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.to_string())); + out += &format!(" {}:\n{}\n", k, indent_all_by(4, v.lock().to_string())); } } out += "]\n"; @@ -58,6 +64,7 @@ impl Default for OutboundTransactionManager { impl OutboundTransactionManager { pub fn new() -> Self { Self { + handles_by_key: HashMap::new(), transactions: HashMap::new(), per_node_states: HashMap::new(), } @@ -85,4 +92,48 @@ impl OutboundTransactionManager { // }) // } } + + pub fn new_transaction( + &mut self, + record_infos: &[OutboundTransactionRecordInfo], + member: KeyPair, + safety_selection: SafetySelection, + ) -> VeilidAPIResult { + // Ensure no other transactions are using any of these record keys and make handle + let mut opaque_record_keys = vec![]; + for ri in record_infos { + let opaque_record_key = ri.record_key.opaque(); + if self.handles_by_key.contains_key(&opaque_record_key) { + apibail_generic!(format!( + "Record {} already has a a transaction open", + opaque_record_key + )); + } + opaque_record_keys.push(opaque_record_key); + } + let transaction_handle = OutboundTransactionHandle { + keys: Arc::new(opaque_record_keys.clone()), + }; + + // Create a new outbound transaction state + let outbound_transaction_state = + OutboundTransactionState::new(record_infos.to_vec(), member, safety_selection); + + // Add to transaction list + for opaque_record_key in opaque_record_keys { + self.handles_by_key + .insert(opaque_record_key, transaction_handle.clone()); + } + self.transactions + .insert(transaction_handle, outbound_transaction_state); + + // Success, return the transaction handle + Ok(transaction_handle) + } + + pub fn get_begin_params( + &self, + transaction_handle: OutboundTransactionHandle, + ) -> OutboundTransactionBeginParams { + } } diff --git a/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_state.rs b/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_state.rs index fbafec0b..7b436a0b 100644 --- a/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_state.rs +++ b/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_state.rs @@ -1,9 +1,17 @@ use super::*; +#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)] +pub(in crate::storage_manager) struct OutboundTransactionRecordInfo { + pub record_key: RecordKey, + pub writer: KeyPair, +} + //#[serde_as] #[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)] pub(in crate::storage_manager) struct OutboundTransactionState { - // + record_infos: Vec, + member: KeyPair, + safety_selection: SafetySelection, } impl fmt::Display for OutboundTransactionState { @@ -47,3 +55,17 @@ impl fmt::Display for OutboundTransactionState { // ) } } + +impl OutboundTransactionState { + pub fn new( + record_infos: Vec, + member: KeyPair, + safety_selection: SafetySelection, + ) -> Self { + Self { + record_infos, + member, + safety_selection, + } + } +} diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 2566cc37..bfd99427 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -380,7 +380,7 @@ impl StorageManager { // Make the return channel let (out_tx, out_rx) = flume::unbounded::>(); - // Make do-set-value answer context + // Make operation context let schema = descriptor.schema()?; let context = Arc::new(Mutex::new(OutboundSetValueContext { value, @@ -388,7 +388,7 @@ impl StorageManager { schema, send_partial_update: true, })); - let set_value_descriptor_cache = self.set_value_descriptor_cache.clone(); + let descriptor_cache = self.descriptor_cache.clone(); // Routine to call to generate fanout let call_routine = { @@ -396,7 +396,7 @@ impl StorageManager { let registry = self.registry(); let opaque_record_key = opaque_record_key.clone(); let safety_selection = safety_selection.clone(); - let set_value_descriptor_cache = set_value_descriptor_cache.clone(); + let descriptor_cache = descriptor_cache.clone(); Arc::new( move |next_node: NodeRef| -> PinBoxFutureStatic { @@ -405,14 +405,14 @@ impl StorageManager { let descriptor = descriptor.clone(); let opaque_record_key = opaque_record_key.clone(); let safety_selection = safety_selection.clone(); - let set_value_descriptor_cache = set_value_descriptor_cache.clone(); + let descriptor_cache = descriptor_cache.clone(); Box::pin(async move { let rpc_processor = registry.rpc_processor(); // check the cache to see if we should send the descriptor let node_id = next_node.node_ids().get(opaque_record_key.kind()).unwrap(); - let svdc_key = SetValueDescriptorCacheKey{ opaque_record_key: opaque_record_key.clone(), node_id }; - let mut send_descriptor = set_value_descriptor_cache.lock().get(&svdc_key).is_none(); + let dc_key = DescriptorCacheKey{ opaque_record_key: opaque_record_key.clone(), node_id }; + let mut send_descriptor = descriptor_cache.lock().get(&dc_key).is_none(); // get most recent value to send let value = { @@ -478,7 +478,7 @@ impl StorageManager { // Cache if we sent the descriptor if send_descriptor { - set_value_descriptor_cache.lock().insert(svdc_key,()); + descriptor_cache.lock().insert(dc_key,()); } // See if we got a newer value back diff --git a/veilid-core/src/storage_manager/transact_value.rs b/veilid-core/src/storage_manager/transact_value.rs index 849c6fea..6827e4a4 100644 --- a/veilid-core/src/storage_manager/transact_value.rs +++ b/veilid-core/src/storage_manager/transact_value.rs @@ -2,55 +2,40 @@ use super::*; impl_veilid_log_facility!("stor"); -// /// The fully parsed descriptor -// struct DescriptorInfo { -// /// The descriptor itself -// descriptor: Arc, - -// /// The in-schema subkeys that overlap the inspected range -// subkeys: ValueSubkeyRangeSet, -// } - -// impl DescriptorInfo { -// pub fn new( -// descriptor: Arc, -// subkeys: &ValueSubkeyRangeSet, -// ) -> VeilidAPIResult { -// let schema = descriptor.schema().map_err(RPCError::invalid_format)?; -// let subkeys = schema.truncate_subkeys(subkeys, Some(MAX_INSPECT_VALUE_A_SEQS_LEN)); -// Ok(Self { -// descriptor, -// subkeys, -// }) -// } -// // } - -// /// Info tracked per subkey -// struct SubkeySeqCount { -// /// The newest sequence number found for a subkey -// pub seq: Option, -// /// The set of nodes that had the most recent value for this subkey -// pub consensus_nodes: Vec, -// /// The set of nodes that had any value for this subkey -// pub value_nodes: Vec, -// } - -/// The context of the outbound_transact_value operation -struct OutboundTransactValueContext { - // /// The combined sequence numbers and result counts so far - // pub seqcounts: Vec, - // /// The descriptor if we got a fresh one or empty if no descriptor was needed - // pub opt_descriptor_info: Option, +/// Info tracked per subkey +struct SubkeySeqCount { + /// The newest sequence number found for a subkey + pub seq: Option, + /// The set of nodes that had the most recent value for this subkey + pub consensus_nodes: Vec, } -#[derive(Clone, Debug, Hash, PartialEq, Eq)] +/// Transaction ids and nodes +struct NodeTransactions { + pub node_ref: NodeRef, + pub xid: u64, +} + +/// The context of the outbound_begin_transact_value operation +struct OutboundBeginTransactValueContext { + /// The combined sequence numbers and result counts so far + pub seqcounts: Vec, + /// The descriptor for this record + pub descriptor: Arc, + /// The number of non-accept since the last accept we have received + pub missed_since_last_accept: usize, + /// The set of nodes that returned a transaction id + pub xid_nodes: Vec, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct OutboundTransactionHandle { - keys: Arc> + pub keys: Arc>, } /// The result of the outbound_transact_value operation #[derive(Debug, Clone)] -pub(super) struct OutboundTransactValueBeginResult { +pub(super) struct OutboundBeginTransactValueResult { /// Fanout results for each subkey pub fanout_result: FanoutResult, /// The transactions that were retrieved @@ -67,8 +52,6 @@ pub(crate) enum InboundTransactValueResult { } impl StorageManager { - - /// Create a new transaction over a set of records /// If an existing transaction exists over these records /// or a transaction can not be performed at this time, this will fail. @@ -78,20 +61,80 @@ impl StorageManager { pub async fn transact_records( &self, record_keys: Vec, + safety_selection: SafetySelection, options: Option, ) -> VeilidAPIResult { let Ok(_guard) = self.startup_lock.enter() else { apibail_not_initialized!(); }; - // Obtain the outbound transaction manager - let otm = self.inner.lock().await; - otm. + // Early rejection if dht is not online + if !self.dht_is_online() { + apibail_try_again!("dht is not online"); + } - xxx write out document about offline-first dht, staged writes, and how to deal with queued transactions + // Resolve options + let options = options.unwrap_or_default(); - self.watch_values_inner(watch_lock, subkeys, expiration, count) - .await + // Get opened records and construct record infos + let (transaction_handle, begin_params_list) = { + let mut inner = self.inner.lock().await; + let mut member = options.member.clone(); + let mut record_infos = vec![]; + for record_key in record_keys { + let opaque_record_key = record_key.opaque(); + let Some(opened_record) = inner.opened_records.get(&opaque_record_key) else { + apibail_generic!(format!("record key not open: {}", opaque_record_key)); + }; + if record_key.encryption_key().map(|x| x.ref_value()) + != opened_record.encryption_key() + { + apibail_generic!(format!( + "record encryption key does not match opened record encryption key: {}", + opaque_record_key + )); + } + // Add to record infos + let Some(writer) = opened_record.writer().cloned() else { + apibail_generic!(format!( + "record not opened for writing: {}", + opaque_record_key + )); + }; + record_infos.push(OutboundTransactionRecordInfo { record_key, writer }); + + // Choose first opened record writer as 'member' if we don't have one yet + if member.is_none() { + if let Some(writer) = opened_record.writer() { + member = Some(writer.clone()); + } + } + } + + // If we have no transaction member to verify the transaction, fail out + let Some(member) = member else { + apibail_generic!( + "no record keys opened for writing and no transaction member specified" + ); + }; + + // Obtain the outbound transaction manager + let otm = &mut inner.outbound_transaction_manager; + + // Create a new transaction if possible + let transaction_handle = + otm.new_transaction(&record_infos, member, safety_selection)?; + + // Get parameters for beginning a transaction + let begin_params_list = otm.get_begin_params(transaction_handle.clone()); + + (transaction_handle, begin_params_list) + }; + + // Begin transactions on all records + for begin_params in begin_params_list { + // + } } //////////////////////////////////////////////////////////////////////// @@ -102,11 +145,10 @@ impl StorageManager { pub(super) async fn outbound_begin_transact_value( &self, opaque_record_key: OpaqueRecordKey, - descriptor: Option VeilidAPIResult { + writer: KeyPair, + ) -> VeilidAPIResult { let routing_domain = RoutingDomain::PublicInternet; - let requested_subkeys = subkeys.clone(); // Get the DHT parameters for 'TransactValue' let config = self.config(); @@ -131,26 +173,32 @@ impl StorageManager { .collect() }; - // Make do-inspect-value answer context - let opt_descriptor_info = if let Some(descriptor) = local_inspect_result.opt_descriptor() { - // Get the descriptor info. This also truncates the subkeys list to what can be returned from the network. - Some(DescriptorInfo::new(descriptor, &subkeys)?) - } else { - None + // Pull the descriptor for this record + let descriptor = { + let mut inner = self.inner.lock().await; + let local_inspect_result = self + .handle_inspect_local_value_inner( + &mut inner, + opaque_record_key.clone(), + ValueSubkeyRangeSet::full(), + true, + ) + .await?; + local_inspect_result.opt_descriptor().unwrap() }; - let context = Arc::new(Mutex::new(OutboundInspectValueContext { - seqcounts: local_inspect_result - .seqs() - .iter() - .map(|s| SubkeySeqCount { - seq: *s, - consensus_nodes: vec![], - value_nodes: vec![], - }) - .collect(), - opt_descriptor_info, + let schema = descriptor.schema()?; + let subkey_count = + usize::try_from(schema.max_subkey() + 1).map_err(VeilidAPIError::internal)?; + + // Make operation context + let context = Arc::new(Mutex::new(OutboundBeginTransactValueContext { + descriptor, + missed_since_last_accept: 0, + seqcounts: vec![], + xid_nodes: vec![], })); + let descriptor_cache = self.descriptor_cache.clone(); // Routine to call to generate fanout let call_routine = { @@ -158,87 +206,103 @@ impl StorageManager { let registry = self.registry(); let opaque_record_key = opaque_record_key.clone(); let safety_selection = safety_selection.clone(); + let descriptor_cache = descriptor_cache.clone(); Arc::new( move |next_node: NodeRef| -> PinBoxFutureStatic { let context = context.clone(); let registry = registry.clone(); - let opt_descriptor = local_inspect_result.opt_descriptor(); - let subkeys = subkeys.clone(); let opaque_record_key = opaque_record_key.clone(); let safety_selection = safety_selection.clone(); + let descriptor_cache = descriptor_cache.clone(); Box::pin(async move { let rpc_processor = registry.rpc_processor(); - let iva = match + // check the cache to see if we should send the descriptor + let node_id = next_node.node_ids().get(opaque_record_key.kind()).unwrap(); + let dc_key = DescriptorCacheKey{ opaque_record_key: opaque_record_key.clone(), node_id }; + let mut send_descriptor = descriptor_cache.lock().get(&dc_key).is_none(); + + // send across the wire, with a retry if the remote needed the descriptor + let tva = loop { + // send across the wire + let tva = match rpc_processor - .rpc_call_inspect_value( + .rpc_call_transact_value( Destination::direct(next_node.routing_domain_filtered(routing_domain)).with_safety(safety_selection), opaque_record_key.clone(), - subkeys.clone(), - opt_descriptor.map(|x| (*x).clone()), + None, + TransactValueCommand::Begin, + descriptor.as_ref().clone(), + send_descriptor, + writer, ) .await? { - NetworkResult::Timeout => { - return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Timeout}); + NetworkResult::Timeout => { + return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Timeout}); + } + NetworkResult::ServiceUnavailable(_) | + NetworkResult::NoConnection(_) | + NetworkResult::AlreadyExists(_) | + NetworkResult::InvalidMessage(_) => { + return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid}); + } + NetworkResult::Value(v) => v + }; + + // Do a retry if we needed to send the descriptor + // (if the cache was wrong) + if tva.answer.accepted { + if tva.answer.needs_descriptor { + if !send_descriptor { + send_descriptor = true; + continue; + } else { + veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' when descriptor was already sent: node={} record_key={}", next_node, opaque_record_key); + } + } + } else if tva.answer.needs_descriptor { + veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' from node that did not accept: node={} record_key={}", next_node, opaque_record_key); } - NetworkResult::ServiceUnavailable(_) | - NetworkResult::NoConnection(_) | - NetworkResult::AlreadyExists(_) | - NetworkResult::InvalidMessage(_) => { - return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid}); - } - NetworkResult::Value(v) => v + + break tva; }; - let answer = iva.answer; + let answer = tva.answer; - // Keep the descriptor if we got one. If we had a last_descriptor it will - // already be validated by rpc_call_inspect_value - if let Some(descriptor) = answer.descriptor { - let mut ctx = context.lock(); - if ctx.opt_descriptor_info.is_none() { - // Get the descriptor info. This also truncates the subkeys list to what can be returned from the network. - let descriptor_info = - match DescriptorInfo::new(Arc::new(descriptor.clone()), &subkeys) { - Ok(v) => v, - Err(e) => { - veilid_log!(registry debug target:"network_result", "InspectValue returned an invalid descriptor: {}", e); - return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid}); - } - }; - ctx.opt_descriptor_info = Some(descriptor_info); - } - } - - // Keep the value if we got one and it is newer and it passes schema validation - if answer.seqs.is_empty() { - veilid_log!(registry debug target:"network_result", "InspectValue returned no seq, fanout call returned peers {}", answer.peers.len()); - return Ok(FanoutCallOutput{peer_info_list: answer.peers, disposition: FanoutCallDisposition::Rejected}); - } - - veilid_log!(registry debug target:"network_result", "Got seqs back: len={}", answer.seqs.len()); + // If the node was close enough to accept the value let mut ctx = context.lock(); + if !tva.answer.accepted { + ctx.missed_since_last_accept += 1; - // Ensure we have a schema and descriptor etc - let Some(descriptor_info) = &ctx.opt_descriptor_info else { - // Got a value but no descriptor for it - // Move to the next node - veilid_log!(registry debug target:"network_result", "InspectValue returned a value with no descriptor invalid descriptor"); - return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid}); + // Return peers if we have some + veilid_log!(registry debug target:"network_result", "BeginTransactValue missed: {}, fanout call returned peers {}", ctx.missed_since_last_accept, tva.answer.peers.len()); + return Ok(FanoutCallOutput{peer_info_list:tva.answer.peers, disposition: FanoutCallDisposition::Rejected}); + } + + // Cache if we sent the descriptor + if send_descriptor { + descriptor_cache.lock().insert(dc_key,()); + } + + // Get the transaction id + let Some(xid) = answer.transaction_id else { + veilid_log!(registry debug target:"network_result", "BeginTransactValue returned no transaction id, fanout call returned peers {}", answer.peers.len()); + return Ok(FanoutCallOutput{peer_info_list: answer.peers, disposition: FanoutCallDisposition::Rejected}); }; - // Get number of subkeys from schema and ensure we are getting the - // right number of sequence numbers betwen that and what we asked for - #[allow(clippy::unnecessary_cast)] - if answer.seqs.len() as u64 != descriptor_info.subkeys.len() as u64 { - // Not the right number of sequence numbers - // Move to the next node + // Get the sequence number state at the point of the transaction + if answer.seqs.len() != subkey_count { veilid_log!(registry debug target:"network_result", "wrong number of seqs returned {} (wanted {})", answer.seqs.len(), - descriptor_info.subkeys.len()); - return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid}); + subkey_count); + return Ok(FanoutCallOutput{peer_info_list: answer.peers, disposition: FanoutCallDisposition::Invalid}); } + veilid_log!(registry debug target:"network_result", "Got transaction id and seqs back: xid={}, len={}", xid, answer.seqs.len()); + + // Add transaction id node to list + ctx.xid_nodes.push(NodeTransactions { node_ref: next_node.clone(), xid }); + // If we have a prior seqs list, merge in the new seqs if ctx.seqcounts.is_empty() { ctx.seqcounts = answer @@ -248,17 +312,9 @@ impl StorageManager { seq: *s, // One node has shown us the newest sequence numbers so far consensus_nodes: vec![next_node.clone()], - value_nodes: vec![next_node.clone()], }) .collect(); } else { - if ctx.seqcounts.len() != answer.seqs.len() { - veilid_log!(registry debug target:"network_result", "seqs list length should always be equal by now: {} (wanted {})", - answer.seqs.len(), - ctx.seqcounts.len()); - return Ok(FanoutCallOutput{peer_info_list: vec![], disposition: FanoutCallDisposition::Invalid}); - - } for pair in ctx.seqcounts.iter_mut().zip(answer.seqs.iter()) { let ctx_seqcnt = pair.0; let answer_seq = *pair.1; @@ -285,23 +341,21 @@ impl StorageManager { ctx_seqcnt.consensus_nodes.push(next_node.clone()); } } - ctx_seqcnt.value_nodes.push(next_node.clone()); } } - // Return peers if we have some - veilid_log!(registry debug target:"network_result", "InspectValue fanout call returned peers {}", answer.peers.len()); + veilid_log!(registry debug target:"network_result", "BeginTransactValue fanout call returned peers {}", answer.peers.len()); - // Inspect doesn't actually use the fanout queue consensus tracker + // Transact doesn't actually use the fanout queue consensus tracker Ok(FanoutCallOutput { peer_info_list: answer.peers, disposition: FanoutCallDisposition::Accepted}) - }.instrument(tracing::trace_span!("outbound_inspect_value fanout call"))) as PinBoxFuture + }.instrument(tracing::trace_span!("outbound_begin_transact_value fanout call"))) as PinBoxFuture }, ) }; // Routine to call to check if we're done at each step - // For inspect, we are tracking consensus externally from the FanoutCall, + // For transact, we are tracking consensus externally from the FanoutCall, // for each subkey, rather than a single consensus, so the single fanoutresult // that is passed in here is ignored in favor of our own per-subkey tracking let check_done = { @@ -317,7 +371,7 @@ impl StorageManager { } } - !ctx.seqcounts.is_empty() && ctx.opt_descriptor_info.is_some() && has_consensus + !ctx.seqcounts.is_empty() && has_consensus }) }; @@ -354,14 +408,14 @@ impl StorageManager { } if subkey_fanout_results.len() == 1 { - veilid_log!(self debug "InspectValue Fanout: {:#}\n{:#}", fanout_result, subkey_fanout_results.first().unwrap()); + veilid_log!(self debug "BeginTransactValue Fanout: {:#}\n{:#}", fanout_result, subkey_fanout_results.first().unwrap()); } else { - veilid_log!(self debug "InspectValue Fanout: {:#}:\n{}", fanout_result, debug_fanout_results(&subkey_fanout_results)); + veilid_log!(self debug "BeginTransactValue Fanout: {:#}:\n{}", fanout_result, debug_fanout_results(&subkey_fanout_results)); } - let result = OutboundInspectValueResult { + let result = OutboundBeginTransactValueResult { subkey_fanout_results, - inspect_result: InspectResult::new( + transact_result: BeginTransactResult::new( self, requested_subkeys, "outbound_inspect_value", @@ -389,14 +443,12 @@ impl StorageManager { Ok(result) } - /// Perform end transaction queries on the network for a single record #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_end_transact_value( &self, opaque_record_key: OpaqueRecordKey, ) -> VeilidAPIResult { - } /// Perform commit transaction queries on the network for a single record @@ -405,7 +457,6 @@ impl StorageManager { &self, opaque_record_key: OpaqueRecordKey, ) -> VeilidAPIResult { - } /// Perform rollback transaction queries on the network for a single record @@ -414,7 +465,6 @@ impl StorageManager { &self, opaque_record_key: OpaqueRecordKey, ) -> VeilidAPIResult { - } /// Handle a received 'TransactValue' query @@ -429,7 +479,8 @@ impl StorageManager { ) -> VeilidAPIResult> { let mut inner = self.inner.lock().await; - - Ok(NetworkResult::value(InboundTransactValueResult::Success(transact_result))) + Ok(NetworkResult::value(InboundTransactValueResult::Success( + transact_result, + ))) } } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 475314b3..fc0d2f0e 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -403,7 +403,7 @@ impl StorageManager { .collect() }; - // Make do-watch-value answer context + // Make operation context let context = Arc::new(Mutex::new(OutboundWatchValueContext::default())); // Routine to call to generate fanout diff --git a/veilid-core/src/veilid_api/dht_transaction.rs b/veilid-core/src/veilid_api/dht_transaction.rs index 5a97367f..4bad3d6f 100644 --- a/veilid-core/src/veilid_api/dht_transaction.rs +++ b/veilid-core/src/veilid_api/dht_transaction.rs @@ -1,4 +1,5 @@ use super::*; +use crate::storage_manager::OutboundTransactionHandle; impl_veilid_log_facility!("veilid_api"); @@ -11,6 +12,8 @@ pub(crate) struct DHTTransactionUnlockedInner {} /// DHT operations performed out of a transaction may be processed in any order, and only operate on one subkey at a time /// for a given record. Transactions allow you to bind a set of operations so they all succeed, or fail together, and at the same time. /// +/// Transactional DHT operations can only be performed when the node is online, and will error with [VeilidAPIError::TryAgain] if offline. +/// /// Transactions must be committed when all of their operations are registered, or rolled back if the group of operations is to be cancelled. #[derive(Clone)] #[must_use] @@ -18,7 +21,7 @@ pub struct DHTTransaction { /// Routing context in use routing_context: RoutingContext, /// Immutable State - unlocked_inner: Arc, + unlocked_inner: Arc, } impl fmt::Debug for DHTTransaction { @@ -37,7 +40,7 @@ impl DHTTransaction { routing_context: RoutingContext, handle: OutboundTransactionHandle, ) -> VeilidAPIResult { - let config = routing_context.api().api.config()?; + let config = routing_context.api().config()?; Ok(Self { routing_context, diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 30101cc6..fb87fd3e 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -674,7 +674,12 @@ impl RoutingContext { } let storage_manager = self.api.core_context()?.storage_manager(); - let handle = Box::pin(storage_manager.transact_records(record_keys, options)).await; + let handle = Box::pin(storage_manager.transact_records( + record_keys, + self.unlocked_inner.safety_selection.clone(), + options, + )) + .await?; match DHTTransaction::try_new(self.clone(), handle) { Ok(v) => Ok(v), diff --git a/veilid-core/src/veilid_api/types/dht/transact_dht_records_options.rs b/veilid-core/src/veilid_api/types/dht/transact_dht_records_options.rs index 20e4b638..ebd82b7b 100644 --- a/veilid-core/src/veilid_api/types/dht/transact_dht_records_options.rs +++ b/veilid-core/src/veilid_api/types/dht/transact_dht_records_options.rs @@ -1,20 +1,19 @@ use super::*; +/// Options controlling the #[derive(Debug, JsonSchema, Serialize, Deserialize, Clone)] pub struct TransactDHTRecordsOptions { + /// The schema member public key to use when opening the transcation. + /// Setting this does not override any writer keys used by transaction operations + /// and is only used to determine access to the transaction by validating the member + /// is in the schema. #[schemars(with = "Option")] - pub writer: Option, - /// Defaults to true. If false, the transaction will not be created or committed if the node is offline, - /// and a TryAgain error will be returned. - pub allow_offline: Option, + pub member: Option, } impl Default for TransactDHTRecordsOptions { fn default() -> Self { - Self { - writer: None, - allow_offline: Some(AllowOffline(true)), - } + Self { member: None } } } @@ -27,10 +26,7 @@ pub mod ts { pub struct TransactDHTRecordsOptions { #[tsify(type = "KeyPair", optional)] #[serde(with = "serde_wasm_bindgen::preserve")] - pub writer: JsValue, - /// Defaults to true. If false, the transaction will not be created or committed if the node is offline, - /// and a TryAgain error will be returned. - pub allow_offline: Option, + pub member: JsValue, } } @@ -39,12 +35,8 @@ impl TryFrom for TransactDHTRecordsOptions { type Error = VeilidAPIError; fn try_from(value: ts::TransactDHTRecordsOptions) -> Result { - let writer = wasm_bindgen_derive::try_from_js_option::(value.writer) + let member = wasm_bindgen_derive::try_from_js_option::(value.member) .map_err(VeilidAPIError::generic)?; - let allow_offline = value.allow_offline.clone(); - Ok(TransactDHTRecordsOptions { - writer, - allow_offline, - }) + Ok(TransactDHTRecordsOptions { member }) } }