diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 71bb9ece..58203c9d 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -44,7 +44,14 @@ impl Network { // Spawn a local async task for each socket let mut protocol_handlers_unordered = FuturesUnordered::new(); let network_manager = this.network_manager(); - let stop_token = this.inner.lock().stop_source.as_ref().unwrap().token(); + let stop_token = { + let inner = this.inner.lock(); + if inner.stop_source.is_none() { + log_net!(debug "exiting UDP listener before it starts because we encountered an error"); + return; + } + inner.stop_source.as_ref().unwrap().token() + }; for ph in protocol_handlers { let network_manager = network_manager.clone(); diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index e7b8cd05..4f5af94c 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -1234,9 +1234,10 @@ impl RoutingTableInner { let kind = node_id.kind; let mut closest_nodes_locked: Vec = closest_nodes .iter() - .filter_map(|x| { - if x.node_ids().kinds().contains(&kind) { - Some(x.locked(self)) + .filter_map(|nr| { + let nr_locked = nr.locked(self); + if nr_locked.node_ids().kinds().contains(&kind) { + Some(nr_locked) } else { None } diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index 27119fe7..671835ce 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -184,17 +184,22 @@ where } // Delete record - rt_xact.delete(0, &k.bytes()); + if let Err(e) = rt_xact.delete(0, &k.bytes()) { + log_stor!(error "record could not be deleted: {}", e); + } // Delete subkeys - let subkey_count = v.subkey_count() as u32; - for sk in 0..subkey_count { + let stored_subkeys = v.stored_subkeys(); + for sk in stored_subkeys.iter() { // From table let stk = SubkeyTableKey { key: k.key, subkey: sk, }; - st_xact.delete(0, &stk.bytes()); + let stkb = stk.bytes(); + if let Err(e) = st_xact.delete(0, &stkb) { + log_stor!(error "subkey could not be deleted: {}", e); + } // From cache self.remove_from_subkey_cache(stk); @@ -355,8 +360,8 @@ where want_descriptor: bool, ) -> VeilidAPIResult> { // record from index - let Some((subkey_count, opt_descriptor)) = self.with_record(key, |record| { - (record.subkey_count(), if want_descriptor { + let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| { + (record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor { Some(record.descriptor().clone()) } else { None @@ -371,6 +376,15 @@ where apibail_invalid_argument!("subkey out of range", "subkey", subkey); } + // See if we have this subkey stored + if !has_subkey { + // If not, return no value but maybe with descriptor + return Ok(Some(SubkeyResult { + value: None, + descriptor: opt_descriptor, + })); + } + // Get subkey table let Some(subkey_table) = self.subkey_table.clone() else { apibail_internal!("record store not initialized"); @@ -386,28 +400,23 @@ where descriptor: opt_descriptor, })); } - // If not in cache, try to pull from table store - if let Some(record_data) = subkey_table + // If not in cache, try to pull from table store if it is in our stored subkey set + let Some(record_data) = subkey_table .load_rkyv::(0, &stk.bytes()) .await - .map_err(VeilidAPIError::internal)? - { - let out = record_data.signed_value_data().clone(); + .map_err(VeilidAPIError::internal)? else { + apibail_internal!("failed to get subkey that was stored"); + }; - // Add to cache, do nothing with lru out - self.add_to_subkey_cache(stk, record_data); + let out = record_data.signed_value_data().clone(); - return Ok(Some(SubkeyResult { - value: Some(out), - descriptor: opt_descriptor, - })); - }; + // Add to cache, do nothing with lru out + self.add_to_subkey_cache(stk, record_data); - // Record was available, but subkey was not found, maybe descriptor gets returned - Ok(Some(SubkeyResult { - value: None, + return Ok(Some(SubkeyResult { + value: Some(out), descriptor: opt_descriptor, - })) + })); } pub async fn set_subkey( @@ -492,6 +501,7 @@ where // Update record self.with_record_mut(key, |record| { + record.store_subkey(subkey); record.set_record_data_size(new_record_data_size); }) .expect("record should still be here"); @@ -522,10 +532,11 @@ where out += "Record Index:\n"; for (rik, rec) in &self.record_index { out += &format!( - " {} @ {} len={}\n", + " {} @ {} len={} subkeys={}\n", rik.key.to_string(), rec.last_touched().as_u64(), - rec.record_data_size() + rec.record_data_size(), + rec.stored_subkeys(), ); } out += &format!("Subkey Cache Count: {}\n", self.subkey_cache.len()); diff --git a/veilid-core/src/storage_manager/types/record.rs b/veilid-core/src/storage_manager/types/record.rs index c8ec9cb7..c146380d 100644 --- a/veilid-core/src/storage_manager/types/record.rs +++ b/veilid-core/src/storage_manager/types/record.rs @@ -12,6 +12,7 @@ where { descriptor: SignedValueDescriptor, subkey_count: usize, + stored_subkeys: ValueSubkeyRangeSet, last_touched_ts: Timestamp, record_data_size: usize, detail: D, @@ -33,6 +34,7 @@ where Ok(Self { descriptor, subkey_count, + stored_subkeys: ValueSubkeyRangeSet::new(), last_touched_ts: cur_ts, record_data_size: 0, detail, @@ -50,6 +52,13 @@ where self.subkey_count } + pub fn stored_subkeys(&self) -> &ValueSubkeyRangeSet { + &self.stored_subkeys + } + pub fn store_subkey(&mut self, subkey: ValueSubkey) { + self.stored_subkeys.insert(subkey); + } + pub fn touch(&mut self, cur_ts: Timestamp) { self.last_touched_ts = cur_ts } diff --git a/veilid-core/src/table_store/table_db.rs b/veilid-core/src/table_store/table_db.rs index 0702a6de..f8f5b69a 100644 --- a/veilid-core/src/table_store/table_db.rs +++ b/veilid-core/src/table_store/table_db.rs @@ -45,6 +45,7 @@ impl Drop for TableDBUnlockedInner { #[derive(Debug, Clone)] pub struct TableDB { + opened_column_count: u32, unlocked_inner: Arc, } @@ -56,11 +57,13 @@ impl TableDB { database: Database, encryption_key: Option, decryption_key: Option, + opened_column_count: u32, ) -> Self { let encrypt_info = encryption_key.map(|ek| CryptInfo::new(crypto.clone(), ek)); let decrypt_info = decryption_key.map(|dk| CryptInfo::new(crypto.clone(), dk)); Self { + opened_column_count, unlocked_inner: Arc::new(TableDBUnlockedInner { table, table_store, @@ -71,8 +74,12 @@ impl TableDB { } } - pub(super) fn try_new_from_weak_inner(weak_inner: Weak) -> Option { + pub(super) fn try_new_from_weak_inner( + weak_inner: Weak, + opened_column_count: u32, + ) -> Option { weak_inner.upgrade().map(|table_db_unlocked_inner| Self { + opened_column_count, unlocked_inner: table_db_unlocked_inner, }) } @@ -82,6 +89,7 @@ impl TableDB { } /// Get the total number of columns in the TableDB + /// Not the number of columns that were opened, rather the total number that could be opened pub fn get_column_count(&self) -> VeilidAPIResult { let db = &self.unlocked_inner.database; db.num_columns().map_err(VeilidAPIError::from) @@ -144,8 +152,14 @@ impl TableDB { } } - /// Get the list of keys in a column of the TableDB + /// Get the list of keys in a column of the TableDAB pub async fn get_keys(&self, col: u32) -> VeilidAPIResult>> { + if col >= self.opened_column_count { + apibail_generic!(format!( + "Column exceeds opened column count {} >= {}", + col, self.opened_column_count + )); + } let db = self.unlocked_inner.database.clone(); let mut out = Vec::new(); db.iter_keys(col, None, |k| { @@ -165,6 +179,12 @@ impl TableDB { /// Store a key with a value in a column in the TableDB. Performs a single transaction immediately. pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> { + if col >= self.opened_column_count { + apibail_generic!(format!( + "Column exceeds opened column count {} >= {}", + col, self.opened_column_count + )); + } let db = self.unlocked_inner.database.clone(); let mut dbt = db.transaction(); dbt.put( @@ -195,6 +215,12 @@ impl TableDB { /// Read a key from a column in the TableDB immediately. pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult>> { + if col >= self.opened_column_count { + apibail_generic!(format!( + "Column exceeds opened column count {} >= {}", + col, self.opened_column_count + )); + } let db = self.unlocked_inner.database.clone(); let key = self.maybe_encrypt(key, true); Ok(db @@ -233,6 +259,12 @@ impl TableDB { /// Delete key with from a column in the TableDB pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult>> { + if col >= self.opened_column_count { + apibail_generic!(format!( + "Column exceeds opened column count {} >= {}", + col, self.opened_column_count + )); + } let key = self.maybe_encrypt(key, true); let db = self.unlocked_inner.database.clone(); @@ -330,11 +362,19 @@ impl TableDBTransaction { } /// Store a key with a value in a column in the TableDB - pub fn store(&self, col: u32, key: &[u8], value: &[u8]) { + pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> { + if col >= self.db.opened_column_count { + apibail_generic!(format!( + "Column exceeds opened column count {} >= {}", + col, self.db.opened_column_count + )); + } + let key = self.db.maybe_encrypt(key, true); let value = self.db.maybe_encrypt(value, false); let mut inner = self.inner.lock(); inner.dbt.as_mut().unwrap().put_owned(col, key, value); + Ok(()) } /// Store a key in rkyv format with a value in a column in the TableDB @@ -343,12 +383,7 @@ impl TableDBTransaction { T: RkyvSerialize, { let value = to_rkyv(value)?; - let key = self.db.maybe_encrypt(key, true); - let value = self.db.maybe_encrypt(&value, false); - - let mut inner = self.inner.lock(); - inner.dbt.as_mut().unwrap().put_owned(col, key, value); - Ok(()) + self.store(col, key, &value) } /// Store a key in rkyv format with a value in a column in the TableDB @@ -357,19 +392,22 @@ impl TableDBTransaction { T: serde::Serialize, { let value = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?; - let key = self.db.maybe_encrypt(key, true); - let value = self.db.maybe_encrypt(&value, false); - - let mut inner = self.inner.lock(); - inner.dbt.as_mut().unwrap().put_owned(col, key, value); - Ok(()) + self.store(col, key, &value) } /// Delete key with from a column in the TableDB - pub fn delete(&self, col: u32, key: &[u8]) { + pub fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> { + if col >= self.db.opened_column_count { + apibail_generic!(format!( + "Column exceeds opened column count {} >= {}", + col, self.db.opened_column_count + )); + } + let key = self.db.maybe_encrypt(key, true); let mut inner = self.inner.lock(); inner.dbt.as_mut().unwrap().delete_owned(col, key); + Ok(()) } } diff --git a/veilid-core/src/table_store/table_store.rs b/veilid-core/src/table_store/table_store.rs index 648fa65c..1831b46e 100644 --- a/veilid-core/src/table_store/table_store.rs +++ b/veilid-core/src/table_store/table_store.rs @@ -428,6 +428,7 @@ impl TableStore { } pub(crate) fn on_table_db_drop(&self, table: String) { + log_rtab!(debug "dropping table db: {}", table); let mut inner = self.inner.lock(); if inner.opened.remove(&table).is_none() { unreachable!("should have removed an item"); @@ -449,12 +450,21 @@ impl TableStore { let table_name = self.name_get_or_create(name).await?; - // See if this table is already opened + // See if this table is already opened, if so the column count must be the same { let mut inner = self.inner.lock(); if let Some(table_db_weak_inner) = inner.opened.get(&table_name) { - match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) { + match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone(), column_count) { Some(tdb) => { + // Ensure column count isnt bigger + let existing_col_count = tdb.get_column_count()?; + if column_count > existing_col_count { + return Err(VeilidAPIError::generic(format!( + "database must be closed before increasing column count {} -> {}", + existing_col_count, column_count, + ))); + } + return Ok(tdb); } None => { @@ -465,7 +475,7 @@ impl TableStore { } // Open table db using platform-specific driver - let db = match self + let mut db = match self .table_store_driver .open(&table_name, column_count) .await @@ -481,6 +491,24 @@ impl TableStore { // Flush table names to disk self.flush().await; + // If more columns are available, open the low level db with the max column count but restrict the tabledb object to the number requested + let existing_col_count = db.num_columns().map_err(VeilidAPIError::from)?; + if existing_col_count > column_count { + drop(db); + db = match self + .table_store_driver + .open(&table_name, existing_col_count) + .await + { + Ok(db) => db, + Err(e) => { + self.name_delete(name).await.expect("cleanup failed"); + self.flush().await; + return Err(e); + } + }; + } + // Wrap low-level Database in TableDB object let mut inner = self.inner.lock(); let table_db = TableDB::new( @@ -490,6 +518,7 @@ impl TableStore { db, inner.encryption_key.clone(), inner.encryption_key.clone(), + column_count, ); // Keep track of opened DBs diff --git a/veilid-core/src/table_store/tests/test_table_store.rs b/veilid-core/src/table_store/tests/test_table_store.rs index 41bfd022..86383f39 100644 --- a/veilid-core/src/table_store/tests/test_table_store.rs +++ b/veilid-core/src/table_store/tests/test_table_store.rs @@ -132,6 +132,41 @@ pub async fn test_store_delete_load(ts: TableStore) { assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec())); } +pub async fn test_transaction(ts: TableStore) { + trace!("test_transaction"); + + let _ = ts.delete("test"); + let db = ts.open("test", 3).await.expect("should have opened"); + assert!( + ts.delete("test").await.is_err(), + "should fail because file is opened" + ); + + let tx = db.transact(); + assert!(tx.store(0, b"aaa", b"a-value").is_ok()); + assert!(tx.store_json(1, b"bbb", &"b-value".to_owned()).is_ok()); + assert!(tx.store_rkyv(2, b"ccc", &"c-value".to_owned()).is_ok()); + assert!(tx.store(3, b"ddd", b"d-value").is_err()); + assert!(tx.store(0, b"ddd", b"d-value").is_ok()); + assert!(tx.delete(0, b"ddd").is_ok()); + assert!(tx.commit().await.is_ok()); + + let tx = db.transact(); + assert!(tx.delete(2, b"ccc").is_ok()); + tx.rollback(); + + assert_eq!(db.load(0, b"aaa").await, Ok(Some(b"a-value".to_vec()))); + assert_eq!( + db.load_json::(1, b"bbb").await, + Ok(Some("b-value".to_owned())) + ); + assert_eq!( + db.load_rkyv::(2, b"ccc").await, + Ok(Some("c-value".to_owned())) + ); + assert_eq!(db.load(0, b"ddd").await, Ok(None)); +} + pub async fn test_rkyv(vcrypto: CryptoSystemVersion, ts: TableStore) { trace!("test_rkyv"); @@ -268,6 +303,7 @@ pub async fn test_all() { test_protect_unprotect(vcrypto.clone(), ts.clone()).await; test_delete_open_delete(ts.clone()).await; test_store_delete_load(ts.clone()).await; + test_transaction(ts.clone()).await; test_rkyv(vcrypto.clone(), ts.clone()).await; test_json(vcrypto, ts.clone()).await; let _ = ts.delete("test").await; diff --git a/veilid-core/src/veilid_api/json_api/process.rs b/veilid-core/src/veilid_api/json_api/process.rs index 84eb97eb..023820d0 100644 --- a/veilid-core/src/veilid_api/json_api/process.rs +++ b/veilid-core/src/veilid_api/json_api/process.rs @@ -405,12 +405,15 @@ impl JsonRequestProcessor { TableDbTransactionResponseOp::Rollback {} } TableDbTransactionRequestOp::Store { col, key, value } => { - table_db_transaction.store(col, &key, &value); - TableDbTransactionResponseOp::Store {} + TableDbTransactionResponseOp::Store { + result: to_json_api_result(table_db_transaction.store(col, &key, &value)), + } } + TableDbTransactionRequestOp::Delete { col, key } => { - table_db_transaction.delete(col, &key); - TableDbTransactionResponseOp::Delete {} + TableDbTransactionResponseOp::Delete { + result: to_json_api_result(table_db_transaction.delete(col, &key)), + } } }; TableDbTransactionResponse { diff --git a/veilid-core/src/veilid_api/json_api/table_db.rs b/veilid-core/src/veilid_api/json_api/table_db.rs index e2741730..3a1d2008 100644 --- a/veilid-core/src/veilid_api/json_api/table_db.rs +++ b/veilid-core/src/veilid_api/json_api/table_db.rs @@ -124,6 +124,12 @@ pub enum TableDbTransactionResponseOp { result: ApiResult<()>, }, Rollback {}, - Store {}, - Delete {}, + Store { + #[serde(flatten)] + result: ApiResult<()>, + }, + Delete { + #[serde(flatten)] + result: ApiResult<()>, + }, } diff --git a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs index 466346d3..68298f37 100644 --- a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs +++ b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs @@ -4,7 +4,6 @@ use range_set_blaze::*; #[derive( Clone, - Debug, Default, PartialOrd, PartialEq, @@ -55,3 +54,15 @@ impl DerefMut for ValueSubkeyRangeSet { &mut self.data } } + +impl fmt::Debug for ValueSubkeyRangeSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.data) + } +} + +impl fmt::Display for ValueSubkeyRangeSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.data) + } +} diff --git a/veilid-flutter/rust/src/dart_ffi.rs b/veilid-flutter/rust/src/dart_ffi.rs index 2a9b65a7..ba969ee7 100644 --- a/veilid-flutter/rust/src/dart_ffi.rs +++ b/veilid-flutter/rust/src/dart_ffi.rs @@ -884,7 +884,7 @@ pub extern "C" fn table_db_transaction_store(port: i64, id: u32, col: u32, key: tdbt.clone() }; - tdbt.store(col, &key, &value); + tdbt.store(col, &key, &value)?; APIRESULT_VOID }); } @@ -908,7 +908,7 @@ pub extern "C" fn table_db_transaction_delete(port: i64, id: u32, col: u32, key: tdbt.clone() }; - tdbt.delete(col, &key); + tdbt.delete(col, &key)?; APIRESULT_VOID }); } diff --git a/veilid-python/tests/test_basic.py b/veilid-python/tests/test_basic.py index 003ed054..7ede6f95 100644 --- a/veilid-python/tests/test_basic.py +++ b/veilid-python/tests/test_basic.py @@ -9,12 +9,12 @@ from .conftest import simple_update_callback @pytest.mark.asyncio -async def test_connect(api_connection): +async def test_connect(api_connection: veilid.VeilidAPI): pass @pytest.mark.asyncio -async def test_get_node_id(api_connection): +async def test_get_node_id(api_connection: veilid.VeilidAPI): state = await api_connection.get_state() node_ids = state.config.config.network.routing_table.node_id @@ -35,7 +35,7 @@ async def test_fail_connect(): @pytest.mark.asyncio -async def test_version(api_connection): +async def test_version(api_connection: veilid.VeilidAPI): v = await api_connection.veilid_version() print(f"veilid_version: {v.__dict__}") assert v.__dict__.keys() >= {"_major", "_minor", "_patch"} diff --git a/veilid-python/tests/test_crypto.py b/veilid-python/tests/test_crypto.py index 41712e7a..f98410ae 100644 --- a/veilid-python/tests/test_crypto.py +++ b/veilid-python/tests/test_crypto.py @@ -3,29 +3,26 @@ import pytest import veilid from veilid.api import CryptoSystem +import gc @pytest.mark.asyncio -async def test_best_crypto_system(api_connection): - bcs: CryptoSystem = await api_connection.best_crypto_system() - - assert await bcs.default_salt_length() == 16 - +async def test_best_crypto_system(api_connection: veilid.VeilidAPI): + cs: CryptoSystem = await api_connection.best_crypto_system() + async with cs: + assert await cs.default_salt_length() == 16 @pytest.mark.asyncio -async def test_get_crypto_system(api_connection): +async def test_get_crypto_system(api_connection: veilid.VeilidAPI): cs: CryptoSystem = await api_connection.get_crypto_system( veilid.CryptoKind.CRYPTO_KIND_VLD0 ) - - assert await cs.default_salt_length() == 16 - - # clean up handle early - del cs - + async with cs: + assert await cs.default_salt_length() == 16 + @pytest.mark.asyncio -async def test_get_crypto_system_invalid(api_connection): +async def test_get_crypto_system_invalid(api_connection: veilid.VeilidAPI): with pytest.raises(veilid.VeilidAPIErrorInvalidArgument) as exc: await api_connection.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_NONE) @@ -35,15 +32,17 @@ async def test_get_crypto_system_invalid(api_connection): @pytest.mark.asyncio -async def test_hash_and_verify_password(api_connection): - bcs = await api_connection.best_crypto_system() - nonce = await bcs.random_nonce() - salt = nonce.to_bytes() +async def test_hash_and_verify_password(api_connection: veilid.VeilidAPI): + cs = await api_connection.best_crypto_system() + async with cs: + nonce = await cs.random_nonce() + salt = nonce.to_bytes() - # Password match - phash = await bcs.hash_password(b"abc123", salt) - assert await bcs.verify_password(b"abc123", phash) + # Password match + phash = await cs.hash_password(b"abc123", salt) + assert await cs.verify_password(b"abc123", phash) + + # Password mismatch + phash2 = await cs.hash_password(b"abc1234", salt) + assert not await cs.verify_password(b"abc12345", phash) - # Password mismatch - phash2 = await bcs.hash_password(b"abc1234", salt) - assert not await bcs.verify_password(b"abc12345", phash) diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py new file mode 100644 index 00000000..35c49e1b --- /dev/null +++ b/veilid-python/tests/test_dht.py @@ -0,0 +1,51 @@ +# Routing context veilid tests + +import veilid +import pytest +import asyncio +import json +from . import * + +################################################################## +BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' ')) + +# @pytest.mark.asyncio +# async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI): +# rc = await api_connection.new_routing_context() +# async with rc: +# with pytest.raises(veilid.VeilidAPIError): +# out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False) + + +# @pytest.mark.asyncio +# async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI): +# rc = await api_connection.new_routing_context() +# async with rc: +# with pytest.raises(veilid.VeilidAPIError): +# out = await rc.open_dht_record(BOGUS_KEY, None) + +# @pytest.mark.asyncio +# async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI): +# rc = await api_connection.new_routing_context() +# async with rc: +# with pytest.raises(veilid.VeilidAPIError): +# await rc.close_dht_record(BOGUS_KEY) + +# @pytest.mark.asyncio +# async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI): +# rc = await api_connection.new_routing_context() +# async with rc: +# with pytest.raises(veilid.VeilidAPIError): +# await rc.delete_dht_record(BOGUS_KEY) + +# @pytest.mark.asyncio +# async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI): +# rc = await api_connection.new_routing_context() +# async with rc: +# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1)) +# await rc.close_dht_record(rec.key) +# await rc.delete_dht_record(rec.key) + +# xxx make tests for tabledb api first +# xxx then make a test that creates a record, stores it in a table +# xxx then make another test that gets the keys from the table and closes/deletes them diff --git a/veilid-python/tests/test_routing_context.py b/veilid-python/tests/test_routing_context.py index 8003ad56..bf20d77c 100644 --- a/veilid-python/tests/test_routing_context.py +++ b/veilid-python/tests/test_routing_context.py @@ -12,11 +12,15 @@ from .conftest import server_info @pytest.mark.asyncio -async def test_routing_contexts(api_connection): +async def test_routing_contexts(api_connection: veilid.VeilidAPI): rc = await api_connection.new_routing_context() - rcp = await rc.with_privacy() - rcps = await rcp.with_sequencing(veilid.Sequencing.ENSURE_ORDERED) - await rcps.with_custom_privacy(veilid.Stability.RELIABLE) + async with rc: + rcp = await rc.with_privacy(release = False) + async with rcp: + rcps = await rcp.with_sequencing(veilid.Sequencing.ENSURE_ORDERED, release = False) + async with rcps: + rcpscp = await rcps.with_custom_privacy(veilid.Stability.RELIABLE, release = False) + await rcpscp.release() @pytest.mark.asyncio @@ -38,24 +42,25 @@ async def test_routing_context_app_message_loopback(): # make a routing context that uses a safety route rc = await (await api.new_routing_context()).with_privacy() + async with rc: + + # make a new local private route + prl, blob = await api.new_private_route() - # make a new local private route - prl, blob = await api.new_private_route() + # import it as a remote route as well so we can send to it + prr = await api.import_remote_private_route(blob) - # import it as a remote route as well so we can send to it - prr = await api.import_remote_private_route(blob) + # send an app message to our own private route + message = b"abcd1234" + await rc.app_message(prr, message) - # send an app message to our own private route - message = b"abcd1234" - await rc.app_message(prr, message) + # we should get the same message back + update: veilid.VeilidUpdate = await asyncio.wait_for( + app_message_queue.get(), timeout=10 + ) - # we should get the same message back - update: veilid.VeilidUpdate = await asyncio.wait_for( - app_message_queue.get(), timeout=10 - ) - - assert isinstance(update.detail, veilid.VeilidAppMessage) - assert update.detail.message == message + assert isinstance(update.detail, veilid.VeilidAppMessage) + assert update.detail.message == message @pytest.mark.asyncio @@ -74,37 +79,33 @@ async def test_routing_context_app_call_loopback(): # make a routing context that uses a safety route rc = await (await api.new_routing_context()).with_privacy() + async with rc: + + # make a new local private route + prl, blob = await api.new_private_route() - # make a new local private route - prl, blob = await api.new_private_route() + # import it as a remote route as well so we can send to it + prr = await api.import_remote_private_route(blob) - # import it as a remote route as well so we can send to it - prr = await api.import_remote_private_route(blob) + # send an app message to our own private route + request = b"abcd1234" + app_call_task = asyncio.create_task( + rc.app_call(prr, request), name="app call task" + ) - # send an app message to our own private route - request = b"abcd1234" - app_call_task = asyncio.create_task( - rc.app_call(prr, request), name="app call task" - ) + # we should get the same request back + update: veilid.VeilidUpdate = await asyncio.wait_for( + app_call_queue.get(), timeout=10 + ) + appcall = update.detail - # we should get the same request back - update: veilid.VeilidUpdate = await asyncio.wait_for( - app_call_queue.get(), timeout=10 - ) - appcall = update.detail + assert isinstance(appcall, veilid.VeilidAppCall) + assert appcall.message == request - assert isinstance(appcall, veilid.VeilidAppCall) - assert appcall.message == request + # now we reply to the request + reply = b"qwer5678" + await api.app_call_reply(appcall.call_id, reply) - # now we reply to the request - reply = b"qwer5678" - # TK: OperationId use to be a subclass of `int`. When I wrapped `appcall.call_id` in int(), - # this failed JSON schema validation, which defines `call_id` as a string. Maybe that was a - # typo, and OperationId is really *supposed* to be a str? Alternatively, perhaps the - # signature of `app_call_reply` is wrong and it's supposed to take a type other than - # OperationId? - await api.app_call_reply(OperationId(appcall.call_id), reply) - - # now we should get the reply from the call - result = await app_call_task - assert result == reply + # now we should get the reply from the call + result = await app_call_task + assert result == reply diff --git a/veilid-python/tests/test_table_db.py b/veilid-python/tests/test_table_db.py new file mode 100644 index 00000000..9b904ae8 --- /dev/null +++ b/veilid-python/tests/test_table_db.py @@ -0,0 +1,127 @@ +# TableDB veilid tests + +import pytest +import veilid +from veilid.api import CryptoSystem + + +TEST_DB = "__pytest_db" +TEST_NONEXISTENT_DB = "__pytest_nonexistent_db" + +@pytest.mark.asyncio +async def test_delete_table_db_nonexistent(api_connection: veilid.VeilidAPI): + deleted = await api_connection.delete_table_db(TEST_NONEXISTENT_DB) + assert not deleted + + +@pytest.mark.asyncio +async def test_open_delete_table_db(api_connection: veilid.VeilidAPI): + # delete test db if it exists + await api_connection.delete_table_db(TEST_DB) + + tdb = await api_connection.open_table_db(TEST_DB, 1) + async with tdb: + # delete should fail since it is still open + with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: + await api_connection.delete_table_db(TEST_DB) + # drop the db + + # now delete should succeed + deleted = await api_connection.delete_table_db(TEST_DB) + assert deleted + +@pytest.mark.asyncio +async def test_open_twice_table_db(api_connection: veilid.VeilidAPI): + # delete test db if it exists + await api_connection.delete_table_db(TEST_DB) + + tdb = await api_connection.open_table_db(TEST_DB, 1) + tdb2 = await api_connection.open_table_db(TEST_DB, 1) + + # delete should fail because open + with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: + await api_connection.delete_table_db(TEST_DB) + await tdb.release() + + # delete should fail because open + with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: + await api_connection.delete_table_db(TEST_DB) + await tdb2.release() + + # delete should now succeed + deleted = await api_connection.delete_table_db(TEST_DB) + assert deleted + + +@pytest.mark.asyncio +async def test_open_twice_table_db_store_load(api_connection: veilid.VeilidAPI): + # delete test db if it exists + await api_connection.delete_table_db(TEST_DB) + + tdb = await api_connection.open_table_db(TEST_DB, 1) + async with tdb: + tdb2 = await api_connection.open_table_db(TEST_DB, 1) + async with tdb2: + # store into first db copy + await tdb.store(b"asdf", b"1234") + # load from second db copy + assert await tdb.load(b"asdf") == b"1234" + + # delete should now succeed + deleted = await api_connection.delete_table_db(TEST_DB) + assert deleted + +@pytest.mark.asyncio +async def test_open_twice_table_db_store_delete_load(api_connection: veilid.VeilidAPI): + # delete test db if it exists + await api_connection.delete_table_db(TEST_DB) + + tdb = await api_connection.open_table_db(TEST_DB, 1) + async with tdb: + tdb2 = await api_connection.open_table_db(TEST_DB, 1) + async with tdb2: + + # store into first db copy + await tdb.store(b"asdf", b"1234") + # delete from second db copy and clean up + await tdb2.delete(b"asdf") + + # load from first db copy + assert await tdb.load(b"asdf") == None + + # delete should now succeed + deleted = await api_connection.delete_table_db(TEST_DB) + assert deleted + + +@pytest.mark.asyncio +async def test_resize_table_db(api_connection: veilid.VeilidAPI): + # delete test db if it exists + await api_connection.delete_table_db(TEST_DB) + + tdb = await api_connection.open_table_db(TEST_DB, 1) + async with tdb: + # reopen the db with more columns should fail if it is already open + with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: + await api_connection.open_table_db(TEST_DB, 2) + + tdb2 = await api_connection.open_table_db(TEST_DB, 2) + async with tdb2: + # write something to second column + await tdb2.store(b"qwer", b"5678", col = 1) + + # reopen the db with fewer columns + tdb = await api_connection.open_table_db(TEST_DB, 1) + async with tdb: + + # Should fail access to second column + with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc: + await tdb.load(b"qwer", col = 1) + + # Should succeed with access to second column + assert await tdb2.load(b"qwer", col = 1) == b"5678" + + # now delete should succeed + deleted = await api_connection.delete_table_db(TEST_DB) + assert deleted + diff --git a/veilid-python/veilid/api.py b/veilid-python/veilid/api.py index e3a6b6ef..28ef4b1b 100644 --- a/veilid-python/veilid/api.py +++ b/veilid-python/veilid/api.py @@ -7,15 +7,19 @@ from .state import VeilidState class RoutingContext(ABC): @abstractmethod - async def with_privacy(self) -> Self: + async def release(self): pass @abstractmethod - async def with_custom_privacy(self, stability: types.Stability) -> Self: + async def with_privacy(self, release = True) -> Self: pass @abstractmethod - async def with_sequencing(self, sequencing: types.Sequencing) -> Self: + async def with_custom_privacy(self, stability: types.Stability, release = True) -> Self: + pass + + @abstractmethod + async def with_sequencing(self, sequencing: types.Sequencing, release = True) -> Self: pass @abstractmethod @@ -89,21 +93,25 @@ class TableDbTransaction(ABC): pass @abstractmethod - async def store(self, col: int, key: bytes, value: bytes): + async def store(self, key: bytes, value: bytes, col: int = 0): pass @abstractmethod - async def delete(self, col: int, key: bytes): + async def delete(self, key: bytes, col: int = 0): pass class TableDb(ABC): + @abstractmethod + async def release(self): + pass + @abstractmethod async def get_column_count(self) -> int: pass @abstractmethod - async def get_keys(self, col: int) -> list[bytes]: + async def get_keys(self, col: int = 0) -> list[bytes]: pass @abstractmethod @@ -111,19 +119,23 @@ class TableDb(ABC): pass @abstractmethod - async def store(self, col: int, key: bytes, value: bytes): + async def store(self, key: bytes, value: bytes, col: int = 0): pass @abstractmethod - async def load(self, col: int, key: bytes) -> Optional[bytes]: + async def load(self, key: bytes, col: int = 0) -> Optional[bytes]: pass @abstractmethod - async def delete(self, col: int, key: bytes) -> Optional[bytes]: + async def delete(self, key: bytes, col: int = 0) -> Optional[bytes]: pass class CryptoSystem(ABC): + @abstractmethod + async def release(self): + pass + @abstractmethod async def cached_dh( self, key: types.PublicKey, secret: types.SecretKey @@ -284,7 +296,7 @@ class VeilidAPI(ABC): pass @abstractmethod - async def delete_table_db(self, name: str): + async def delete_table_db(self, name: str) -> bool: pass @abstractmethod diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 02007243..a072e367 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -320,7 +320,7 @@ class _JsonVeilidAPI(VeilidAPI): ) return _JsonTableDb(self, db_id) - async def delete_table_db(self, name: str): + async def delete_table_db(self, name: str) -> bool: return raise_api_result( await self.send_ndjson_request(Operation.DELETE_TABLE_DB, name=name) ) @@ -411,19 +411,44 @@ def validate_rc_op(request: dict, response: dict): class _JsonRoutingContext(RoutingContext): api: _JsonVeilidAPI rc_id: int + done: bool def __init__(self, api: _JsonVeilidAPI, rc_id: int): self.api = api self.rc_id = rc_id + self.done = False def __del__(self): - self.api.send_one_way_ndjson_request( - Operation.ROUTING_CONTEXT, - rc_id=self.rc_id, - rc_op=RoutingContextOperation.RELEASE, - ) + if not self.done: + # attempt to clean up server-side anyway + self.api.send_one_way_ndjson_request( + Operation.ROUTING_CONTEXT, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.RELEASE + ) + + # complain + raise AssertionError("Should have released routing context before dropping object") - async def with_privacy(self) -> Self: + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *excinfo): + if not self.done: + await self.release() + + async def release(self): + if self.done: + return + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.RELEASE + ) + self.done = True + + async def with_privacy(self, release = True) -> Self: new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -432,9 +457,11 @@ class _JsonRoutingContext(RoutingContext): rc_op=RoutingContextOperation.WITH_PRIVACY, ) ) + if release: + await self.release() return self.__class__(self.api, new_rc_id) - async def with_custom_privacy(self, stability: Stability) -> Self: + async def with_custom_privacy(self, stability: Stability, release = True) -> Self: new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -444,9 +471,11 @@ class _JsonRoutingContext(RoutingContext): stability=stability, ) ) + if release: + await self.release() return self.__class__(self.api, new_rc_id) - async def with_sequencing(self, sequencing: Sequencing) -> Self: + async def with_sequencing(self, sequencing: Sequencing, release = True) -> Self: new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -456,6 +485,8 @@ class _JsonRoutingContext(RoutingContext): sequencing=sequencing, ) ) + if release: + await self.release() return self.__class__(self.api, new_rc_id) async def app_call(self, target: TypedKey | RouteId, request: bytes) -> bytes: @@ -627,9 +658,27 @@ class _JsonTableDbTransaction(TableDbTransaction): def __del__(self): if not self.done: - raise AssertionError("Should have committed or rolled back transaction") + # attempt to clean up server-side anyway + self.api.send_one_way_ndjson_request( + Operation.TABLE_DB_TRANSACTION, + tx_id=self.tx_id, + tx_op=TableDbTransactionOperation.ROLLBACK, + ) + + # complain + raise AssertionError("Should have committed or rolled back transaction before dropping object") + + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *excinfo): + if not self.done: + await self.rollback() async def commit(self): + if self.done: + raise AssertionError("Transaction is already done") + raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB_TRANSACTION, @@ -641,17 +690,17 @@ class _JsonTableDbTransaction(TableDbTransaction): self.done = True async def rollback(self): - raise_api_result( - await self.api.send_ndjson_request( - Operation.TABLE_DB_TRANSACTION, - validate=validate_tx_op, - tx_id=self.tx_id, - tx_op=TableDbTransactionOperation.ROLLBACK, - ) + if self.done: + raise AssertionError("Transaction is already done") + await self.api.send_ndjson_request( + Operation.TABLE_DB_TRANSACTION, + validate=validate_tx_op, + tx_id=self.tx_id, + tx_op=TableDbTransactionOperation.ROLLBACK, ) self.done = True - async def store(self, col: int, key: bytes, value: bytes): + async def store(self, key: bytes, value: bytes, col: int = 0): await self.api.send_ndjson_request( Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, @@ -662,7 +711,7 @@ class _JsonTableDbTransaction(TableDbTransaction): value=value, ) - async def delete(self, col: int, key: bytes): + async def delete(self, key: bytes, col: int = 0): await self.api.send_ndjson_request( Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, @@ -684,15 +733,44 @@ def validate_db_op(request: dict, response: dict): class _JsonTableDb(TableDb): api: _JsonVeilidAPI db_id: int + done: bool def __init__(self, api: _JsonVeilidAPI, db_id: int): self.api = api self.db_id = db_id + self.done = False def __del__(self): - self.api.send_one_way_ndjson_request( - Operation.TABLE_DB, db_id=self.db_id, rc_op=TableDbOperation.RELEASE + if not self.done: + + # attempt to clean up server-side anyway + self.api.send_one_way_ndjson_request( + Operation.TABLE_DB, + db_id=self.db_id, + db_op=TableDbOperation.RELEASE + ) + + # complain + raise AssertionError("Should have released table db before dropping object") + + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *excinfo): + if not self.done: + await self.release() + + async def release(self): + if self.done: + return + await self.api.send_ndjson_request( + Operation.TABLE_DB, + validate=validate_db_op, + db_id=self.db_id, + db_op=TableDbOperation.RELEASE ) + self.done = True + async def get_column_count(self) -> int: return raise_api_result( @@ -704,7 +782,7 @@ class _JsonTableDb(TableDb): ) ) - async def get_keys(self, col: int) -> list[bytes]: + async def get_keys(self, col: int = 0) -> list[bytes]: return list( map( lambda x: urlsafe_b64decode_no_pad(x), @@ -731,7 +809,7 @@ class _JsonTableDb(TableDb): ) return _JsonTableDbTransaction(self.api, tx_id) - async def store(self, col: int, key: bytes, value: bytes): + async def store(self, key: bytes, value: bytes, col: int = 0): return raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -744,7 +822,7 @@ class _JsonTableDb(TableDb): ) ) - async def load(self, col: int, key: bytes) -> Optional[bytes]: + async def load(self, key: bytes, col: int = 0) -> Optional[bytes]: res = raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -757,7 +835,7 @@ class _JsonTableDb(TableDb): ) return None if res is None else urlsafe_b64decode_no_pad(res) - async def delete(self, col: int, key: bytes) -> Optional[bytes]: + async def delete(self, key: bytes, col: int = 0) -> Optional[bytes]: res = raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -782,17 +860,43 @@ def validate_cs_op(request: dict, response: dict): class _JsonCryptoSystem(CryptoSystem): api: _JsonVeilidAPI cs_id: int + done: bool def __init__(self, api: _JsonVeilidAPI, cs_id: int): self.api = api self.cs_id = cs_id + self.done = False def __del__(self): - self.api.send_one_way_ndjson_request( + if not self.done: + + # attempt to clean up server-side anyway + self.api.send_one_way_ndjson_request( + Operation.CRYPTO_SYSTEM, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.RELEASE + ) + + # complain + raise AssertionError("Should have released crypto system before dropping object") + + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *excinfo): + if not self.done: + await self.release() + + async def release(self): + if self.done: + return + await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, cs_id=self.cs_id, - cs_op=CryptoSystemOperation.RELEASE, + cs_op=CryptoSystemOperation.RELEASE ) + self.done = True async def cached_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret: return SharedSecret( diff --git a/veilid-python/veilid/schema/RecvMessage.json b/veilid-python/veilid/schema/RecvMessage.json index 15b37df1..073e0bf1 100644 --- a/veilid-python/veilid/schema/RecvMessage.json +++ b/veilid-python/veilid/schema/RecvMessage.json @@ -1319,6 +1319,30 @@ }, { "type": "object", + "anyOf": [ + { + "type": "object", + "required": [ + "value" + ], + "properties": { + "value": { + "type": "null" + } + } + }, + { + "type": "object", + "required": [ + "error" + ], + "properties": { + "error": { + "$ref": "#/definitions/VeilidAPIError" + } + } + } + ], "required": [ "tx_op" ], @@ -1333,6 +1357,30 @@ }, { "type": "object", + "anyOf": [ + { + "type": "object", + "required": [ + "value" + ], + "properties": { + "value": { + "type": "null" + } + } + }, + { + "type": "object", + "required": [ + "error" + ], + "properties": { + "error": { + "$ref": "#/definitions/VeilidAPIError" + } + } + } + ], "required": [ "tx_op" ], diff --git a/veilid-python/veilid/state.py b/veilid-python/veilid/state.py index 1b57ad27..7d464c70 100644 --- a/veilid-python/veilid/state.py +++ b/veilid-python/veilid/state.py @@ -3,7 +3,7 @@ from typing import Optional, Self from .config import VeilidConfig from .types import (ByteCount, RouteId, Timestamp, TimestampDuration, TypedKey, - ValueData, ValueSubkey, VeilidLogLevel, + ValueData, ValueSubkey, VeilidLogLevel, OperationId, urlsafe_b64decode_no_pad) @@ -309,9 +309,9 @@ class VeilidAppMessage: class VeilidAppCall: sender: Optional[TypedKey] message: bytes - call_id: str + call_id: OperationId - def __init__(self, sender: Optional[TypedKey], message: bytes, call_id: str): + def __init__(self, sender: Optional[TypedKey], message: bytes, call_id: OperationId): self.sender = sender self.message = message self.call_id = call_id @@ -322,7 +322,7 @@ class VeilidAppCall: return cls( None if j["sender"] is None else TypedKey(j["sender"]), urlsafe_b64decode_no_pad(j["message"]), - j["call_id"], + OperationId(j["call_id"]), ) diff --git a/veilid-wasm/src/lib.rs b/veilid-wasm/src/lib.rs index 72b2c102..5011fbe5 100644 --- a/veilid-wasm/src/lib.rs +++ b/veilid-wasm/src/lib.rs @@ -817,7 +817,7 @@ pub fn table_db_transaction_store(id: u32, col: u32, key: String, value: String) tdbt.clone() }; - tdbt.store(col, &key, &value); + tdbt.store(col, &key, &value)?; APIRESULT_UNDEFINED }) } @@ -836,7 +836,7 @@ pub fn table_db_transaction_delete(id: u32, col: u32, key: String) -> Promise { tdbt.clone() }; - tdbt.delete(col, &key); + tdbt.delete(col, &key)?; APIRESULT_UNDEFINED }) }