From 4e92fd0911c107a7ca24c5a9a331bc5ba4d39449 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Thu, 9 Oct 2025 13:34:47 -0400 Subject: [PATCH] checkpoint --- .../outbound_transaction_manager/mod.rs | 224 ++++++++- .../node_transaction_id.rs | 58 +++ .../outbound_transaction_state.rs | 98 ++-- .../src/storage_manager/transact_value.rs | 438 ++++++++++++++++-- veilid-wasm/README.md | 4 +- 5 files changed, 743 insertions(+), 79 deletions(-) create mode 100644 veilid-core/src/storage_manager/outbound_transaction_manager/node_transaction_id.rs diff --git a/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs b/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs index c2e03a09..362023ce 100644 --- a/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs +++ b/veilid-core/src/storage_manager/outbound_transaction_manager/mod.rs @@ -1,9 +1,10 @@ +mod node_transaction_id; mod outbound_transaction_per_node_state; mod outbound_transaction_state; use crate::storage_manager::transact_value::{ - NodeTransactionId, OutboundBeginTransactValueResult, OutboundRollbackTransactValueResult, - OutboundTransactionHandle, + OutboundBeginTransactValueResult, OutboundCommitTransactValueResult, + OutboundEndTransactValueResult, OutboundRollbackTransactValueResult, OutboundTransactionHandle, }; use super::*; @@ -12,6 +13,7 @@ use outbound_transaction_state::*; use serde_with::serde_as; +pub(in crate::storage_manager) use node_transaction_id::*; pub(in crate::storage_manager) use outbound_transaction_state::OutboundTransactionRecordInfo; impl_veilid_log_facility!("stor"); @@ -23,6 +25,22 @@ pub(in crate::storage_manager) struct OutboundBeginTransactValueParams { pub writer: KeyPair, } +/// parameters required to end a transaction +pub(in crate::storage_manager) struct OutboundEndTransactValueParams { + pub opaque_record_key: OpaqueRecordKey, + pub safety_selection: SafetySelection, + pub writer: KeyPair, + pub node_xids: Vec, +} + +/// parameters required to commit a transaction +pub(in crate::storage_manager) struct OutboundCommitTransactValueParams { + pub opaque_record_key: OpaqueRecordKey, + pub safety_selection: SafetySelection, + pub writer: KeyPair, + pub node_xids: Vec, +} + /// parameters required to rollback a transaction pub(in crate::storage_manager) struct OutboundRollbackTransactValueParams { pub opaque_record_key: OpaqueRecordKey, @@ -212,21 +230,23 @@ impl OutboundTransactionManager { } // See if we have enough transaction nodes per record key + // If we have too many, they can hang out until the transaction is done, as they + // may be useful for sync or get_value inside the transaction later as consensus nodes for some subkeys + // (the N=5 closest nodes will always be used for sets, but other nodes that were previously closer may + // still have newer values than the closest nodes right now) for record_info in outbound_transaction_state.get_record_infos() { if record_info.node_xids.len() < outbound_transaction_state.consensus_count() { failed = true; - } else if record_info.node_xids.len() > outbound_transaction_state.consensus_count() { - // xxx reduce number of node transactions and rollback extra } } // Change stage if failed { outbound_transaction_state.set_stage(OutboundTransactionStage::Failed); - } else { - outbound_transaction_state.set_stage(OutboundTransactionStage::Begin); + apibail_try_again!("did not get consensus of transaction ids"); } + outbound_transaction_state.set_stage(OutboundTransactionStage::Begin); Ok(()) } @@ -243,10 +263,12 @@ impl OutboundTransactionManager { // Assert stage if !matches!( outbound_transaction_state.stage(), - OutboundTransactionStage::Begin | OutboundTransactionStage::End + OutboundTransactionStage::Begin + | OutboundTransactionStage::End + | OutboundTransactionStage::Failed ) { apibail_internal!(format!( - "stage was {:?}, wanted Begin or End", + "stage was {:?}, wanted Begin, End, or Failed", outbound_transaction_state.stage(), )); } @@ -288,21 +310,197 @@ impl OutboundTransactionManager { OutboundTransactionStage::PreRollback ) { apibail_internal!(format!( - "stage was {:?}, wanted {:?}", + "stage was {:?}, wanted PreRollback", outbound_transaction_state.stage(), - OutboundTransactionStage::PreBegin )); } // Remove node id transactions + let mut failed = false; for result in results { - outbound_transaction_state - .remove_node_transaction_ids(&result.opaque_record_key, result.node_xids); + if !outbound_transaction_state + .remove_node_transaction_ids(&result.opaque_record_key, result.node_xids) + { + // If not all transaction ids were rolled back, then this operation failed + failed = true; + } } // Change stage - outbound_transaction_state.set_stage(OutboundTransactionStage::Rollback); + if failed { + outbound_transaction_state.set_stage(OutboundTransactionStage::Failed); + apibail_try_again!("did not roll back all transaction ids"); + } + outbound_transaction_state.set_stage(OutboundTransactionStage::Rollback); + Ok(()) + } + + /// Prepare to end a transaction + pub fn prepare_end_transact_value_params( + &mut self, + transaction_handle: OutboundTransactionHandle, + ) -> VeilidAPIResult> { + let outbound_transaction_state = self + .transactions + .get_mut(&transaction_handle) + .ok_or_else(|| VeilidAPIError::internal("missing transaction"))?; + + // Assert stage + if !matches!( + outbound_transaction_state.stage(), + OutboundTransactionStage::Begin + ) { + apibail_internal!(format!( + "stage was {:?}, wanted Begin", + outbound_transaction_state.stage(), + )); + } + + let mut out = vec![]; + + for record_info in outbound_transaction_state.get_record_infos() { + let opaque_record_key = record_info.record_key.opaque(); + let node_xids = record_info.node_xids.clone(); + + out.push(OutboundEndTransactValueParams { + opaque_record_key, + safety_selection: outbound_transaction_state.safety_selection(), + writer: outbound_transaction_state.member(), + node_xids, + }); + } + + outbound_transaction_state.set_stage(OutboundTransactionStage::PreEnd); + + Ok(out) + } + + /// Record end transaction + pub fn record_end_transact_value_results( + &mut self, + transaction_handle: OutboundTransactionHandle, + results: Vec, + ) -> VeilidAPIResult<()> { + // Get transaction + let outbound_transaction_state = self + .transactions + .get_mut(&transaction_handle) + .ok_or_else(|| VeilidAPIError::internal("missing transaction"))?; + + // Assert stage + if !matches!( + outbound_transaction_state.stage(), + OutboundTransactionStage::PreEnd + ) { + apibail_internal!(format!( + "stage was {:?}, wanted PreEnd", + outbound_transaction_state.stage(), + )); + } + + // Check if node id transactions reached consensus + let mut failed = false; + for result in results { + if !outbound_transaction_state + .check_node_transaction_ids(&result.opaque_record_key, result.node_xids) + { + // If not all transaction were ended, then this operation failed + failed = true; + } + } + + // Change stage + if failed { + outbound_transaction_state.set_stage(OutboundTransactionStage::Failed); + apibail_try_again!("did not end all transactions"); + } + + outbound_transaction_state.set_stage(OutboundTransactionStage::End); + Ok(()) + } + + /// Prepare to commit a transaction + pub fn prepare_commit_transact_value_params( + &mut self, + transaction_handle: OutboundTransactionHandle, + ) -> VeilidAPIResult> { + let outbound_transaction_state = self + .transactions + .get_mut(&transaction_handle) + .ok_or_else(|| VeilidAPIError::internal("missing transaction"))?; + + // Assert stage + if !matches!( + outbound_transaction_state.stage(), + OutboundTransactionStage::End + ) { + apibail_internal!(format!( + "stage was {:?}, wanted End", + outbound_transaction_state.stage(), + )); + } + + let mut out = vec![]; + + for record_info in outbound_transaction_state.get_record_infos() { + let opaque_record_key = record_info.record_key.opaque(); + let node_xids = record_info.node_xids.clone(); + + out.push(OutboundCommitTransactValueParams { + opaque_record_key, + safety_selection: outbound_transaction_state.safety_selection(), + writer: outbound_transaction_state.member(), + node_xids, + }); + } + + outbound_transaction_state.set_stage(OutboundTransactionStage::PreCommit); + + Ok(out) + } + + /// Record commit transaction + pub fn record_commit_transact_value_results( + &mut self, + transaction_handle: OutboundTransactionHandle, + results: Vec, + ) -> VeilidAPIResult<()> { + // Get transaction + let outbound_transaction_state = self + .transactions + .get_mut(&transaction_handle) + .ok_or_else(|| VeilidAPIError::internal("missing transaction"))?; + + // Assert stage + if !matches!( + outbound_transaction_state.stage(), + OutboundTransactionStage::PreCommit + ) { + apibail_internal!(format!( + "stage was {:?}, wanted PreCommit", + outbound_transaction_state.stage(), + )); + } + + // Check if node id transactions reached consensus + let mut failed = false; + for result in results { + if !outbound_transaction_state + .check_node_transaction_ids(&result.opaque_record_key, result.node_xids) + { + // If not all transaction were committed, then this operation failed + failed = true; + } + } + + // Change stage + if failed { + outbound_transaction_state.set_stage(OutboundTransactionStage::Failed); + apibail_try_again!("did not commit all transactions"); + } + + outbound_transaction_state.set_stage(OutboundTransactionStage::Commit); Ok(()) } } diff --git a/veilid-core/src/storage_manager/outbound_transaction_manager/node_transaction_id.rs b/veilid-core/src/storage_manager/outbound_transaction_manager/node_transaction_id.rs new file mode 100644 index 00000000..764fc995 --- /dev/null +++ b/veilid-core/src/storage_manager/outbound_transaction_manager/node_transaction_id.rs @@ -0,0 +1,58 @@ +use super::*; + +/// Transaction id and node id pair +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NodeTransactionId { + node_id: NodeId, + xid: u64, + #[serde(skip)] + node_ref: Option, +} + +impl NodeTransactionId { + pub fn new(kind: CryptoKind, xid: u64, node_ref: NodeRef) -> Self { + Self { + node_id: node_ref.node_ids().get(kind).unwrap(), + xid, + node_ref: Some(node_ref), + } + } + + pub fn prepare(&mut self, routing_table: &RoutingTable) -> bool { + let Some(node_ref) = routing_table + .lookup_node_ref(self.node_id.clone()) + .ok() + .flatten() + else { + return false; + }; + self.node_ref = Some(node_ref); + true + } + + pub fn node_ref(&self) -> NodeRef { + // Safe as long as prepare has been called + self.node_ref.clone().unwrap() + } + + pub fn node_id(&self) -> NodeId { + self.node_id.clone() + } + + pub fn xid(&self) -> u64 { + self.xid + } +} + +impl PartialEq for NodeTransactionId { + fn eq(&self, other: &NodeTransactionId) -> bool { + self.node_id == other.node_id && self.xid == other.xid + } +} +impl Eq for NodeTransactionId {} + +impl fmt::Display for NodeTransactionId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}:xid={}", self.node_id, self.xid) + } +} diff --git a/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_state.rs b/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_state.rs index f92c69d3..1a216c5f 100644 --- a/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_state.rs +++ b/veilid-core/src/storage_manager/outbound_transaction_manager/outbound_transaction_state.rs @@ -6,9 +6,6 @@ pub(in crate::storage_manager) struct OutboundTransactionRecordInfo { pub record_key: RecordKey, pub writer: KeyPair, pub node_xids: Vec, - /// Node refs to keep entries around while we're using them - #[serde(skip)] - pub node_refs: Vec, } impl fmt::Display for OutboundTransactionRecordInfo { @@ -33,23 +30,11 @@ impl OutboundTransactionRecordInfo { record_key, writer, node_xids: vec![], - node_refs: vec![], } } pub fn prepare(&mut self, routing_table: &RoutingTable) { - self.node_xids.retain(|x| { - if let Some(node_ref) = routing_table - .lookup_node_ref(x.node_id.clone()) - .ok() - .flatten() - { - self.node_refs.push(node_ref); - true - } else { - false - } - }) + self.node_xids.retain_mut(|x| x.prepare(routing_table)) } } @@ -81,6 +66,10 @@ pub(in crate::storage_manager) enum OutboundTransactionStage { /// State of a single transaction across multiple records #[derive(Clone, Debug, Serialize, Deserialize)] pub(in crate::storage_manager) struct OutboundTransactionState { + /// The timestamp of when the transaction was created + created_ts: Timestamp, + /// The timestamp of the last stage transition + stage_ts: Timestamp, /// The operational stage of this transaction stage: OutboundTransactionStage, /// How many nodes are required for a consensus for this transaction @@ -97,17 +86,21 @@ impl fmt::Display for OutboundTransactionState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - r#"record_infos: + r#"created_ts: {} stage_ts: {} stage: {:?} +member: {} safety_selection: {:?} +record_infos: {} -member: {} -safety_selection: {:?}"#, +"#, + self.created_ts, + self.stage_ts, + self.stage, + self.member, + self.safety_selection, self.record_infos .iter() .map(|x| format!(" {}", x)) .collect::>() .join("\n"), - self.member, - self.safety_selection, ) } } @@ -119,7 +112,10 @@ impl OutboundTransactionState { member: KeyPair, safety_selection: SafetySelection, ) -> Self { + let cur_ts = Timestamp::now(); Self { + created_ts: cur_ts, + stage_ts: cur_ts, stage: OutboundTransactionStage::Init, consensus_count, record_infos, @@ -134,11 +130,20 @@ impl OutboundTransactionState { } } + pub fn created_ts(&self) -> Timestamp { + self.created_ts + } + + pub fn stage_ts(&self) -> Timestamp { + self.stage_ts + } + pub fn stage(&self) -> OutboundTransactionStage { self.stage } pub fn set_stage(&mut self, stage: OutboundTransactionStage) { + self.stage_ts = Timestamp::now(); self.stage = stage } @@ -158,16 +163,28 @@ impl OutboundTransactionState { self.safety_selection.clone() } + fn sort_node_xids(opaque_record_key: &OpaqueRecordKey, node_xids: &mut Vec) { + node_xids.sort_by(|a, b| { + let dist_a = opaque_record_key + .to_hash_coordinate() + .distance(&a.node_id().to_hash_coordinate()); + let dist_b = opaque_record_key + .to_hash_coordinate() + .distance(&b.node_id().to_hash_coordinate()); + + dist_a.cmp(&dist_b) + }); + } + pub fn set_node_transaction_ids( &mut self, opaque_record_key: &OpaqueRecordKey, - node_xids: Vec, + mut node_xids: Vec, ) { + Self::sort_node_xids(opaque_record_key, &mut node_xids); + for ri in &mut self.record_infos { if &ri.record_key.opaque() == opaque_record_key { - - xxx sort by closeness - ri.node_xids = node_xids; return; } @@ -175,15 +192,40 @@ impl OutboundTransactionState { unreachable!("attempting to modify missing record in transaction"); } + pub fn check_node_transaction_ids( + &mut self, + opaque_record_key: &OpaqueRecordKey, + mut node_xids: Vec, + ) -> bool { + Self::sort_node_xids(opaque_record_key, &mut node_xids); + + for ri in &mut self.record_infos { + if &ri.record_key.opaque() == opaque_record_key { + let mut count = 0; + + for node_xid in &ri.node_xids { + if node_xids.contains(node_xid) { + count += 1; + } + } + + return count >= self.consensus_count; + } + } + unreachable!("attempting to modify missing record in transaction"); + } + pub fn remove_node_transaction_ids( &mut self, opaque_record_key: &OpaqueRecordKey, - node_xids: Vec, - ) { + mut node_xids: Vec, + ) -> bool { + Self::sort_node_xids(opaque_record_key, &mut node_xids); + for ri in &mut self.record_infos { if &ri.record_key.opaque() == opaque_record_key { ri.node_xids.retain(|x| !node_xids.contains(x)); - return; + return ri.node_xids.is_empty(); } } unreachable!("attempting to modify missing record in transaction"); diff --git a/veilid-core/src/storage_manager/transact_value.rs b/veilid-core/src/storage_manager/transact_value.rs index fc4d5d7c..a11b8445 100644 --- a/veilid-core/src/storage_manager/transact_value.rs +++ b/veilid-core/src/storage_manager/transact_value.rs @@ -14,19 +14,6 @@ struct SubkeySeqCount { pub value_nodes: Vec, } -/// Transaction id and node id pair -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] -pub(super) struct NodeTransactionId { - pub node_id: NodeId, - pub xid: u64, -} - -impl fmt::Display for NodeTransactionId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}:xid={}", self.node_id, self.xid) - } -} - /// The context of the outbound_begin_transact_value operation struct OutboundBeginTransactValueContext { /// The combined sequence numbers and result counts so far @@ -61,6 +48,24 @@ pub(super) struct OutboundBeginTransactValueResult { pub node_xids: Vec, } +/// The result of the outbound_end_transact_value operation +#[derive(Debug, Clone)] +pub(super) struct OutboundEndTransactValueResult { + /// The record key being transacted + pub opaque_record_key: OpaqueRecordKey, + /// The set of nodes that confirmed transaction end + pub node_xids: Vec, +} + +/// The result of the outbound_commit_transact_value operation +#[derive(Debug, Clone)] +pub(super) struct OutboundCommitTransactValueResult { + /// The record key being transacted + pub opaque_record_key: OpaqueRecordKey, + /// The set of nodes that confirmed transaction commit + pub node_xids: Vec, +} + /// The result of the outbound_rollback_transact_value operation #[derive(Debug, Clone)] pub(super) struct OutboundRollbackTransactValueResult { @@ -86,7 +91,7 @@ impl StorageManager { /// Returns a transaction handle if the transaction was created /// Returns Err(VeilidAPIError::TryAgain) if the transaction could not be created #[instrument(level = "trace", target = "stor", skip_all)] - pub async fn transact_records( + pub async fn begin_transaction( &self, record_keys: Vec, safety_selection: SafetySelection, @@ -181,7 +186,6 @@ impl StorageManager { results.push(v); } Err(e) => { - // Delete transaction if opt_begin_error.is_none() { opt_begin_error = Some(e); } @@ -207,10 +211,7 @@ impl StorageManager { // Rollback if any errors happened if let Some(begin_error) = opt_begin_error { veilid_log!(self debug "Begin transaction failed, rolling back outbound transaction: {}", begin_error); - if let Err(e) = self - .rollback_outbound_transaction(transaction_handle.clone()) - .await - { + if let Err(e) = self.rollback_transaction(transaction_handle.clone()).await { veilid_log!(self debug "Failed to roll back outbound transaction, dropping: {}", e); } @@ -223,14 +224,202 @@ impl StorageManager { Ok(transaction_handle) } - //////////////////////////////////////////////////////////////////////// - - /// Roll back a transaction - #[instrument(level = "trace", target = "dht", skip_all, err)] - async fn rollback_outbound_transaction( + /// End a transaction over a set of records + /// If an existing transaction does not exist over these records + /// or a transaction can not be performed at this time, this will fail. + /// Returns Err(VeilidAPIError::TryAgain) if the transaction could not be ended at this time + /// Returns Err(_) if the transaction end failed and resulted in rollback or drop + #[instrument(level = "trace", target = "stor", skip_all)] + pub async fn end_transaction( &self, transaction_handle: OutboundTransactionHandle, ) -> VeilidAPIResult<()> { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + + // Early rejection if dht is not online + if !self.dht_is_online() { + apibail_try_again!("dht is not online"); + } + + let end_params_list = { + let mut inner = self.inner.lock().await; + + // Obtain the outbound transaction manager + let otm = &mut inner.outbound_transaction_manager; + + // Prepare for rollback + let commit_params_list = otm + .prepare_end_transact_value_params(transaction_handle.clone()) + .unwrap(); + + commit_params_list + }; + + // End transactions on all records + let mut unord = FuturesUnordered::new(); + for end_params in end_params_list { + let fut = self.clone().outbound_end_transact_value( + end_params.opaque_record_key, + end_params.safety_selection, + end_params.writer, + end_params.node_xids, + ); + unord.push(fut); + } + let mut results = vec![]; + let mut opt_end_error = None; + while let Some(res) = unord.next().await { + match res { + Ok(v) => { + // + results.push(v); + } + Err(e) => { + if opt_end_error.is_none() { + opt_end_error = Some(e); + } + } + } + } + + // Store end results + { + let mut inner = self.inner.lock().await; + let otm = &mut inner.outbound_transaction_manager; + if let Err(e) = + otm.record_end_transact_value_results(transaction_handle.clone(), results) + { + if opt_end_error.is_none() { + opt_end_error = Some(e); + } + } + } + + // Rollback if any errors happened + if let Some(end_error) = opt_end_error { + veilid_log!(self debug "End transaction failed, rolling back outbound transaction: {}", end_error); + if let Err(e) = self.rollback_transaction(transaction_handle.clone()).await { + veilid_log!(self debug "Failed to roll back outbound transaction, dropping: {}", e); + } + + self.drop_outbound_transaction(transaction_handle).await; + + return Err(end_error); + } + + Ok(()) + } + + /// Commit a transaction over a set of records + /// If an existing transaction does not exist over these records + /// or a transaction can not be performed at this time, this will fail. + /// Returns Err(VeilidAPIError::TryAgain) if the transaction could not be committed at this time + /// Returns Err(_) if the transaction commit failed and resulted in rollback or drop + #[instrument(level = "trace", target = "stor", skip_all)] + pub async fn commit_transaction( + &self, + transaction_handle: OutboundTransactionHandle, + ) -> VeilidAPIResult<()> { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + + // Early rejection if dht is not online + if !self.dht_is_online() { + apibail_try_again!("dht is not online"); + } + + let commit_params_list = { + let mut inner = self.inner.lock().await; + + // Obtain the outbound transaction manager + let otm = &mut inner.outbound_transaction_manager; + + // Prepare for rollback + let commit_params_list = otm + .prepare_commit_transact_value_params(transaction_handle.clone()) + .unwrap(); + + commit_params_list + }; + + // Commit transactions on all records + let mut unord = FuturesUnordered::new(); + for commit_params in commit_params_list { + let fut = self.clone().outbound_commit_transact_value( + commit_params.opaque_record_key, + commit_params.safety_selection, + commit_params.writer, + commit_params.node_xids, + ); + unord.push(fut); + } + let mut results = vec![]; + let mut opt_commit_error = None; + while let Some(res) = unord.next().await { + match res { + Ok(v) => { + // + results.push(v); + } + Err(e) => { + if opt_commit_error.is_none() { + opt_commit_error = Some(e); + } + } + } + } + + // Store commit results + { + let mut inner = self.inner.lock().await; + let otm = &mut inner.outbound_transaction_manager; + if let Err(e) = + otm.record_commit_transact_value_results(transaction_handle.clone(), results) + { + if opt_commit_error.is_none() { + opt_commit_error = Some(e); + } + } + } + + // XXX: handle commit errors better + + // // Rollback if any errors happened + // if let Some(commit_error) = opt_commit_error { + // veilid_log!(self debug "Commit transaction failed, rolling back outbound transaction: {}", commit_error); + // if let Err(e) = self.rollback_transaction(transaction_handle.clone()).await { + // veilid_log!(self debug "Failed to roll back outbound transaction, dropping: {}", e); + // } + + // self.drop_outbound_transaction(transaction_handle).await; + + // return Err(commit_error); + // } + + Ok(()) + } + + /// Roll back a transaction + /// If an error is returned, the transaction is left in a failed state and can either + /// * be dropped/ignored and the remote transaction will time out + /// * another rollback attempt can be made, which may result in a more polite termination of the remote transaction + #[instrument(level = "trace", target = "dht", skip_all, err)] + async fn rollback_transaction( + &self, + transaction_handle: OutboundTransactionHandle, + ) -> VeilidAPIResult<()> { + let Ok(_guard) = self.startup_lock.enter() else { + apibail_not_initialized!(); + }; + + // Early rejection if dht is not online + if !self.dht_is_online() { + apibail_try_again!("dht is not online"); + } + let rollback_params_list = { let mut inner = self.inner.lock().await; @@ -257,7 +446,7 @@ impl StorageManager { unord.push(fut); } let mut results = vec![]; - let mut rollback_error = None; + let mut opt_rollback_error = None; while let Some(res) = unord.next().await { match res { Ok(v) => { @@ -265,9 +454,8 @@ impl StorageManager { results.push(v); } Err(e) => { - // Delete transaction - if rollback_error.is_none() { - rollback_error = Some(e); + if opt_rollback_error.is_none() { + opt_rollback_error = Some(e); } } } @@ -280,19 +468,21 @@ impl StorageManager { if let Err(e) = otm.record_rollback_transact_value_results(transaction_handle.clone(), results) { - if rollback_error.is_none() { - rollback_error = Some(e); + if opt_rollback_error.is_none() { + opt_rollback_error = Some(e); } } } - if let Some(rberr) = rollback_error { + if let Some(rberr) = opt_rollback_error { return Err(rberr); } Ok(()) } + //////////////////////////////////////////////////////////////////////// + /// Drop a transaction. This eliminates the transaction locally and does not /// perform any actions on the network. The remote transaction will time out on its own. #[instrument(level = "trace", target = "dht", skip_all)] @@ -471,7 +661,7 @@ impl StorageManager { veilid_log!(registry debug target:"network_result", "Got transaction id and seqs back: xid={}, len={}", xid, answer.seqs.len()); // Add transaction id node to list - ctx.node_xids.push(NodeTransactionId { node_ref: next_node.clone(), xid }); + ctx.node_xids.push(NodeTransactionId::new(opaque_record_key.kind(), xid, next_node.clone())); // If we have a prior seqs list, merge in the new seqs if ctx.seqcounts.is_empty() { @@ -599,7 +789,95 @@ impl StorageManager { pub(super) async fn outbound_end_transact_value( &self, opaque_record_key: OpaqueRecordKey, - ) -> VeilidAPIResult { + safety_selection: SafetySelection, + writer: KeyPair, + node_xids: Vec, + ) -> VeilidAPIResult { + let routing_domain = RoutingDomain::PublicInternet; + + // Pull the descriptor for this record + let descriptor = { + let mut inner = self.inner.lock().await; + let local_inspect_result = self + .handle_inspect_local_value_inner( + &mut inner, + opaque_record_key.clone(), + ValueSubkeyRangeSet::full(), + true, + ) + .await?; + local_inspect_result.opt_descriptor().unwrap() + }; + + // Send all ends in parallel + let mut unord = FuturesUnordered::new(); + for node_xid in node_xids { + let registry = self.registry(); + + let next_node = node_xid.node_ref(); + let next_xid = node_xid.xid(); + let opaque_record_key = opaque_record_key.clone(); + let safety_selection = safety_selection.clone(); + let descriptor = descriptor.clone(); + let writer = writer.clone(); + + let fut = async move { + let rpc_processor = registry.rpc_processor(); + + let tva = match rpc_processor + .rpc_call_transact_value( + Destination::direct(next_node.routing_domain_filtered(routing_domain)) + .with_safety(safety_selection.clone()), + opaque_record_key.clone(), + Some(next_xid), + TransactValueCommand::End, + descriptor.as_ref().clone(), + false, + writer.clone(), + ) + .await + .map_err(VeilidAPIError::from)? + { + NetworkResult::Timeout => { + return VeilidAPIResult::Ok(None); + } + NetworkResult::ServiceUnavailable(_) + | NetworkResult::NoConnection(_) + | NetworkResult::AlreadyExists(_) + | NetworkResult::InvalidMessage(_) => { + return Ok(None); + } + NetworkResult::Value(v) => v, + }; + if tva.answer.accepted { + if tva.answer.needs_descriptor { + veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' when descriptor was already sent: node={} record_key={}", next_node, opaque_record_key); + } + } else if tva.answer.needs_descriptor { + veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' from node that did not accept: node={} record_key={}", next_node, opaque_record_key); + } + + Ok(Some(node_xid)) + }; + + unord.push(fut); + } + + let mut end_node_xids = vec![]; + while let Some(res) = unord.next().await { + let res = res.inspect_err(|e| { + veilid_log!(self error target:"network_result", "Error performing end transaction: {}", e); + })?; + + if let Some(end_node_xid) = res { + end_node_xids.push(end_node_xid); + } + } + + Ok(OutboundEndTransactValueResult { + opaque_record_key, + node_xids: end_node_xids, + }) } /// Perform commit transaction queries on the network for a single record @@ -607,7 +885,95 @@ impl StorageManager { pub(super) async fn outbound_commit_transact_value( &self, opaque_record_key: OpaqueRecordKey, - ) -> VeilidAPIResult { + safety_selection: SafetySelection, + writer: KeyPair, + node_xids: Vec, + ) -> VeilidAPIResult { + let routing_domain = RoutingDomain::PublicInternet; + + // Pull the descriptor for this record + let descriptor = { + let mut inner = self.inner.lock().await; + let local_inspect_result = self + .handle_inspect_local_value_inner( + &mut inner, + opaque_record_key.clone(), + ValueSubkeyRangeSet::full(), + true, + ) + .await?; + local_inspect_result.opt_descriptor().unwrap() + }; + + // Send all commits in parallel + let mut unord = FuturesUnordered::new(); + for node_xid in node_xids { + let registry = self.registry(); + + let next_node = node_xid.node_ref(); + let next_xid = node_xid.xid(); + let opaque_record_key = opaque_record_key.clone(); + let safety_selection = safety_selection.clone(); + let descriptor = descriptor.clone(); + let writer = writer.clone(); + + let fut = async move { + let rpc_processor = registry.rpc_processor(); + + let tva = match rpc_processor + .rpc_call_transact_value( + Destination::direct(next_node.routing_domain_filtered(routing_domain)) + .with_safety(safety_selection.clone()), + opaque_record_key.clone(), + Some(next_xid), + TransactValueCommand::Commit, + descriptor.as_ref().clone(), + false, + writer.clone(), + ) + .await + .map_err(VeilidAPIError::from)? + { + NetworkResult::Timeout => { + return VeilidAPIResult::Ok(None); + } + NetworkResult::ServiceUnavailable(_) + | NetworkResult::NoConnection(_) + | NetworkResult::AlreadyExists(_) + | NetworkResult::InvalidMessage(_) => { + return Ok(None); + } + NetworkResult::Value(v) => v, + }; + if tva.answer.accepted { + if tva.answer.needs_descriptor { + veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' when descriptor was already sent: node={} record_key={}", next_node, opaque_record_key); + } + } else if tva.answer.needs_descriptor { + veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' from node that did not accept: node={} record_key={}", next_node, opaque_record_key); + } + + Ok(Some(node_xid)) + }; + + unord.push(fut); + } + + let mut committed_node_xids = vec![]; + while let Some(res) = unord.next().await { + let res = res.inspect_err(|e| { + veilid_log!(self error target:"network_result", "Error performing commit transaction: {}", e); + })?; + + if let Some(committed_node_xid) = res { + committed_node_xids.push(committed_node_xid); + } + } + + Ok(OutboundCommitTransactValueResult { + opaque_record_key, + node_xids: committed_node_xids, + }) } /// Perform rollback transaction queries on the network for a single record @@ -640,8 +1006,8 @@ impl StorageManager { for node_xid in node_xids { let registry = self.registry(); - let next_node = node_xid.node_ref.clone(); - let next_xid = node_xid.xid; + let next_node = node_xid.node_ref(); + let next_xid = node_xid.xid(); let opaque_record_key = opaque_record_key.clone(); let safety_selection = safety_selection.clone(); let descriptor = descriptor.clone(); diff --git a/veilid-wasm/README.md b/veilid-wasm/README.md index ccc0af12..ff1cf215 100644 --- a/veilid-wasm/README.md +++ b/veilid-wasm/README.md @@ -29,8 +29,8 @@ Prerequisites: Run the test script: -- `./wasm_test.sh` to test with debug symbols. -- `./wasm_test.sh release` to test against a release build. +- `./wasm_test_js.sh` to test with debug symbols. +- `./wasm_test_js.sh release` to test against a release build. ## Development notes