checkpoint

This commit is contained in:
Christien Rioux 2025-10-09 13:34:47 -04:00
parent 1e549742c0
commit 4e92fd0911
5 changed files with 743 additions and 79 deletions

View file

@ -1,9 +1,10 @@
mod node_transaction_id;
mod outbound_transaction_per_node_state;
mod outbound_transaction_state;
use crate::storage_manager::transact_value::{
NodeTransactionId, OutboundBeginTransactValueResult, OutboundRollbackTransactValueResult,
OutboundTransactionHandle,
OutboundBeginTransactValueResult, OutboundCommitTransactValueResult,
OutboundEndTransactValueResult, OutboundRollbackTransactValueResult, OutboundTransactionHandle,
};
use super::*;
@ -12,6 +13,7 @@ use outbound_transaction_state::*;
use serde_with::serde_as;
pub(in crate::storage_manager) use node_transaction_id::*;
pub(in crate::storage_manager) use outbound_transaction_state::OutboundTransactionRecordInfo;
impl_veilid_log_facility!("stor");
@ -23,6 +25,22 @@ pub(in crate::storage_manager) struct OutboundBeginTransactValueParams {
pub writer: KeyPair,
}
/// Parameters required to end a transaction on a single record.
///
/// `prepare_end_transact_value_params` builds one of these per record in the
/// transaction.
pub(in crate::storage_manager) struct OutboundEndTransactValueParams {
/// Opaque form of the key of the record being transacted
pub opaque_record_key: OpaqueRecordKey,
/// Safety selection taken from the transaction state
pub safety_selection: SafetySelection,
/// Keypair taken from the transaction state's `member()`
pub writer: KeyPair,
/// Copy of the node / transaction id pairs held for this record
pub node_xids: Vec<NodeTransactionId>,
}
/// Parameters required to commit a transaction on a single record.
///
/// `prepare_commit_transact_value_params` builds one of these per record in the
/// transaction.
pub(in crate::storage_manager) struct OutboundCommitTransactValueParams {
/// Opaque form of the key of the record being transacted
pub opaque_record_key: OpaqueRecordKey,
/// Safety selection taken from the transaction state
pub safety_selection: SafetySelection,
/// Keypair taken from the transaction state's `member()`
pub writer: KeyPair,
/// Copy of the node / transaction id pairs held for this record
pub node_xids: Vec<NodeTransactionId>,
}
/// parameters required to rollback a transaction
pub(in crate::storage_manager) struct OutboundRollbackTransactValueParams {
pub opaque_record_key: OpaqueRecordKey,
@ -212,21 +230,23 @@ impl OutboundTransactionManager {
}
// See if we have enough transaction nodes per record key
// If we have too many, they can hang out until the transaction is done, as they
// may be useful for sync or get_value inside the transaction later as consensus nodes for some subkeys
// (the N=5 closest nodes will always be used for sets, but other nodes that were previously closer may
// still have newer values than the closest nodes right now)
for record_info in outbound_transaction_state.get_record_infos() {
if record_info.node_xids.len() < outbound_transaction_state.consensus_count() {
failed = true;
} else if record_info.node_xids.len() > outbound_transaction_state.consensus_count() {
// xxx reduce number of node transactions and rollback extra
}
}
// Change stage
if failed {
outbound_transaction_state.set_stage(OutboundTransactionStage::Failed);
} else {
outbound_transaction_state.set_stage(OutboundTransactionStage::Begin);
apibail_try_again!("did not get consensus of transaction ids");
}
outbound_transaction_state.set_stage(OutboundTransactionStage::Begin);
Ok(())
}
@ -243,10 +263,12 @@ impl OutboundTransactionManager {
// Assert stage
if !matches!(
outbound_transaction_state.stage(),
OutboundTransactionStage::Begin | OutboundTransactionStage::End
OutboundTransactionStage::Begin
| OutboundTransactionStage::End
| OutboundTransactionStage::Failed
) {
apibail_internal!(format!(
"stage was {:?}, wanted Begin or End",
"stage was {:?}, wanted Begin, End, or Failed",
outbound_transaction_state.stage(),
));
}
@ -288,21 +310,197 @@ impl OutboundTransactionManager {
OutboundTransactionStage::PreRollback
) {
apibail_internal!(format!(
"stage was {:?}, wanted {:?}",
"stage was {:?}, wanted PreRollback",
outbound_transaction_state.stage(),
OutboundTransactionStage::PreBegin
));
}
// Remove node id transactions
let mut failed = false;
for result in results {
outbound_transaction_state
.remove_node_transaction_ids(&result.opaque_record_key, result.node_xids);
if !outbound_transaction_state
.remove_node_transaction_ids(&result.opaque_record_key, result.node_xids)
{
// If not all transaction ids were rolled back, then this operation failed
failed = true;
}
}
// Change stage
outbound_transaction_state.set_stage(OutboundTransactionStage::Rollback);
if failed {
outbound_transaction_state.set_stage(OutboundTransactionStage::Failed);
apibail_try_again!("did not roll back all transaction ids");
}
outbound_transaction_state.set_stage(OutboundTransactionStage::Rollback);
Ok(())
}
/// Build the per-record parameters needed to end a transaction.
///
/// The transaction must currently be in the `Begin` stage. On success one
/// parameter set per record is returned and the transaction moves to `PreEnd`.
/// An internal error is returned when the handle is unknown or the stage is
/// not `Begin`; the stage is left untouched in that case.
pub fn prepare_end_transact_value_params(
    &mut self,
    transaction_handle: OutboundTransactionHandle,
) -> VeilidAPIResult<Vec<OutboundEndTransactValueParams>> {
    let Some(state) = self.transactions.get_mut(&transaction_handle) else {
        return Err(VeilidAPIError::internal("missing transaction"));
    };

    // Only a begun transaction can be ended
    let current_stage = state.stage();
    if !matches!(current_stage, OutboundTransactionStage::Begin) {
        apibail_internal!(format!("stage was {:?}, wanted Begin", current_stage));
    }

    // One parameter set per record in the transaction
    let mut params_list = Vec::new();
    for info in state.get_record_infos() {
        params_list.push(OutboundEndTransactValueParams {
            opaque_record_key: info.record_key.opaque(),
            safety_selection: state.safety_selection(),
            writer: state.member(),
            node_xids: info.node_xids.clone(),
        });
    }

    state.set_stage(OutboundTransactionStage::PreEnd);
    Ok(params_list)
}
/// Record the outcome of ending a transaction.
///
/// The transaction must be in the `PreEnd` stage. Each result is compared
/// against the record's node transaction ids; if any record fails that check
/// the transaction is marked `Failed` and a try-again error is returned,
/// otherwise the transaction moves to `End`.
pub fn record_end_transact_value_results(
    &mut self,
    transaction_handle: OutboundTransactionHandle,
    results: Vec<OutboundEndTransactValueResult>,
) -> VeilidAPIResult<()> {
    // Look up the transaction
    let Some(state) = self.transactions.get_mut(&transaction_handle) else {
        return Err(VeilidAPIError::internal("missing transaction"));
    };

    // Stage gate
    let current_stage = state.stage();
    if !matches!(current_stage, OutboundTransactionStage::PreEnd) {
        apibail_internal!(format!("stage was {:?}, wanted PreEnd", current_stage));
    }

    // Every result is checked; a single record without consensus fails the whole operation
    let any_failed = results.into_iter().fold(false, |failed_so_far, result| {
        let reached =
            state.check_node_transaction_ids(&result.opaque_record_key, result.node_xids);
        failed_so_far || !reached
    });

    if any_failed {
        state.set_stage(OutboundTransactionStage::Failed);
        apibail_try_again!("did not end all transactions");
    }

    state.set_stage(OutboundTransactionStage::End);
    Ok(())
}
/// Build the per-record parameters needed to commit a transaction.
///
/// The transaction must currently be in the `End` stage. On success one
/// parameter set per record is returned and the transaction moves to
/// `PreCommit`. An internal error is returned when the handle is unknown or
/// the stage is not `End`; the stage is left untouched in that case.
pub fn prepare_commit_transact_value_params(
    &mut self,
    transaction_handle: OutboundTransactionHandle,
) -> VeilidAPIResult<Vec<OutboundCommitTransactValueParams>> {
    let Some(state) = self.transactions.get_mut(&transaction_handle) else {
        return Err(VeilidAPIError::internal("missing transaction"));
    };

    // Only an ended transaction can be committed
    let current_stage = state.stage();
    if !matches!(current_stage, OutboundTransactionStage::End) {
        apibail_internal!(format!("stage was {:?}, wanted End", current_stage));
    }

    // One parameter set per record in the transaction
    let mut params_list = Vec::new();
    for info in state.get_record_infos() {
        params_list.push(OutboundCommitTransactValueParams {
            opaque_record_key: info.record_key.opaque(),
            safety_selection: state.safety_selection(),
            writer: state.member(),
            node_xids: info.node_xids.clone(),
        });
    }

    state.set_stage(OutboundTransactionStage::PreCommit);
    Ok(params_list)
}
/// Record the outcome of committing a transaction.
///
/// The transaction must be in the `PreCommit` stage. Each result is compared
/// against the record's node transaction ids; if any record fails that check
/// the transaction is marked `Failed` and a try-again error is returned,
/// otherwise the transaction moves to `Commit`.
pub fn record_commit_transact_value_results(
    &mut self,
    transaction_handle: OutboundTransactionHandle,
    results: Vec<OutboundCommitTransactValueResult>,
) -> VeilidAPIResult<()> {
    // Look up the transaction
    let Some(state) = self.transactions.get_mut(&transaction_handle) else {
        return Err(VeilidAPIError::internal("missing transaction"));
    };

    // Stage gate
    let current_stage = state.stage();
    if !matches!(current_stage, OutboundTransactionStage::PreCommit) {
        apibail_internal!(format!("stage was {:?}, wanted PreCommit", current_stage));
    }

    // Every result is checked; a single record without consensus fails the whole operation
    let any_failed = results.into_iter().fold(false, |failed_so_far, result| {
        let reached =
            state.check_node_transaction_ids(&result.opaque_record_key, result.node_xids);
        failed_so_far || !reached
    });

    if any_failed {
        state.set_stage(OutboundTransactionStage::Failed);
        apibail_try_again!("did not commit all transactions");
    }

    state.set_stage(OutboundTransactionStage::Commit);
    Ok(())
}
}

View file

@ -0,0 +1,58 @@
use super::*;
/// Transaction id and node id pair
///
/// Only `node_id` and `xid` are serialized and compared for equality; the
/// node reference is runtime-only state.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeTransactionId {
/// Id of the node holding the remote transaction
node_id: NodeId,
/// Transaction id paired with that node
xid: u64,
/// Node reference, skipped by serde. Set by `new`, or by `prepare` when the
/// routing table lookup succeeds; `None` otherwise.
#[serde(skip)]
node_ref: Option<NodeRef>,
}
impl NodeTransactionId {
    /// Create a pair from a live node reference.
    ///
    /// The node id is the one `node_ref` holds for `kind`.
    ///
    /// # Panics
    /// Panics if `node_ref` has no node id of the requested `kind`.
    pub fn new(kind: CryptoKind, xid: u64, node_ref: NodeRef) -> Self {
        let node_id = node_ref.node_ids().get(kind).unwrap();
        Self {
            node_id,
            xid,
            node_ref: Some(node_ref),
        }
    }

    /// Look this node up in the routing table and store the resulting node reference.
    ///
    /// Returns `true` when a reference was found and stored, `false` when the
    /// lookup failed or found nothing (the stored reference is left unchanged).
    pub fn prepare(&mut self, routing_table: &RoutingTable) -> bool {
        match routing_table.lookup_node_ref(self.node_id.clone()) {
            Ok(Some(found)) => {
                self.node_ref = Some(found);
                true
            }
            Ok(None) | Err(_) => false,
        }
    }

    /// Return a clone of the stored node reference.
    ///
    /// # Panics
    /// Panics if no reference is stored, i.e. the value was not built with
    /// `new` and `prepare` has not succeeded.
    pub fn node_ref(&self) -> NodeRef {
        // Safe as long as prepare has been called
        self.node_ref.as_ref().unwrap().clone()
    }

    /// Return a clone of the node id.
    pub fn node_id(&self) -> NodeId {
        self.node_id.clone()
    }

    /// Return the transaction id.
    pub fn xid(&self) -> u64 {
        self.xid
    }
}
/// Two pairs are equal when both the node id and the transaction id match;
/// the runtime node reference does not take part in the comparison.
impl PartialEq for NodeTransactionId {
    fn eq(&self, other: &Self) -> bool {
        self.xid == other.xid && self.node_id == other.node_id
    }
}

impl Eq for NodeTransactionId {}
/// Formats as `<node_id>:xid=<xid>`.
impl fmt::Display for NodeTransactionId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { node_id, xid, .. } = self;
        write!(f, "{}:xid={}", node_id, xid)
    }
}

View file

@ -6,9 +6,6 @@ pub(in crate::storage_manager) struct OutboundTransactionRecordInfo {
pub record_key: RecordKey,
pub writer: KeyPair,
pub node_xids: Vec<NodeTransactionId>,
/// Node refs to keep entries around while we're using them
#[serde(skip)]
pub node_refs: Vec<NodeRef>,
}
impl fmt::Display for OutboundTransactionRecordInfo {
@ -33,23 +30,11 @@ impl OutboundTransactionRecordInfo {
record_key,
writer,
node_xids: vec![],
node_refs: vec![],
}
}
pub fn prepare(&mut self, routing_table: &RoutingTable) {
self.node_xids.retain(|x| {
if let Some(node_ref) = routing_table
.lookup_node_ref(x.node_id.clone())
.ok()
.flatten()
{
self.node_refs.push(node_ref);
true
} else {
false
}
})
self.node_xids.retain_mut(|x| x.prepare(routing_table))
}
}
@ -81,6 +66,10 @@ pub(in crate::storage_manager) enum OutboundTransactionStage {
/// State of a single transaction across multiple records
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundTransactionState {
/// The timestamp of when the transaction was created
created_ts: Timestamp,
/// The timestamp of the last stage transition
stage_ts: Timestamp,
/// The operational stage of this transaction
stage: OutboundTransactionStage,
/// How many nodes are required for a consensus for this transaction
@ -97,17 +86,21 @@ impl fmt::Display for OutboundTransactionState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
r#"record_infos:
r#"created_ts: {} stage_ts: {} stage: {:?}
member: {} safety_selection: {:?}
record_infos:
{}
member: {}
safety_selection: {:?}"#,
"#,
self.created_ts,
self.stage_ts,
self.stage,
self.member,
self.safety_selection,
self.record_infos
.iter()
.map(|x| format!(" {}", x))
.collect::<Vec<_>>()
.join("\n"),
self.member,
self.safety_selection,
)
}
}
@ -119,7 +112,10 @@ impl OutboundTransactionState {
member: KeyPair,
safety_selection: SafetySelection,
) -> Self {
let cur_ts = Timestamp::now();
Self {
created_ts: cur_ts,
stage_ts: cur_ts,
stage: OutboundTransactionStage::Init,
consensus_count,
record_infos,
@ -134,11 +130,20 @@ impl OutboundTransactionState {
}
}
/// Timestamp recorded when this transaction state was created
pub fn created_ts(&self) -> Timestamp {
self.created_ts
}
/// Timestamp of the last stage transition
pub fn stage_ts(&self) -> Timestamp {
self.stage_ts
}
/// Current operational stage of this transaction
pub fn stage(&self) -> OutboundTransactionStage {
self.stage
}
/// Move the transaction to `stage` and stamp `stage_ts` with the current time.
/// The timestamp is refreshed on every call, even if `stage` equals the current stage.
pub fn set_stage(&mut self, stage: OutboundTransactionStage) {
self.stage_ts = Timestamp::now();
self.stage = stage
}
@ -158,16 +163,28 @@ impl OutboundTransactionState {
self.safety_selection.clone()
}
/// Sort `node_xids` in place by ascending hash-coordinate distance between
/// each node id and `opaque_record_key` (closest node first).
///
/// The record key's coordinate is computed once and each node's distance is
/// computed once (cached keys) rather than on every comparison. The sort is
/// stable, so pairs at equal distance keep their relative order.
fn sort_node_xids(opaque_record_key: &OpaqueRecordKey, node_xids: &mut Vec<NodeTransactionId>) {
    let target = opaque_record_key.to_hash_coordinate();
    node_xids.sort_by_cached_key(|node_xid| {
        target.distance(&node_xid.node_id().to_hash_coordinate())
    });
}
pub fn set_node_transaction_ids(
&mut self,
opaque_record_key: &OpaqueRecordKey,
node_xids: Vec<NodeTransactionId>,
mut node_xids: Vec<NodeTransactionId>,
) {
Self::sort_node_xids(opaque_record_key, &mut node_xids);
for ri in &mut self.record_infos {
if &ri.record_key.opaque() == opaque_record_key {
xxx sort by closeness
ri.node_xids = node_xids;
return;
}
@ -175,15 +192,40 @@ impl OutboundTransactionState {
unreachable!("attempting to modify missing record in transaction");
}
pub fn check_node_transaction_ids(
&mut self,
opaque_record_key: &OpaqueRecordKey,
mut node_xids: Vec<NodeTransactionId>,
) -> bool {
Self::sort_node_xids(opaque_record_key, &mut node_xids);
for ri in &mut self.record_infos {
if &ri.record_key.opaque() == opaque_record_key {
let mut count = 0;
for node_xid in &ri.node_xids {
if node_xids.contains(node_xid) {
count += 1;
}
}
return count >= self.consensus_count;
}
}
unreachable!("attempting to modify missing record in transaction");
}
pub fn remove_node_transaction_ids(
&mut self,
opaque_record_key: &OpaqueRecordKey,
node_xids: Vec<NodeTransactionId>,
) {
mut node_xids: Vec<NodeTransactionId>,
) -> bool {
Self::sort_node_xids(opaque_record_key, &mut node_xids);
for ri in &mut self.record_infos {
if &ri.record_key.opaque() == opaque_record_key {
ri.node_xids.retain(|x| !node_xids.contains(x));
return;
return ri.node_xids.is_empty();
}
}
unreachable!("attempting to modify missing record in transaction");

View file

@ -14,19 +14,6 @@ struct SubkeySeqCount {
pub value_nodes: Vec<NodeRef>,
}
/// Transaction id and node id pair
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
pub(super) struct NodeTransactionId {
pub node_id: NodeId,
pub xid: u64,
}
impl fmt::Display for NodeTransactionId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:xid={}", self.node_id, self.xid)
}
}
/// The context of the outbound_begin_transact_value operation
struct OutboundBeginTransactValueContext {
/// The combined sequence numbers and result counts so far
@ -61,6 +48,24 @@ pub(super) struct OutboundBeginTransactValueResult {
pub node_xids: Vec<NodeTransactionId>,
}
/// The result of the outbound_end_transact_value operation
///
/// Consumed by `record_end_transact_value_results`, which compares `node_xids`
/// with the ids stored for the record to decide whether the end reached consensus.
#[derive(Debug, Clone)]
pub(super) struct OutboundEndTransactValueResult {
/// The record key being transacted
pub opaque_record_key: OpaqueRecordKey,
/// The set of nodes that confirmed transaction end
pub node_xids: Vec<NodeTransactionId>,
}
/// The result of the outbound_commit_transact_value operation
///
/// Consumed by `record_commit_transact_value_results`, which compares `node_xids`
/// with the ids stored for the record to decide whether the commit reached consensus.
#[derive(Debug, Clone)]
pub(super) struct OutboundCommitTransactValueResult {
/// The record key being transacted
pub opaque_record_key: OpaqueRecordKey,
/// The set of nodes that confirmed transaction commit
pub node_xids: Vec<NodeTransactionId>,
}
/// The result of the outbound_rollback_transact_value operation
#[derive(Debug, Clone)]
pub(super) struct OutboundRollbackTransactValueResult {
@ -86,7 +91,7 @@ impl StorageManager {
/// Returns a transaction handle if the transaction was created
/// Returns Err(VeilidAPIError::TryAgain) if the transaction could not be created
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn transact_records(
pub async fn begin_transaction(
&self,
record_keys: Vec<RecordKey>,
safety_selection: SafetySelection,
@ -181,7 +186,6 @@ impl StorageManager {
results.push(v);
}
Err(e) => {
// Delete transaction
if opt_begin_error.is_none() {
opt_begin_error = Some(e);
}
@ -207,10 +211,7 @@ impl StorageManager {
// Rollback if any errors happened
if let Some(begin_error) = opt_begin_error {
veilid_log!(self debug "Begin transaction failed, rolling back outbound transaction: {}", begin_error);
if let Err(e) = self
.rollback_outbound_transaction(transaction_handle.clone())
.await
{
if let Err(e) = self.rollback_transaction(transaction_handle.clone()).await {
veilid_log!(self debug "Failed to roll back outbound transaction, dropping: {}", e);
}
@ -223,14 +224,202 @@ impl StorageManager {
Ok(transaction_handle)
}
////////////////////////////////////////////////////////////////////////
/// Roll back a transaction
#[instrument(level = "trace", target = "dht", skip_all, err)]
async fn rollback_outbound_transaction(
/// End a transaction over a set of records
/// If an existing transaction does not exist over these records
/// or a transaction can not be performed at this time, this will fail.
/// Returns Err(VeilidAPIError::TryAgain) if the transaction could not be ended at this time
/// Returns Err(_) if the transaction end failed and resulted in rollback or drop
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn end_transaction(
&self,
transaction_handle: OutboundTransactionHandle,
) -> VeilidAPIResult<()> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Early rejection if dht is not online
if !self.dht_is_online() {
apibail_try_again!("dht is not online");
}
let end_params_list = {
let mut inner = self.inner.lock().await;
// Obtain the outbound transaction manager
let otm = &mut inner.outbound_transaction_manager;
// Prepare for rollback
let commit_params_list = otm
.prepare_end_transact_value_params(transaction_handle.clone())
.unwrap();
commit_params_list
};
// End transactions on all records
let mut unord = FuturesUnordered::new();
for end_params in end_params_list {
let fut = self.clone().outbound_end_transact_value(
end_params.opaque_record_key,
end_params.safety_selection,
end_params.writer,
end_params.node_xids,
);
unord.push(fut);
}
let mut results = vec![];
let mut opt_end_error = None;
while let Some(res) = unord.next().await {
match res {
Ok(v) => {
//
results.push(v);
}
Err(e) => {
if opt_end_error.is_none() {
opt_end_error = Some(e);
}
}
}
}
// Store end results
{
let mut inner = self.inner.lock().await;
let otm = &mut inner.outbound_transaction_manager;
if let Err(e) =
otm.record_end_transact_value_results(transaction_handle.clone(), results)
{
if opt_end_error.is_none() {
opt_end_error = Some(e);
}
}
}
// Rollback if any errors happened
if let Some(end_error) = opt_end_error {
veilid_log!(self debug "End transaction failed, rolling back outbound transaction: {}", end_error);
if let Err(e) = self.rollback_transaction(transaction_handle.clone()).await {
veilid_log!(self debug "Failed to roll back outbound transaction, dropping: {}", e);
}
self.drop_outbound_transaction(transaction_handle).await;
return Err(end_error);
}
Ok(())
}
/// Commit a transaction over a set of records
/// If an existing transaction does not exist over these records
/// or a transaction can not be performed at this time, this will fail.
/// Returns Err(VeilidAPIError::TryAgain) if the transaction could not be committed at this time
/// Returns Err(_) if the transaction commit failed and resulted in rollback or drop
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn commit_transaction(
&self,
transaction_handle: OutboundTransactionHandle,
) -> VeilidAPIResult<()> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Early rejection if dht is not online
if !self.dht_is_online() {
apibail_try_again!("dht is not online");
}
let commit_params_list = {
let mut inner = self.inner.lock().await;
// Obtain the outbound transaction manager
let otm = &mut inner.outbound_transaction_manager;
// Prepare for rollback
let commit_params_list = otm
.prepare_commit_transact_value_params(transaction_handle.clone())
.unwrap();
commit_params_list
};
// Commit transactions on all records
let mut unord = FuturesUnordered::new();
for commit_params in commit_params_list {
let fut = self.clone().outbound_commit_transact_value(
commit_params.opaque_record_key,
commit_params.safety_selection,
commit_params.writer,
commit_params.node_xids,
);
unord.push(fut);
}
let mut results = vec![];
let mut opt_commit_error = None;
while let Some(res) = unord.next().await {
match res {
Ok(v) => {
//
results.push(v);
}
Err(e) => {
if opt_commit_error.is_none() {
opt_commit_error = Some(e);
}
}
}
}
// Store commit results
{
let mut inner = self.inner.lock().await;
let otm = &mut inner.outbound_transaction_manager;
if let Err(e) =
otm.record_commit_transact_value_results(transaction_handle.clone(), results)
{
if opt_commit_error.is_none() {
opt_commit_error = Some(e);
}
}
}
// XXX: handle commit errors better
// // Rollback if any errors happened
// if let Some(commit_error) = opt_commit_error {
// veilid_log!(self debug "Commit transaction failed, rolling back outbound transaction: {}", commit_error);
// if let Err(e) = self.rollback_transaction(transaction_handle.clone()).await {
// veilid_log!(self debug "Failed to roll back outbound transaction, dropping: {}", e);
// }
// self.drop_outbound_transaction(transaction_handle).await;
// return Err(commit_error);
// }
Ok(())
}
/// Roll back a transaction
/// If an error is returned, the transaction is left in a failed state and can either
/// * be dropped/ignored and the remote transaction will time out
/// * another rollback attempt can be made, which may result in a more polite termination of the remote transaction
#[instrument(level = "trace", target = "dht", skip_all, err)]
async fn rollback_transaction(
&self,
transaction_handle: OutboundTransactionHandle,
) -> VeilidAPIResult<()> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
// Early rejection if dht is not online
if !self.dht_is_online() {
apibail_try_again!("dht is not online");
}
let rollback_params_list = {
let mut inner = self.inner.lock().await;
@ -257,7 +446,7 @@ impl StorageManager {
unord.push(fut);
}
let mut results = vec![];
let mut rollback_error = None;
let mut opt_rollback_error = None;
while let Some(res) = unord.next().await {
match res {
Ok(v) => {
@ -265,9 +454,8 @@ impl StorageManager {
results.push(v);
}
Err(e) => {
// Delete transaction
if rollback_error.is_none() {
rollback_error = Some(e);
if opt_rollback_error.is_none() {
opt_rollback_error = Some(e);
}
}
}
@ -280,19 +468,21 @@ impl StorageManager {
if let Err(e) =
otm.record_rollback_transact_value_results(transaction_handle.clone(), results)
{
if rollback_error.is_none() {
rollback_error = Some(e);
if opt_rollback_error.is_none() {
opt_rollback_error = Some(e);
}
}
}
if let Some(rberr) = rollback_error {
if let Some(rberr) = opt_rollback_error {
return Err(rberr);
}
Ok(())
}
////////////////////////////////////////////////////////////////////////
/// Drop a transaction. This eliminates the transaction locally and does not
/// perform any actions on the network. The remote transaction will time out on its own.
#[instrument(level = "trace", target = "dht", skip_all)]
@ -471,7 +661,7 @@ impl StorageManager {
veilid_log!(registry debug target:"network_result", "Got transaction id and seqs back: xid={}, len={}", xid, answer.seqs.len());
// Add transaction id node to list
ctx.node_xids.push(NodeTransactionId { node_ref: next_node.clone(), xid });
ctx.node_xids.push(NodeTransactionId::new(opaque_record_key.kind(), xid, next_node.clone()));
// If we have a prior seqs list, merge in the new seqs
if ctx.seqcounts.is_empty() {
@ -599,7 +789,95 @@ impl StorageManager {
pub(super) async fn outbound_end_transact_value(
&self,
opaque_record_key: OpaqueRecordKey,
) -> VeilidAPIResult<OutboundTransactValueResult> {
safety_selection: SafetySelection,
writer: KeyPair,
node_xids: Vec<NodeTransactionId>,
) -> VeilidAPIResult<OutboundEndTransactValueResult> {
let routing_domain = RoutingDomain::PublicInternet;
// Pull the descriptor for this record
let descriptor = {
let mut inner = self.inner.lock().await;
let local_inspect_result = self
.handle_inspect_local_value_inner(
&mut inner,
opaque_record_key.clone(),
ValueSubkeyRangeSet::full(),
true,
)
.await?;
local_inspect_result.opt_descriptor().unwrap()
};
// Send all ends in parallel
let mut unord = FuturesUnordered::new();
for node_xid in node_xids {
let registry = self.registry();
let next_node = node_xid.node_ref();
let next_xid = node_xid.xid();
let opaque_record_key = opaque_record_key.clone();
let safety_selection = safety_selection.clone();
let descriptor = descriptor.clone();
let writer = writer.clone();
let fut = async move {
let rpc_processor = registry.rpc_processor();
let tva = match rpc_processor
.rpc_call_transact_value(
Destination::direct(next_node.routing_domain_filtered(routing_domain))
.with_safety(safety_selection.clone()),
opaque_record_key.clone(),
Some(next_xid),
TransactValueCommand::End,
descriptor.as_ref().clone(),
false,
writer.clone(),
)
.await
.map_err(VeilidAPIError::from)?
{
NetworkResult::Timeout => {
return VeilidAPIResult::Ok(None);
}
NetworkResult::ServiceUnavailable(_)
| NetworkResult::NoConnection(_)
| NetworkResult::AlreadyExists(_)
| NetworkResult::InvalidMessage(_) => {
return Ok(None);
}
NetworkResult::Value(v) => v,
};
if tva.answer.accepted {
if tva.answer.needs_descriptor {
veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' when descriptor was already sent: node={} record_key={}", next_node, opaque_record_key);
}
} else if tva.answer.needs_descriptor {
veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' from node that did not accept: node={} record_key={}", next_node, opaque_record_key);
}
Ok(Some(node_xid))
};
unord.push(fut);
}
let mut end_node_xids = vec![];
while let Some(res) = unord.next().await {
let res = res.inspect_err(|e| {
veilid_log!(self error target:"network_result", "Error performing end transaction: {}", e);
})?;
if let Some(end_node_xid) = res {
end_node_xids.push(end_node_xid);
}
}
Ok(OutboundEndTransactValueResult {
opaque_record_key,
node_xids: end_node_xids,
})
}
/// Perform commit transaction queries on the network for a single record
@ -607,7 +885,95 @@ impl StorageManager {
pub(super) async fn outbound_commit_transact_value(
&self,
opaque_record_key: OpaqueRecordKey,
) -> VeilidAPIResult<OutboundTransactValueResult> {
safety_selection: SafetySelection,
writer: KeyPair,
node_xids: Vec<NodeTransactionId>,
) -> VeilidAPIResult<OutboundCommitTransactValueResult> {
let routing_domain = RoutingDomain::PublicInternet;
// Pull the descriptor for this record
let descriptor = {
let mut inner = self.inner.lock().await;
let local_inspect_result = self
.handle_inspect_local_value_inner(
&mut inner,
opaque_record_key.clone(),
ValueSubkeyRangeSet::full(),
true,
)
.await?;
local_inspect_result.opt_descriptor().unwrap()
};
// Send all commits in parallel
let mut unord = FuturesUnordered::new();
for node_xid in node_xids {
let registry = self.registry();
let next_node = node_xid.node_ref();
let next_xid = node_xid.xid();
let opaque_record_key = opaque_record_key.clone();
let safety_selection = safety_selection.clone();
let descriptor = descriptor.clone();
let writer = writer.clone();
let fut = async move {
let rpc_processor = registry.rpc_processor();
let tva = match rpc_processor
.rpc_call_transact_value(
Destination::direct(next_node.routing_domain_filtered(routing_domain))
.with_safety(safety_selection.clone()),
opaque_record_key.clone(),
Some(next_xid),
TransactValueCommand::Commit,
descriptor.as_ref().clone(),
false,
writer.clone(),
)
.await
.map_err(VeilidAPIError::from)?
{
NetworkResult::Timeout => {
return VeilidAPIResult::Ok(None);
}
NetworkResult::ServiceUnavailable(_)
| NetworkResult::NoConnection(_)
| NetworkResult::AlreadyExists(_)
| NetworkResult::InvalidMessage(_) => {
return Ok(None);
}
NetworkResult::Value(v) => v,
};
if tva.answer.accepted {
if tva.answer.needs_descriptor {
veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' when descriptor was already sent: node={} record_key={}", next_node, opaque_record_key);
}
} else if tva.answer.needs_descriptor {
veilid_log!(registry error target:"network_result", "Got 'needs_descriptor' from node that did not accept: node={} record_key={}", next_node, opaque_record_key);
}
Ok(Some(node_xid))
};
unord.push(fut);
}
let mut committed_node_xids = vec![];
while let Some(res) = unord.next().await {
let res = res.inspect_err(|e| {
veilid_log!(self error target:"network_result", "Error performing commit transaction: {}", e);
})?;
if let Some(committed_node_xid) = res {
committed_node_xids.push(committed_node_xid);
}
}
Ok(OutboundCommitTransactValueResult {
opaque_record_key,
node_xids: committed_node_xids,
})
}
/// Perform rollback transaction queries on the network for a single record
@ -640,8 +1006,8 @@ impl StorageManager {
for node_xid in node_xids {
let registry = self.registry();
let next_node = node_xid.node_ref.clone();
let next_xid = node_xid.xid;
let next_node = node_xid.node_ref();
let next_xid = node_xid.xid();
let opaque_record_key = opaque_record_key.clone();
let safety_selection = safety_selection.clone();
let descriptor = descriptor.clone();

View file

@ -29,8 +29,8 @@ Prerequisites:
Run the test script:
- `./wasm_test.sh` to test with debug symbols.
- `./wasm_test.sh release` to test against a release build.
- `./wasm_test_js.sh` to test with debug symbols.
- `./wasm_test_js.sh release` to test against a release build.
## Development notes