mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-10-01 01:26:08 -04:00
Merge branch 'dht-testing' into 'main'
clean up object release ordering See merge request veilid/veilid!27
This commit is contained in:
commit
1f777a73b5
@ -44,7 +44,14 @@ impl Network {
|
|||||||
// Spawn a local async task for each socket
|
// Spawn a local async task for each socket
|
||||||
let mut protocol_handlers_unordered = FuturesUnordered::new();
|
let mut protocol_handlers_unordered = FuturesUnordered::new();
|
||||||
let network_manager = this.network_manager();
|
let network_manager = this.network_manager();
|
||||||
let stop_token = this.inner.lock().stop_source.as_ref().unwrap().token();
|
let stop_token = {
|
||||||
|
let inner = this.inner.lock();
|
||||||
|
if inner.stop_source.is_none() {
|
||||||
|
log_net!(debug "exiting UDP listener before it starts because we encountered an error");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
inner.stop_source.as_ref().unwrap().token()
|
||||||
|
};
|
||||||
|
|
||||||
for ph in protocol_handlers {
|
for ph in protocol_handlers {
|
||||||
let network_manager = network_manager.clone();
|
let network_manager = network_manager.clone();
|
||||||
|
@ -1234,9 +1234,10 @@ impl RoutingTableInner {
|
|||||||
let kind = node_id.kind;
|
let kind = node_id.kind;
|
||||||
let mut closest_nodes_locked: Vec<NodeRefLocked> = closest_nodes
|
let mut closest_nodes_locked: Vec<NodeRefLocked> = closest_nodes
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|x| {
|
.filter_map(|nr| {
|
||||||
if x.node_ids().kinds().contains(&kind) {
|
let nr_locked = nr.locked(self);
|
||||||
Some(x.locked(self))
|
if nr_locked.node_ids().kinds().contains(&kind) {
|
||||||
|
Some(nr_locked)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -184,17 +184,22 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete record
|
// Delete record
|
||||||
rt_xact.delete(0, &k.bytes());
|
if let Err(e) = rt_xact.delete(0, &k.bytes()) {
|
||||||
|
log_stor!(error "record could not be deleted: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
// Delete subkeys
|
// Delete subkeys
|
||||||
let subkey_count = v.subkey_count() as u32;
|
let stored_subkeys = v.stored_subkeys();
|
||||||
for sk in 0..subkey_count {
|
for sk in stored_subkeys.iter() {
|
||||||
// From table
|
// From table
|
||||||
let stk = SubkeyTableKey {
|
let stk = SubkeyTableKey {
|
||||||
key: k.key,
|
key: k.key,
|
||||||
subkey: sk,
|
subkey: sk,
|
||||||
};
|
};
|
||||||
st_xact.delete(0, &stk.bytes());
|
let stkb = stk.bytes();
|
||||||
|
if let Err(e) = st_xact.delete(0, &stkb) {
|
||||||
|
log_stor!(error "subkey could not be deleted: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
// From cache
|
// From cache
|
||||||
self.remove_from_subkey_cache(stk);
|
self.remove_from_subkey_cache(stk);
|
||||||
@ -355,8 +360,8 @@ where
|
|||||||
want_descriptor: bool,
|
want_descriptor: bool,
|
||||||
) -> VeilidAPIResult<Option<SubkeyResult>> {
|
) -> VeilidAPIResult<Option<SubkeyResult>> {
|
||||||
// record from index
|
// record from index
|
||||||
let Some((subkey_count, opt_descriptor)) = self.with_record(key, |record| {
|
let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| {
|
||||||
(record.subkey_count(), if want_descriptor {
|
(record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
|
||||||
Some(record.descriptor().clone())
|
Some(record.descriptor().clone())
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@ -371,6 +376,15 @@ where
|
|||||||
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
|
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// See if we have this subkey stored
|
||||||
|
if !has_subkey {
|
||||||
|
// If not, return no value but maybe with descriptor
|
||||||
|
return Ok(Some(SubkeyResult {
|
||||||
|
value: None,
|
||||||
|
descriptor: opt_descriptor,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
// Get subkey table
|
// Get subkey table
|
||||||
let Some(subkey_table) = self.subkey_table.clone() else {
|
let Some(subkey_table) = self.subkey_table.clone() else {
|
||||||
apibail_internal!("record store not initialized");
|
apibail_internal!("record store not initialized");
|
||||||
@ -386,28 +400,23 @@ where
|
|||||||
descriptor: opt_descriptor,
|
descriptor: opt_descriptor,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
// If not in cache, try to pull from table store
|
// If not in cache, try to pull from table store if it is in our stored subkey set
|
||||||
if let Some(record_data) = subkey_table
|
let Some(record_data) = subkey_table
|
||||||
.load_rkyv::<RecordData>(0, &stk.bytes())
|
.load_rkyv::<RecordData>(0, &stk.bytes())
|
||||||
.await
|
.await
|
||||||
.map_err(VeilidAPIError::internal)?
|
.map_err(VeilidAPIError::internal)? else {
|
||||||
{
|
apibail_internal!("failed to get subkey that was stored");
|
||||||
let out = record_data.signed_value_data().clone();
|
};
|
||||||
|
|
||||||
// Add to cache, do nothing with lru out
|
let out = record_data.signed_value_data().clone();
|
||||||
self.add_to_subkey_cache(stk, record_data);
|
|
||||||
|
|
||||||
return Ok(Some(SubkeyResult {
|
// Add to cache, do nothing with lru out
|
||||||
value: Some(out),
|
self.add_to_subkey_cache(stk, record_data);
|
||||||
descriptor: opt_descriptor,
|
|
||||||
}));
|
|
||||||
};
|
|
||||||
|
|
||||||
// Record was available, but subkey was not found, maybe descriptor gets returned
|
return Ok(Some(SubkeyResult {
|
||||||
Ok(Some(SubkeyResult {
|
value: Some(out),
|
||||||
value: None,
|
|
||||||
descriptor: opt_descriptor,
|
descriptor: opt_descriptor,
|
||||||
}))
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn set_subkey(
|
pub async fn set_subkey(
|
||||||
@ -492,6 +501,7 @@ where
|
|||||||
|
|
||||||
// Update record
|
// Update record
|
||||||
self.with_record_mut(key, |record| {
|
self.with_record_mut(key, |record| {
|
||||||
|
record.store_subkey(subkey);
|
||||||
record.set_record_data_size(new_record_data_size);
|
record.set_record_data_size(new_record_data_size);
|
||||||
})
|
})
|
||||||
.expect("record should still be here");
|
.expect("record should still be here");
|
||||||
@ -522,10 +532,11 @@ where
|
|||||||
out += "Record Index:\n";
|
out += "Record Index:\n";
|
||||||
for (rik, rec) in &self.record_index {
|
for (rik, rec) in &self.record_index {
|
||||||
out += &format!(
|
out += &format!(
|
||||||
" {} @ {} len={}\n",
|
" {} @ {} len={} subkeys={}\n",
|
||||||
rik.key.to_string(),
|
rik.key.to_string(),
|
||||||
rec.last_touched().as_u64(),
|
rec.last_touched().as_u64(),
|
||||||
rec.record_data_size()
|
rec.record_data_size(),
|
||||||
|
rec.stored_subkeys(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
out += &format!("Subkey Cache Count: {}\n", self.subkey_cache.len());
|
out += &format!("Subkey Cache Count: {}\n", self.subkey_cache.len());
|
||||||
|
@ -12,6 +12,7 @@ where
|
|||||||
{
|
{
|
||||||
descriptor: SignedValueDescriptor,
|
descriptor: SignedValueDescriptor,
|
||||||
subkey_count: usize,
|
subkey_count: usize,
|
||||||
|
stored_subkeys: ValueSubkeyRangeSet,
|
||||||
last_touched_ts: Timestamp,
|
last_touched_ts: Timestamp,
|
||||||
record_data_size: usize,
|
record_data_size: usize,
|
||||||
detail: D,
|
detail: D,
|
||||||
@ -33,6 +34,7 @@ where
|
|||||||
Ok(Self {
|
Ok(Self {
|
||||||
descriptor,
|
descriptor,
|
||||||
subkey_count,
|
subkey_count,
|
||||||
|
stored_subkeys: ValueSubkeyRangeSet::new(),
|
||||||
last_touched_ts: cur_ts,
|
last_touched_ts: cur_ts,
|
||||||
record_data_size: 0,
|
record_data_size: 0,
|
||||||
detail,
|
detail,
|
||||||
@ -50,6 +52,13 @@ where
|
|||||||
self.subkey_count
|
self.subkey_count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn stored_subkeys(&self) -> &ValueSubkeyRangeSet {
|
||||||
|
&self.stored_subkeys
|
||||||
|
}
|
||||||
|
pub fn store_subkey(&mut self, subkey: ValueSubkey) {
|
||||||
|
self.stored_subkeys.insert(subkey);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn touch(&mut self, cur_ts: Timestamp) {
|
pub fn touch(&mut self, cur_ts: Timestamp) {
|
||||||
self.last_touched_ts = cur_ts
|
self.last_touched_ts = cur_ts
|
||||||
}
|
}
|
||||||
|
@ -45,6 +45,7 @@ impl Drop for TableDBUnlockedInner {
|
|||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct TableDB {
|
pub struct TableDB {
|
||||||
|
opened_column_count: u32,
|
||||||
unlocked_inner: Arc<TableDBUnlockedInner>,
|
unlocked_inner: Arc<TableDBUnlockedInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,11 +57,13 @@ impl TableDB {
|
|||||||
database: Database,
|
database: Database,
|
||||||
encryption_key: Option<TypedSharedSecret>,
|
encryption_key: Option<TypedSharedSecret>,
|
||||||
decryption_key: Option<TypedSharedSecret>,
|
decryption_key: Option<TypedSharedSecret>,
|
||||||
|
opened_column_count: u32,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let encrypt_info = encryption_key.map(|ek| CryptInfo::new(crypto.clone(), ek));
|
let encrypt_info = encryption_key.map(|ek| CryptInfo::new(crypto.clone(), ek));
|
||||||
let decrypt_info = decryption_key.map(|dk| CryptInfo::new(crypto.clone(), dk));
|
let decrypt_info = decryption_key.map(|dk| CryptInfo::new(crypto.clone(), dk));
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
|
opened_column_count,
|
||||||
unlocked_inner: Arc::new(TableDBUnlockedInner {
|
unlocked_inner: Arc::new(TableDBUnlockedInner {
|
||||||
table,
|
table,
|
||||||
table_store,
|
table_store,
|
||||||
@ -71,8 +74,12 @@ impl TableDB {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn try_new_from_weak_inner(weak_inner: Weak<TableDBUnlockedInner>) -> Option<Self> {
|
pub(super) fn try_new_from_weak_inner(
|
||||||
|
weak_inner: Weak<TableDBUnlockedInner>,
|
||||||
|
opened_column_count: u32,
|
||||||
|
) -> Option<Self> {
|
||||||
weak_inner.upgrade().map(|table_db_unlocked_inner| Self {
|
weak_inner.upgrade().map(|table_db_unlocked_inner| Self {
|
||||||
|
opened_column_count,
|
||||||
unlocked_inner: table_db_unlocked_inner,
|
unlocked_inner: table_db_unlocked_inner,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -82,6 +89,7 @@ impl TableDB {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the total number of columns in the TableDB
|
/// Get the total number of columns in the TableDB
|
||||||
|
/// Not the number of columns that were opened, rather the total number that could be opened
|
||||||
pub fn get_column_count(&self) -> VeilidAPIResult<u32> {
|
pub fn get_column_count(&self) -> VeilidAPIResult<u32> {
|
||||||
let db = &self.unlocked_inner.database;
|
let db = &self.unlocked_inner.database;
|
||||||
db.num_columns().map_err(VeilidAPIError::from)
|
db.num_columns().map_err(VeilidAPIError::from)
|
||||||
@ -144,8 +152,14 @@ impl TableDB {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the list of keys in a column of the TableDB
|
/// Get the list of keys in a column of the TableDAB
|
||||||
pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Vec<u8>>> {
|
pub async fn get_keys(&self, col: u32) -> VeilidAPIResult<Vec<Vec<u8>>> {
|
||||||
|
if col >= self.opened_column_count {
|
||||||
|
apibail_generic!(format!(
|
||||||
|
"Column exceeds opened column count {} >= {}",
|
||||||
|
col, self.opened_column_count
|
||||||
|
));
|
||||||
|
}
|
||||||
let db = self.unlocked_inner.database.clone();
|
let db = self.unlocked_inner.database.clone();
|
||||||
let mut out = Vec::new();
|
let mut out = Vec::new();
|
||||||
db.iter_keys(col, None, |k| {
|
db.iter_keys(col, None, |k| {
|
||||||
@ -165,6 +179,12 @@ impl TableDB {
|
|||||||
|
|
||||||
/// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
|
/// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
|
||||||
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
|
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
|
||||||
|
if col >= self.opened_column_count {
|
||||||
|
apibail_generic!(format!(
|
||||||
|
"Column exceeds opened column count {} >= {}",
|
||||||
|
col, self.opened_column_count
|
||||||
|
));
|
||||||
|
}
|
||||||
let db = self.unlocked_inner.database.clone();
|
let db = self.unlocked_inner.database.clone();
|
||||||
let mut dbt = db.transaction();
|
let mut dbt = db.transaction();
|
||||||
dbt.put(
|
dbt.put(
|
||||||
@ -195,6 +215,12 @@ impl TableDB {
|
|||||||
|
|
||||||
/// Read a key from a column in the TableDB immediately.
|
/// Read a key from a column in the TableDB immediately.
|
||||||
pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
|
pub async fn load(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
|
||||||
|
if col >= self.opened_column_count {
|
||||||
|
apibail_generic!(format!(
|
||||||
|
"Column exceeds opened column count {} >= {}",
|
||||||
|
col, self.opened_column_count
|
||||||
|
));
|
||||||
|
}
|
||||||
let db = self.unlocked_inner.database.clone();
|
let db = self.unlocked_inner.database.clone();
|
||||||
let key = self.maybe_encrypt(key, true);
|
let key = self.maybe_encrypt(key, true);
|
||||||
Ok(db
|
Ok(db
|
||||||
@ -233,6 +259,12 @@ impl TableDB {
|
|||||||
|
|
||||||
/// Delete key with from a column in the TableDB
|
/// Delete key with from a column in the TableDB
|
||||||
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
|
pub async fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<Option<Vec<u8>>> {
|
||||||
|
if col >= self.opened_column_count {
|
||||||
|
apibail_generic!(format!(
|
||||||
|
"Column exceeds opened column count {} >= {}",
|
||||||
|
col, self.opened_column_count
|
||||||
|
));
|
||||||
|
}
|
||||||
let key = self.maybe_encrypt(key, true);
|
let key = self.maybe_encrypt(key, true);
|
||||||
|
|
||||||
let db = self.unlocked_inner.database.clone();
|
let db = self.unlocked_inner.database.clone();
|
||||||
@ -330,11 +362,19 @@ impl TableDBTransaction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Store a key with a value in a column in the TableDB
|
/// Store a key with a value in a column in the TableDB
|
||||||
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) {
|
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> VeilidAPIResult<()> {
|
||||||
|
if col >= self.db.opened_column_count {
|
||||||
|
apibail_generic!(format!(
|
||||||
|
"Column exceeds opened column count {} >= {}",
|
||||||
|
col, self.db.opened_column_count
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
let key = self.db.maybe_encrypt(key, true);
|
let key = self.db.maybe_encrypt(key, true);
|
||||||
let value = self.db.maybe_encrypt(value, false);
|
let value = self.db.maybe_encrypt(value, false);
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
inner.dbt.as_mut().unwrap().put_owned(col, key, value);
|
inner.dbt.as_mut().unwrap().put_owned(col, key, value);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Store a key in rkyv format with a value in a column in the TableDB
|
/// Store a key in rkyv format with a value in a column in the TableDB
|
||||||
@ -343,12 +383,7 @@ impl TableDBTransaction {
|
|||||||
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||||
{
|
{
|
||||||
let value = to_rkyv(value)?;
|
let value = to_rkyv(value)?;
|
||||||
let key = self.db.maybe_encrypt(key, true);
|
self.store(col, key, &value)
|
||||||
let value = self.db.maybe_encrypt(&value, false);
|
|
||||||
|
|
||||||
let mut inner = self.inner.lock();
|
|
||||||
inner.dbt.as_mut().unwrap().put_owned(col, key, value);
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Store a key in rkyv format with a value in a column in the TableDB
|
/// Store a key in rkyv format with a value in a column in the TableDB
|
||||||
@ -357,19 +392,22 @@ impl TableDBTransaction {
|
|||||||
T: serde::Serialize,
|
T: serde::Serialize,
|
||||||
{
|
{
|
||||||
let value = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
|
let value = serde_json::to_vec(value).map_err(VeilidAPIError::internal)?;
|
||||||
let key = self.db.maybe_encrypt(key, true);
|
self.store(col, key, &value)
|
||||||
let value = self.db.maybe_encrypt(&value, false);
|
|
||||||
|
|
||||||
let mut inner = self.inner.lock();
|
|
||||||
inner.dbt.as_mut().unwrap().put_owned(col, key, value);
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete key with from a column in the TableDB
|
/// Delete key with from a column in the TableDB
|
||||||
pub fn delete(&self, col: u32, key: &[u8]) {
|
pub fn delete(&self, col: u32, key: &[u8]) -> VeilidAPIResult<()> {
|
||||||
|
if col >= self.db.opened_column_count {
|
||||||
|
apibail_generic!(format!(
|
||||||
|
"Column exceeds opened column count {} >= {}",
|
||||||
|
col, self.db.opened_column_count
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
let key = self.db.maybe_encrypt(key, true);
|
let key = self.db.maybe_encrypt(key, true);
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
inner.dbt.as_mut().unwrap().delete_owned(col, key);
|
inner.dbt.as_mut().unwrap().delete_owned(col, key);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -428,6 +428,7 @@ impl TableStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn on_table_db_drop(&self, table: String) {
|
pub(crate) fn on_table_db_drop(&self, table: String) {
|
||||||
|
log_rtab!(debug "dropping table db: {}", table);
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
if inner.opened.remove(&table).is_none() {
|
if inner.opened.remove(&table).is_none() {
|
||||||
unreachable!("should have removed an item");
|
unreachable!("should have removed an item");
|
||||||
@ -449,12 +450,21 @@ impl TableStore {
|
|||||||
|
|
||||||
let table_name = self.name_get_or_create(name).await?;
|
let table_name = self.name_get_or_create(name).await?;
|
||||||
|
|
||||||
// See if this table is already opened
|
// See if this table is already opened, if so the column count must be the same
|
||||||
{
|
{
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
if let Some(table_db_weak_inner) = inner.opened.get(&table_name) {
|
if let Some(table_db_weak_inner) = inner.opened.get(&table_name) {
|
||||||
match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) {
|
match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone(), column_count) {
|
||||||
Some(tdb) => {
|
Some(tdb) => {
|
||||||
|
// Ensure column count isnt bigger
|
||||||
|
let existing_col_count = tdb.get_column_count()?;
|
||||||
|
if column_count > existing_col_count {
|
||||||
|
return Err(VeilidAPIError::generic(format!(
|
||||||
|
"database must be closed before increasing column count {} -> {}",
|
||||||
|
existing_col_count, column_count,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
return Ok(tdb);
|
return Ok(tdb);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
@ -465,7 +475,7 @@ impl TableStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Open table db using platform-specific driver
|
// Open table db using platform-specific driver
|
||||||
let db = match self
|
let mut db = match self
|
||||||
.table_store_driver
|
.table_store_driver
|
||||||
.open(&table_name, column_count)
|
.open(&table_name, column_count)
|
||||||
.await
|
.await
|
||||||
@ -481,6 +491,24 @@ impl TableStore {
|
|||||||
// Flush table names to disk
|
// Flush table names to disk
|
||||||
self.flush().await;
|
self.flush().await;
|
||||||
|
|
||||||
|
// If more columns are available, open the low level db with the max column count but restrict the tabledb object to the number requested
|
||||||
|
let existing_col_count = db.num_columns().map_err(VeilidAPIError::from)?;
|
||||||
|
if existing_col_count > column_count {
|
||||||
|
drop(db);
|
||||||
|
db = match self
|
||||||
|
.table_store_driver
|
||||||
|
.open(&table_name, existing_col_count)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(db) => db,
|
||||||
|
Err(e) => {
|
||||||
|
self.name_delete(name).await.expect("cleanup failed");
|
||||||
|
self.flush().await;
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
// Wrap low-level Database in TableDB object
|
// Wrap low-level Database in TableDB object
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
let table_db = TableDB::new(
|
let table_db = TableDB::new(
|
||||||
@ -490,6 +518,7 @@ impl TableStore {
|
|||||||
db,
|
db,
|
||||||
inner.encryption_key.clone(),
|
inner.encryption_key.clone(),
|
||||||
inner.encryption_key.clone(),
|
inner.encryption_key.clone(),
|
||||||
|
column_count,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Keep track of opened DBs
|
// Keep track of opened DBs
|
||||||
|
@ -132,6 +132,41 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
|||||||
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
|
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn test_transaction(ts: TableStore) {
|
||||||
|
trace!("test_transaction");
|
||||||
|
|
||||||
|
let _ = ts.delete("test");
|
||||||
|
let db = ts.open("test", 3).await.expect("should have opened");
|
||||||
|
assert!(
|
||||||
|
ts.delete("test").await.is_err(),
|
||||||
|
"should fail because file is opened"
|
||||||
|
);
|
||||||
|
|
||||||
|
let tx = db.transact();
|
||||||
|
assert!(tx.store(0, b"aaa", b"a-value").is_ok());
|
||||||
|
assert!(tx.store_json(1, b"bbb", &"b-value".to_owned()).is_ok());
|
||||||
|
assert!(tx.store_rkyv(2, b"ccc", &"c-value".to_owned()).is_ok());
|
||||||
|
assert!(tx.store(3, b"ddd", b"d-value").is_err());
|
||||||
|
assert!(tx.store(0, b"ddd", b"d-value").is_ok());
|
||||||
|
assert!(tx.delete(0, b"ddd").is_ok());
|
||||||
|
assert!(tx.commit().await.is_ok());
|
||||||
|
|
||||||
|
let tx = db.transact();
|
||||||
|
assert!(tx.delete(2, b"ccc").is_ok());
|
||||||
|
tx.rollback();
|
||||||
|
|
||||||
|
assert_eq!(db.load(0, b"aaa").await, Ok(Some(b"a-value".to_vec())));
|
||||||
|
assert_eq!(
|
||||||
|
db.load_json::<String>(1, b"bbb").await,
|
||||||
|
Ok(Some("b-value".to_owned()))
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
db.load_rkyv::<String>(2, b"ccc").await,
|
||||||
|
Ok(Some("c-value".to_owned()))
|
||||||
|
);
|
||||||
|
assert_eq!(db.load(0, b"ddd").await, Ok(None));
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn test_rkyv(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
pub async fn test_rkyv(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||||
trace!("test_rkyv");
|
trace!("test_rkyv");
|
||||||
|
|
||||||
@ -268,6 +303,7 @@ pub async fn test_all() {
|
|||||||
test_protect_unprotect(vcrypto.clone(), ts.clone()).await;
|
test_protect_unprotect(vcrypto.clone(), ts.clone()).await;
|
||||||
test_delete_open_delete(ts.clone()).await;
|
test_delete_open_delete(ts.clone()).await;
|
||||||
test_store_delete_load(ts.clone()).await;
|
test_store_delete_load(ts.clone()).await;
|
||||||
|
test_transaction(ts.clone()).await;
|
||||||
test_rkyv(vcrypto.clone(), ts.clone()).await;
|
test_rkyv(vcrypto.clone(), ts.clone()).await;
|
||||||
test_json(vcrypto, ts.clone()).await;
|
test_json(vcrypto, ts.clone()).await;
|
||||||
let _ = ts.delete("test").await;
|
let _ = ts.delete("test").await;
|
||||||
|
@ -405,12 +405,15 @@ impl JsonRequestProcessor {
|
|||||||
TableDbTransactionResponseOp::Rollback {}
|
TableDbTransactionResponseOp::Rollback {}
|
||||||
}
|
}
|
||||||
TableDbTransactionRequestOp::Store { col, key, value } => {
|
TableDbTransactionRequestOp::Store { col, key, value } => {
|
||||||
table_db_transaction.store(col, &key, &value);
|
TableDbTransactionResponseOp::Store {
|
||||||
TableDbTransactionResponseOp::Store {}
|
result: to_json_api_result(table_db_transaction.store(col, &key, &value)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TableDbTransactionRequestOp::Delete { col, key } => {
|
TableDbTransactionRequestOp::Delete { col, key } => {
|
||||||
table_db_transaction.delete(col, &key);
|
TableDbTransactionResponseOp::Delete {
|
||||||
TableDbTransactionResponseOp::Delete {}
|
result: to_json_api_result(table_db_transaction.delete(col, &key)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
TableDbTransactionResponse {
|
TableDbTransactionResponse {
|
||||||
|
@ -124,6 +124,12 @@ pub enum TableDbTransactionResponseOp {
|
|||||||
result: ApiResult<()>,
|
result: ApiResult<()>,
|
||||||
},
|
},
|
||||||
Rollback {},
|
Rollback {},
|
||||||
Store {},
|
Store {
|
||||||
Delete {},
|
#[serde(flatten)]
|
||||||
|
result: ApiResult<()>,
|
||||||
|
},
|
||||||
|
Delete {
|
||||||
|
#[serde(flatten)]
|
||||||
|
result: ApiResult<()>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,6 @@ use range_set_blaze::*;
|
|||||||
|
|
||||||
#[derive(
|
#[derive(
|
||||||
Clone,
|
Clone,
|
||||||
Debug,
|
|
||||||
Default,
|
Default,
|
||||||
PartialOrd,
|
PartialOrd,
|
||||||
PartialEq,
|
PartialEq,
|
||||||
@ -55,3 +54,15 @@ impl DerefMut for ValueSubkeyRangeSet {
|
|||||||
&mut self.data
|
&mut self.data
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for ValueSubkeyRangeSet {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "{:?}", self.data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for ValueSubkeyRangeSet {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "{}", self.data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -884,7 +884,7 @@ pub extern "C" fn table_db_transaction_store(port: i64, id: u32, col: u32, key:
|
|||||||
tdbt.clone()
|
tdbt.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
tdbt.store(col, &key, &value);
|
tdbt.store(col, &key, &value)?;
|
||||||
APIRESULT_VOID
|
APIRESULT_VOID
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -908,7 +908,7 @@ pub extern "C" fn table_db_transaction_delete(port: i64, id: u32, col: u32, key:
|
|||||||
tdbt.clone()
|
tdbt.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
tdbt.delete(col, &key);
|
tdbt.delete(col, &key)?;
|
||||||
APIRESULT_VOID
|
APIRESULT_VOID
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -9,12 +9,12 @@ from .conftest import simple_update_callback
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_connect(api_connection):
|
async def test_connect(api_connection: veilid.VeilidAPI):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_node_id(api_connection):
|
async def test_get_node_id(api_connection: veilid.VeilidAPI):
|
||||||
state = await api_connection.get_state()
|
state = await api_connection.get_state()
|
||||||
node_ids = state.config.config.network.routing_table.node_id
|
node_ids = state.config.config.network.routing_table.node_id
|
||||||
|
|
||||||
@ -35,7 +35,7 @@ async def test_fail_connect():
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_version(api_connection):
|
async def test_version(api_connection: veilid.VeilidAPI):
|
||||||
v = await api_connection.veilid_version()
|
v = await api_connection.veilid_version()
|
||||||
print(f"veilid_version: {v.__dict__}")
|
print(f"veilid_version: {v.__dict__}")
|
||||||
assert v.__dict__.keys() >= {"_major", "_minor", "_patch"}
|
assert v.__dict__.keys() >= {"_major", "_minor", "_patch"}
|
||||||
|
@ -3,29 +3,26 @@
|
|||||||
import pytest
|
import pytest
|
||||||
import veilid
|
import veilid
|
||||||
from veilid.api import CryptoSystem
|
from veilid.api import CryptoSystem
|
||||||
|
import gc
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_best_crypto_system(api_connection):
|
async def test_best_crypto_system(api_connection: veilid.VeilidAPI):
|
||||||
bcs: CryptoSystem = await api_connection.best_crypto_system()
|
cs: CryptoSystem = await api_connection.best_crypto_system()
|
||||||
|
async with cs:
|
||||||
assert await bcs.default_salt_length() == 16
|
assert await cs.default_salt_length() == 16
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_crypto_system(api_connection):
|
async def test_get_crypto_system(api_connection: veilid.VeilidAPI):
|
||||||
cs: CryptoSystem = await api_connection.get_crypto_system(
|
cs: CryptoSystem = await api_connection.get_crypto_system(
|
||||||
veilid.CryptoKind.CRYPTO_KIND_VLD0
|
veilid.CryptoKind.CRYPTO_KIND_VLD0
|
||||||
)
|
)
|
||||||
|
async with cs:
|
||||||
assert await cs.default_salt_length() == 16
|
assert await cs.default_salt_length() == 16
|
||||||
|
|
||||||
# clean up handle early
|
|
||||||
del cs
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_crypto_system_invalid(api_connection):
|
async def test_get_crypto_system_invalid(api_connection: veilid.VeilidAPI):
|
||||||
with pytest.raises(veilid.VeilidAPIErrorInvalidArgument) as exc:
|
with pytest.raises(veilid.VeilidAPIErrorInvalidArgument) as exc:
|
||||||
await api_connection.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_NONE)
|
await api_connection.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_NONE)
|
||||||
|
|
||||||
@ -35,15 +32,17 @@ async def test_get_crypto_system_invalid(api_connection):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_hash_and_verify_password(api_connection):
|
async def test_hash_and_verify_password(api_connection: veilid.VeilidAPI):
|
||||||
bcs = await api_connection.best_crypto_system()
|
cs = await api_connection.best_crypto_system()
|
||||||
nonce = await bcs.random_nonce()
|
async with cs:
|
||||||
salt = nonce.to_bytes()
|
nonce = await cs.random_nonce()
|
||||||
|
salt = nonce.to_bytes()
|
||||||
|
|
||||||
# Password match
|
# Password match
|
||||||
phash = await bcs.hash_password(b"abc123", salt)
|
phash = await cs.hash_password(b"abc123", salt)
|
||||||
assert await bcs.verify_password(b"abc123", phash)
|
assert await cs.verify_password(b"abc123", phash)
|
||||||
|
|
||||||
|
# Password mismatch
|
||||||
|
phash2 = await cs.hash_password(b"abc1234", salt)
|
||||||
|
assert not await cs.verify_password(b"abc12345", phash)
|
||||||
|
|
||||||
# Password mismatch
|
|
||||||
phash2 = await bcs.hash_password(b"abc1234", salt)
|
|
||||||
assert not await bcs.verify_password(b"abc12345", phash)
|
|
||||||
|
51
veilid-python/tests/test_dht.py
Normal file
51
veilid-python/tests/test_dht.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
# Routing context veilid tests
|
||||||
|
|
||||||
|
import veilid
|
||||||
|
import pytest
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from . import *
|
||||||
|
|
||||||
|
##################################################################
|
||||||
|
BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' '))
|
||||||
|
|
||||||
|
# @pytest.mark.asyncio
|
||||||
|
# async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
|
||||||
|
# rc = await api_connection.new_routing_context()
|
||||||
|
# async with rc:
|
||||||
|
# with pytest.raises(veilid.VeilidAPIError):
|
||||||
|
# out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False)
|
||||||
|
|
||||||
|
|
||||||
|
# @pytest.mark.asyncio
|
||||||
|
# async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI):
|
||||||
|
# rc = await api_connection.new_routing_context()
|
||||||
|
# async with rc:
|
||||||
|
# with pytest.raises(veilid.VeilidAPIError):
|
||||||
|
# out = await rc.open_dht_record(BOGUS_KEY, None)
|
||||||
|
|
||||||
|
# @pytest.mark.asyncio
|
||||||
|
# async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
|
||||||
|
# rc = await api_connection.new_routing_context()
|
||||||
|
# async with rc:
|
||||||
|
# with pytest.raises(veilid.VeilidAPIError):
|
||||||
|
# await rc.close_dht_record(BOGUS_KEY)
|
||||||
|
|
||||||
|
# @pytest.mark.asyncio
|
||||||
|
# async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
|
||||||
|
# rc = await api_connection.new_routing_context()
|
||||||
|
# async with rc:
|
||||||
|
# with pytest.raises(veilid.VeilidAPIError):
|
||||||
|
# await rc.delete_dht_record(BOGUS_KEY)
|
||||||
|
|
||||||
|
# @pytest.mark.asyncio
|
||||||
|
# async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
|
||||||
|
# rc = await api_connection.new_routing_context()
|
||||||
|
# async with rc:
|
||||||
|
# rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
|
||||||
|
# await rc.close_dht_record(rec.key)
|
||||||
|
# await rc.delete_dht_record(rec.key)
|
||||||
|
|
||||||
|
# xxx make tests for tabledb api first
|
||||||
|
# xxx then make a test that creates a record, stores it in a table
|
||||||
|
# xxx then make another test that gets the keys from the table and closes/deletes them
|
@ -12,11 +12,15 @@ from .conftest import server_info
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_routing_contexts(api_connection):
|
async def test_routing_contexts(api_connection: veilid.VeilidAPI):
|
||||||
rc = await api_connection.new_routing_context()
|
rc = await api_connection.new_routing_context()
|
||||||
rcp = await rc.with_privacy()
|
async with rc:
|
||||||
rcps = await rcp.with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
|
rcp = await rc.with_privacy(release = False)
|
||||||
await rcps.with_custom_privacy(veilid.Stability.RELIABLE)
|
async with rcp:
|
||||||
|
rcps = await rcp.with_sequencing(veilid.Sequencing.ENSURE_ORDERED, release = False)
|
||||||
|
async with rcps:
|
||||||
|
rcpscp = await rcps.with_custom_privacy(veilid.Stability.RELIABLE, release = False)
|
||||||
|
await rcpscp.release()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -38,24 +42,25 @@ async def test_routing_context_app_message_loopback():
|
|||||||
|
|
||||||
# make a routing context that uses a safety route
|
# make a routing context that uses a safety route
|
||||||
rc = await (await api.new_routing_context()).with_privacy()
|
rc = await (await api.new_routing_context()).with_privacy()
|
||||||
|
async with rc:
|
||||||
|
|
||||||
|
# make a new local private route
|
||||||
|
prl, blob = await api.new_private_route()
|
||||||
|
|
||||||
# make a new local private route
|
# import it as a remote route as well so we can send to it
|
||||||
prl, blob = await api.new_private_route()
|
prr = await api.import_remote_private_route(blob)
|
||||||
|
|
||||||
# import it as a remote route as well so we can send to it
|
# send an app message to our own private route
|
||||||
prr = await api.import_remote_private_route(blob)
|
message = b"abcd1234"
|
||||||
|
await rc.app_message(prr, message)
|
||||||
|
|
||||||
# send an app message to our own private route
|
# we should get the same message back
|
||||||
message = b"abcd1234"
|
update: veilid.VeilidUpdate = await asyncio.wait_for(
|
||||||
await rc.app_message(prr, message)
|
app_message_queue.get(), timeout=10
|
||||||
|
)
|
||||||
|
|
||||||
# we should get the same message back
|
assert isinstance(update.detail, veilid.VeilidAppMessage)
|
||||||
update: veilid.VeilidUpdate = await asyncio.wait_for(
|
assert update.detail.message == message
|
||||||
app_message_queue.get(), timeout=10
|
|
||||||
)
|
|
||||||
|
|
||||||
assert isinstance(update.detail, veilid.VeilidAppMessage)
|
|
||||||
assert update.detail.message == message
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -74,37 +79,33 @@ async def test_routing_context_app_call_loopback():
|
|||||||
|
|
||||||
# make a routing context that uses a safety route
|
# make a routing context that uses a safety route
|
||||||
rc = await (await api.new_routing_context()).with_privacy()
|
rc = await (await api.new_routing_context()).with_privacy()
|
||||||
|
async with rc:
|
||||||
|
|
||||||
|
# make a new local private route
|
||||||
|
prl, blob = await api.new_private_route()
|
||||||
|
|
||||||
# make a new local private route
|
# import it as a remote route as well so we can send to it
|
||||||
prl, blob = await api.new_private_route()
|
prr = await api.import_remote_private_route(blob)
|
||||||
|
|
||||||
# import it as a remote route as well so we can send to it
|
# send an app message to our own private route
|
||||||
prr = await api.import_remote_private_route(blob)
|
request = b"abcd1234"
|
||||||
|
app_call_task = asyncio.create_task(
|
||||||
|
rc.app_call(prr, request), name="app call task"
|
||||||
|
)
|
||||||
|
|
||||||
# send an app message to our own private route
|
# we should get the same request back
|
||||||
request = b"abcd1234"
|
update: veilid.VeilidUpdate = await asyncio.wait_for(
|
||||||
app_call_task = asyncio.create_task(
|
app_call_queue.get(), timeout=10
|
||||||
rc.app_call(prr, request), name="app call task"
|
)
|
||||||
)
|
appcall = update.detail
|
||||||
|
|
||||||
# we should get the same request back
|
assert isinstance(appcall, veilid.VeilidAppCall)
|
||||||
update: veilid.VeilidUpdate = await asyncio.wait_for(
|
assert appcall.message == request
|
||||||
app_call_queue.get(), timeout=10
|
|
||||||
)
|
|
||||||
appcall = update.detail
|
|
||||||
|
|
||||||
assert isinstance(appcall, veilid.VeilidAppCall)
|
# now we reply to the request
|
||||||
assert appcall.message == request
|
reply = b"qwer5678"
|
||||||
|
await api.app_call_reply(appcall.call_id, reply)
|
||||||
|
|
||||||
# now we reply to the request
|
# now we should get the reply from the call
|
||||||
reply = b"qwer5678"
|
result = await app_call_task
|
||||||
# TK: OperationId use to be a subclass of `int`. When I wrapped `appcall.call_id` in int(),
|
assert result == reply
|
||||||
# this failed JSON schema validation, which defines `call_id` as a string. Maybe that was a
|
|
||||||
# typo, and OperationId is really *supposed* to be a str? Alternatively, perhaps the
|
|
||||||
# signature of `app_call_reply` is wrong and it's supposed to take a type other than
|
|
||||||
# OperationId?
|
|
||||||
await api.app_call_reply(OperationId(appcall.call_id), reply)
|
|
||||||
|
|
||||||
# now we should get the reply from the call
|
|
||||||
result = await app_call_task
|
|
||||||
assert result == reply
|
|
||||||
|
127
veilid-python/tests/test_table_db.py
Normal file
127
veilid-python/tests/test_table_db.py
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
# TableDB veilid tests
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import veilid
|
||||||
|
from veilid.api import CryptoSystem
|
||||||
|
|
||||||
|
|
||||||
|
TEST_DB = "__pytest_db"
|
||||||
|
TEST_NONEXISTENT_DB = "__pytest_nonexistent_db"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_delete_table_db_nonexistent(api_connection: veilid.VeilidAPI):
|
||||||
|
deleted = await api_connection.delete_table_db(TEST_NONEXISTENT_DB)
|
||||||
|
assert not deleted
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_open_delete_table_db(api_connection: veilid.VeilidAPI):
|
||||||
|
# delete test db if it exists
|
||||||
|
await api_connection.delete_table_db(TEST_DB)
|
||||||
|
|
||||||
|
tdb = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
async with tdb:
|
||||||
|
# delete should fail since it is still open
|
||||||
|
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
|
||||||
|
await api_connection.delete_table_db(TEST_DB)
|
||||||
|
# drop the db
|
||||||
|
|
||||||
|
# now delete should succeed
|
||||||
|
deleted = await api_connection.delete_table_db(TEST_DB)
|
||||||
|
assert deleted
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_open_twice_table_db(api_connection: veilid.VeilidAPI):
|
||||||
|
# delete test db if it exists
|
||||||
|
await api_connection.delete_table_db(TEST_DB)
|
||||||
|
|
||||||
|
tdb = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
tdb2 = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
|
||||||
|
# delete should fail because open
|
||||||
|
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
|
||||||
|
await api_connection.delete_table_db(TEST_DB)
|
||||||
|
await tdb.release()
|
||||||
|
|
||||||
|
# delete should fail because open
|
||||||
|
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
|
||||||
|
await api_connection.delete_table_db(TEST_DB)
|
||||||
|
await tdb2.release()
|
||||||
|
|
||||||
|
# delete should now succeed
|
||||||
|
deleted = await api_connection.delete_table_db(TEST_DB)
|
||||||
|
assert deleted
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_open_twice_table_db_store_load(api_connection: veilid.VeilidAPI):
|
||||||
|
# delete test db if it exists
|
||||||
|
await api_connection.delete_table_db(TEST_DB)
|
||||||
|
|
||||||
|
tdb = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
async with tdb:
|
||||||
|
tdb2 = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
async with tdb2:
|
||||||
|
# store into first db copy
|
||||||
|
await tdb.store(b"asdf", b"1234")
|
||||||
|
# load from second db copy
|
||||||
|
assert await tdb.load(b"asdf") == b"1234"
|
||||||
|
|
||||||
|
# delete should now succeed
|
||||||
|
deleted = await api_connection.delete_table_db(TEST_DB)
|
||||||
|
assert deleted
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_open_twice_table_db_store_delete_load(api_connection: veilid.VeilidAPI):
|
||||||
|
# delete test db if it exists
|
||||||
|
await api_connection.delete_table_db(TEST_DB)
|
||||||
|
|
||||||
|
tdb = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
async with tdb:
|
||||||
|
tdb2 = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
async with tdb2:
|
||||||
|
|
||||||
|
# store into first db copy
|
||||||
|
await tdb.store(b"asdf", b"1234")
|
||||||
|
# delete from second db copy and clean up
|
||||||
|
await tdb2.delete(b"asdf")
|
||||||
|
|
||||||
|
# load from first db copy
|
||||||
|
assert await tdb.load(b"asdf") == None
|
||||||
|
|
||||||
|
# delete should now succeed
|
||||||
|
deleted = await api_connection.delete_table_db(TEST_DB)
|
||||||
|
assert deleted
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_resize_table_db(api_connection: veilid.VeilidAPI):
|
||||||
|
# delete test db if it exists
|
||||||
|
await api_connection.delete_table_db(TEST_DB)
|
||||||
|
|
||||||
|
tdb = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
async with tdb:
|
||||||
|
# reopen the db with more columns should fail if it is already open
|
||||||
|
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
|
||||||
|
await api_connection.open_table_db(TEST_DB, 2)
|
||||||
|
|
||||||
|
tdb2 = await api_connection.open_table_db(TEST_DB, 2)
|
||||||
|
async with tdb2:
|
||||||
|
# write something to second column
|
||||||
|
await tdb2.store(b"qwer", b"5678", col = 1)
|
||||||
|
|
||||||
|
# reopen the db with fewer columns
|
||||||
|
tdb = await api_connection.open_table_db(TEST_DB, 1)
|
||||||
|
async with tdb:
|
||||||
|
|
||||||
|
# Should fail access to second column
|
||||||
|
with pytest.raises(veilid.VeilidAPIErrorGeneric) as exc:
|
||||||
|
await tdb.load(b"qwer", col = 1)
|
||||||
|
|
||||||
|
# Should succeed with access to second column
|
||||||
|
assert await tdb2.load(b"qwer", col = 1) == b"5678"
|
||||||
|
|
||||||
|
# now delete should succeed
|
||||||
|
deleted = await api_connection.delete_table_db(TEST_DB)
|
||||||
|
assert deleted
|
||||||
|
|
@ -7,15 +7,19 @@ from .state import VeilidState
|
|||||||
|
|
||||||
class RoutingContext(ABC):
|
class RoutingContext(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def with_privacy(self) -> Self:
|
async def release(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def with_custom_privacy(self, stability: types.Stability) -> Self:
|
async def with_privacy(self, release = True) -> Self:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def with_sequencing(self, sequencing: types.Sequencing) -> Self:
|
async def with_custom_privacy(self, stability: types.Stability, release = True) -> Self:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def with_sequencing(self, sequencing: types.Sequencing, release = True) -> Self:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
@ -89,21 +93,25 @@ class TableDbTransaction(ABC):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def store(self, col: int, key: bytes, value: bytes):
|
async def store(self, key: bytes, value: bytes, col: int = 0):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def delete(self, col: int, key: bytes):
|
async def delete(self, key: bytes, col: int = 0):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class TableDb(ABC):
|
class TableDb(ABC):
|
||||||
|
@abstractmethod
|
||||||
|
async def release(self):
|
||||||
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def get_column_count(self) -> int:
|
async def get_column_count(self) -> int:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def get_keys(self, col: int) -> list[bytes]:
|
async def get_keys(self, col: int = 0) -> list[bytes]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
@ -111,19 +119,23 @@ class TableDb(ABC):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def store(self, col: int, key: bytes, value: bytes):
|
async def store(self, key: bytes, value: bytes, col: int = 0):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def load(self, col: int, key: bytes) -> Optional[bytes]:
|
async def load(self, key: bytes, col: int = 0) -> Optional[bytes]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def delete(self, col: int, key: bytes) -> Optional[bytes]:
|
async def delete(self, key: bytes, col: int = 0) -> Optional[bytes]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class CryptoSystem(ABC):
|
class CryptoSystem(ABC):
|
||||||
|
@abstractmethod
|
||||||
|
async def release(self):
|
||||||
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def cached_dh(
|
async def cached_dh(
|
||||||
self, key: types.PublicKey, secret: types.SecretKey
|
self, key: types.PublicKey, secret: types.SecretKey
|
||||||
@ -284,7 +296,7 @@ class VeilidAPI(ABC):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def delete_table_db(self, name: str):
|
async def delete_table_db(self, name: str) -> bool:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
|
@ -320,7 +320,7 @@ class _JsonVeilidAPI(VeilidAPI):
|
|||||||
)
|
)
|
||||||
return _JsonTableDb(self, db_id)
|
return _JsonTableDb(self, db_id)
|
||||||
|
|
||||||
async def delete_table_db(self, name: str):
|
async def delete_table_db(self, name: str) -> bool:
|
||||||
return raise_api_result(
|
return raise_api_result(
|
||||||
await self.send_ndjson_request(Operation.DELETE_TABLE_DB, name=name)
|
await self.send_ndjson_request(Operation.DELETE_TABLE_DB, name=name)
|
||||||
)
|
)
|
||||||
@ -411,19 +411,44 @@ def validate_rc_op(request: dict, response: dict):
|
|||||||
class _JsonRoutingContext(RoutingContext):
|
class _JsonRoutingContext(RoutingContext):
|
||||||
api: _JsonVeilidAPI
|
api: _JsonVeilidAPI
|
||||||
rc_id: int
|
rc_id: int
|
||||||
|
done: bool
|
||||||
|
|
||||||
def __init__(self, api: _JsonVeilidAPI, rc_id: int):
|
def __init__(self, api: _JsonVeilidAPI, rc_id: int):
|
||||||
self.api = api
|
self.api = api
|
||||||
self.rc_id = rc_id
|
self.rc_id = rc_id
|
||||||
|
self.done = False
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self.api.send_one_way_ndjson_request(
|
if not self.done:
|
||||||
Operation.ROUTING_CONTEXT,
|
# attempt to clean up server-side anyway
|
||||||
rc_id=self.rc_id,
|
self.api.send_one_way_ndjson_request(
|
||||||
rc_op=RoutingContextOperation.RELEASE,
|
Operation.ROUTING_CONTEXT,
|
||||||
)
|
rc_id=self.rc_id,
|
||||||
|
rc_op=RoutingContextOperation.RELEASE
|
||||||
|
)
|
||||||
|
|
||||||
|
# complain
|
||||||
|
raise AssertionError("Should have released routing context before dropping object")
|
||||||
|
|
||||||
async def with_privacy(self) -> Self:
|
async def __aenter__(self) -> Self:
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, *excinfo):
|
||||||
|
if not self.done:
|
||||||
|
await self.release()
|
||||||
|
|
||||||
|
async def release(self):
|
||||||
|
if self.done:
|
||||||
|
return
|
||||||
|
await self.api.send_ndjson_request(
|
||||||
|
Operation.ROUTING_CONTEXT,
|
||||||
|
validate=validate_rc_op,
|
||||||
|
rc_id=self.rc_id,
|
||||||
|
rc_op=RoutingContextOperation.RELEASE
|
||||||
|
)
|
||||||
|
self.done = True
|
||||||
|
|
||||||
|
async def with_privacy(self, release = True) -> Self:
|
||||||
new_rc_id = raise_api_result(
|
new_rc_id = raise_api_result(
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.ROUTING_CONTEXT,
|
Operation.ROUTING_CONTEXT,
|
||||||
@ -432,9 +457,11 @@ class _JsonRoutingContext(RoutingContext):
|
|||||||
rc_op=RoutingContextOperation.WITH_PRIVACY,
|
rc_op=RoutingContextOperation.WITH_PRIVACY,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
if release:
|
||||||
|
await self.release()
|
||||||
return self.__class__(self.api, new_rc_id)
|
return self.__class__(self.api, new_rc_id)
|
||||||
|
|
||||||
async def with_custom_privacy(self, stability: Stability) -> Self:
|
async def with_custom_privacy(self, stability: Stability, release = True) -> Self:
|
||||||
new_rc_id = raise_api_result(
|
new_rc_id = raise_api_result(
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.ROUTING_CONTEXT,
|
Operation.ROUTING_CONTEXT,
|
||||||
@ -444,9 +471,11 @@ class _JsonRoutingContext(RoutingContext):
|
|||||||
stability=stability,
|
stability=stability,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
if release:
|
||||||
|
await self.release()
|
||||||
return self.__class__(self.api, new_rc_id)
|
return self.__class__(self.api, new_rc_id)
|
||||||
|
|
||||||
async def with_sequencing(self, sequencing: Sequencing) -> Self:
|
async def with_sequencing(self, sequencing: Sequencing, release = True) -> Self:
|
||||||
new_rc_id = raise_api_result(
|
new_rc_id = raise_api_result(
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.ROUTING_CONTEXT,
|
Operation.ROUTING_CONTEXT,
|
||||||
@ -456,6 +485,8 @@ class _JsonRoutingContext(RoutingContext):
|
|||||||
sequencing=sequencing,
|
sequencing=sequencing,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
if release:
|
||||||
|
await self.release()
|
||||||
return self.__class__(self.api, new_rc_id)
|
return self.__class__(self.api, new_rc_id)
|
||||||
|
|
||||||
async def app_call(self, target: TypedKey | RouteId, request: bytes) -> bytes:
|
async def app_call(self, target: TypedKey | RouteId, request: bytes) -> bytes:
|
||||||
@ -627,9 +658,27 @@ class _JsonTableDbTransaction(TableDbTransaction):
|
|||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
if not self.done:
|
if not self.done:
|
||||||
raise AssertionError("Should have committed or rolled back transaction")
|
# attempt to clean up server-side anyway
|
||||||
|
self.api.send_one_way_ndjson_request(
|
||||||
|
Operation.TABLE_DB_TRANSACTION,
|
||||||
|
tx_id=self.tx_id,
|
||||||
|
tx_op=TableDbTransactionOperation.ROLLBACK,
|
||||||
|
)
|
||||||
|
|
||||||
|
# complain
|
||||||
|
raise AssertionError("Should have committed or rolled back transaction before dropping object")
|
||||||
|
|
||||||
|
async def __aenter__(self) -> Self:
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, *excinfo):
|
||||||
|
if not self.done:
|
||||||
|
await self.rollback()
|
||||||
|
|
||||||
async def commit(self):
|
async def commit(self):
|
||||||
|
if self.done:
|
||||||
|
raise AssertionError("Transaction is already done")
|
||||||
|
|
||||||
raise_api_result(
|
raise_api_result(
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.TABLE_DB_TRANSACTION,
|
Operation.TABLE_DB_TRANSACTION,
|
||||||
@ -641,17 +690,17 @@ class _JsonTableDbTransaction(TableDbTransaction):
|
|||||||
self.done = True
|
self.done = True
|
||||||
|
|
||||||
async def rollback(self):
|
async def rollback(self):
|
||||||
raise_api_result(
|
if self.done:
|
||||||
await self.api.send_ndjson_request(
|
raise AssertionError("Transaction is already done")
|
||||||
Operation.TABLE_DB_TRANSACTION,
|
await self.api.send_ndjson_request(
|
||||||
validate=validate_tx_op,
|
Operation.TABLE_DB_TRANSACTION,
|
||||||
tx_id=self.tx_id,
|
validate=validate_tx_op,
|
||||||
tx_op=TableDbTransactionOperation.ROLLBACK,
|
tx_id=self.tx_id,
|
||||||
)
|
tx_op=TableDbTransactionOperation.ROLLBACK,
|
||||||
)
|
)
|
||||||
self.done = True
|
self.done = True
|
||||||
|
|
||||||
async def store(self, col: int, key: bytes, value: bytes):
|
async def store(self, key: bytes, value: bytes, col: int = 0):
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.TABLE_DB_TRANSACTION,
|
Operation.TABLE_DB_TRANSACTION,
|
||||||
validate=validate_tx_op,
|
validate=validate_tx_op,
|
||||||
@ -662,7 +711,7 @@ class _JsonTableDbTransaction(TableDbTransaction):
|
|||||||
value=value,
|
value=value,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def delete(self, col: int, key: bytes):
|
async def delete(self, key: bytes, col: int = 0):
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.TABLE_DB_TRANSACTION,
|
Operation.TABLE_DB_TRANSACTION,
|
||||||
validate=validate_tx_op,
|
validate=validate_tx_op,
|
||||||
@ -684,15 +733,44 @@ def validate_db_op(request: dict, response: dict):
|
|||||||
class _JsonTableDb(TableDb):
|
class _JsonTableDb(TableDb):
|
||||||
api: _JsonVeilidAPI
|
api: _JsonVeilidAPI
|
||||||
db_id: int
|
db_id: int
|
||||||
|
done: bool
|
||||||
|
|
||||||
def __init__(self, api: _JsonVeilidAPI, db_id: int):
|
def __init__(self, api: _JsonVeilidAPI, db_id: int):
|
||||||
self.api = api
|
self.api = api
|
||||||
self.db_id = db_id
|
self.db_id = db_id
|
||||||
|
self.done = False
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self.api.send_one_way_ndjson_request(
|
if not self.done:
|
||||||
Operation.TABLE_DB, db_id=self.db_id, rc_op=TableDbOperation.RELEASE
|
|
||||||
|
# attempt to clean up server-side anyway
|
||||||
|
self.api.send_one_way_ndjson_request(
|
||||||
|
Operation.TABLE_DB,
|
||||||
|
db_id=self.db_id,
|
||||||
|
db_op=TableDbOperation.RELEASE
|
||||||
|
)
|
||||||
|
|
||||||
|
# complain
|
||||||
|
raise AssertionError("Should have released table db before dropping object")
|
||||||
|
|
||||||
|
async def __aenter__(self) -> Self:
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, *excinfo):
|
||||||
|
if not self.done:
|
||||||
|
await self.release()
|
||||||
|
|
||||||
|
async def release(self):
|
||||||
|
if self.done:
|
||||||
|
return
|
||||||
|
await self.api.send_ndjson_request(
|
||||||
|
Operation.TABLE_DB,
|
||||||
|
validate=validate_db_op,
|
||||||
|
db_id=self.db_id,
|
||||||
|
db_op=TableDbOperation.RELEASE
|
||||||
)
|
)
|
||||||
|
self.done = True
|
||||||
|
|
||||||
|
|
||||||
async def get_column_count(self) -> int:
|
async def get_column_count(self) -> int:
|
||||||
return raise_api_result(
|
return raise_api_result(
|
||||||
@ -704,7 +782,7 @@ class _JsonTableDb(TableDb):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def get_keys(self, col: int) -> list[bytes]:
|
async def get_keys(self, col: int = 0) -> list[bytes]:
|
||||||
return list(
|
return list(
|
||||||
map(
|
map(
|
||||||
lambda x: urlsafe_b64decode_no_pad(x),
|
lambda x: urlsafe_b64decode_no_pad(x),
|
||||||
@ -731,7 +809,7 @@ class _JsonTableDb(TableDb):
|
|||||||
)
|
)
|
||||||
return _JsonTableDbTransaction(self.api, tx_id)
|
return _JsonTableDbTransaction(self.api, tx_id)
|
||||||
|
|
||||||
async def store(self, col: int, key: bytes, value: bytes):
|
async def store(self, key: bytes, value: bytes, col: int = 0):
|
||||||
return raise_api_result(
|
return raise_api_result(
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.TABLE_DB,
|
Operation.TABLE_DB,
|
||||||
@ -744,7 +822,7 @@ class _JsonTableDb(TableDb):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def load(self, col: int, key: bytes) -> Optional[bytes]:
|
async def load(self, key: bytes, col: int = 0) -> Optional[bytes]:
|
||||||
res = raise_api_result(
|
res = raise_api_result(
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.TABLE_DB,
|
Operation.TABLE_DB,
|
||||||
@ -757,7 +835,7 @@ class _JsonTableDb(TableDb):
|
|||||||
)
|
)
|
||||||
return None if res is None else urlsafe_b64decode_no_pad(res)
|
return None if res is None else urlsafe_b64decode_no_pad(res)
|
||||||
|
|
||||||
async def delete(self, col: int, key: bytes) -> Optional[bytes]:
|
async def delete(self, key: bytes, col: int = 0) -> Optional[bytes]:
|
||||||
res = raise_api_result(
|
res = raise_api_result(
|
||||||
await self.api.send_ndjson_request(
|
await self.api.send_ndjson_request(
|
||||||
Operation.TABLE_DB,
|
Operation.TABLE_DB,
|
||||||
@ -782,17 +860,43 @@ def validate_cs_op(request: dict, response: dict):
|
|||||||
class _JsonCryptoSystem(CryptoSystem):
|
class _JsonCryptoSystem(CryptoSystem):
|
||||||
api: _JsonVeilidAPI
|
api: _JsonVeilidAPI
|
||||||
cs_id: int
|
cs_id: int
|
||||||
|
done: bool
|
||||||
|
|
||||||
def __init__(self, api: _JsonVeilidAPI, cs_id: int):
|
def __init__(self, api: _JsonVeilidAPI, cs_id: int):
|
||||||
self.api = api
|
self.api = api
|
||||||
self.cs_id = cs_id
|
self.cs_id = cs_id
|
||||||
|
self.done = False
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
self.api.send_one_way_ndjson_request(
|
if not self.done:
|
||||||
|
|
||||||
|
# attempt to clean up server-side anyway
|
||||||
|
self.api.send_one_way_ndjson_request(
|
||||||
|
Operation.CRYPTO_SYSTEM,
|
||||||
|
cs_id=self.cs_id,
|
||||||
|
cs_op=CryptoSystemOperation.RELEASE
|
||||||
|
)
|
||||||
|
|
||||||
|
# complain
|
||||||
|
raise AssertionError("Should have released crypto system before dropping object")
|
||||||
|
|
||||||
|
async def __aenter__(self) -> Self:
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, *excinfo):
|
||||||
|
if not self.done:
|
||||||
|
await self.release()
|
||||||
|
|
||||||
|
async def release(self):
|
||||||
|
if self.done:
|
||||||
|
return
|
||||||
|
await self.api.send_ndjson_request(
|
||||||
Operation.CRYPTO_SYSTEM,
|
Operation.CRYPTO_SYSTEM,
|
||||||
|
validate=validate_cs_op,
|
||||||
cs_id=self.cs_id,
|
cs_id=self.cs_id,
|
||||||
cs_op=CryptoSystemOperation.RELEASE,
|
cs_op=CryptoSystemOperation.RELEASE
|
||||||
)
|
)
|
||||||
|
self.done = True
|
||||||
|
|
||||||
async def cached_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret:
|
async def cached_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret:
|
||||||
return SharedSecret(
|
return SharedSecret(
|
||||||
|
@ -1319,6 +1319,30 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"type": "object",
|
"type": "object",
|
||||||
|
"anyOf": [
|
||||||
|
{
|
||||||
|
"type": "object",
|
||||||
|
"required": [
|
||||||
|
"value"
|
||||||
|
],
|
||||||
|
"properties": {
|
||||||
|
"value": {
|
||||||
|
"type": "null"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "object",
|
||||||
|
"required": [
|
||||||
|
"error"
|
||||||
|
],
|
||||||
|
"properties": {
|
||||||
|
"error": {
|
||||||
|
"$ref": "#/definitions/VeilidAPIError"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
"required": [
|
"required": [
|
||||||
"tx_op"
|
"tx_op"
|
||||||
],
|
],
|
||||||
@ -1333,6 +1357,30 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"type": "object",
|
"type": "object",
|
||||||
|
"anyOf": [
|
||||||
|
{
|
||||||
|
"type": "object",
|
||||||
|
"required": [
|
||||||
|
"value"
|
||||||
|
],
|
||||||
|
"properties": {
|
||||||
|
"value": {
|
||||||
|
"type": "null"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "object",
|
||||||
|
"required": [
|
||||||
|
"error"
|
||||||
|
],
|
||||||
|
"properties": {
|
||||||
|
"error": {
|
||||||
|
"$ref": "#/definitions/VeilidAPIError"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
"required": [
|
"required": [
|
||||||
"tx_op"
|
"tx_op"
|
||||||
],
|
],
|
||||||
|
@ -3,7 +3,7 @@ from typing import Optional, Self
|
|||||||
|
|
||||||
from .config import VeilidConfig
|
from .config import VeilidConfig
|
||||||
from .types import (ByteCount, RouteId, Timestamp, TimestampDuration, TypedKey,
|
from .types import (ByteCount, RouteId, Timestamp, TimestampDuration, TypedKey,
|
||||||
ValueData, ValueSubkey, VeilidLogLevel,
|
ValueData, ValueSubkey, VeilidLogLevel, OperationId,
|
||||||
urlsafe_b64decode_no_pad)
|
urlsafe_b64decode_no_pad)
|
||||||
|
|
||||||
|
|
||||||
@ -309,9 +309,9 @@ class VeilidAppMessage:
|
|||||||
class VeilidAppCall:
|
class VeilidAppCall:
|
||||||
sender: Optional[TypedKey]
|
sender: Optional[TypedKey]
|
||||||
message: bytes
|
message: bytes
|
||||||
call_id: str
|
call_id: OperationId
|
||||||
|
|
||||||
def __init__(self, sender: Optional[TypedKey], message: bytes, call_id: str):
|
def __init__(self, sender: Optional[TypedKey], message: bytes, call_id: OperationId):
|
||||||
self.sender = sender
|
self.sender = sender
|
||||||
self.message = message
|
self.message = message
|
||||||
self.call_id = call_id
|
self.call_id = call_id
|
||||||
@ -322,7 +322,7 @@ class VeilidAppCall:
|
|||||||
return cls(
|
return cls(
|
||||||
None if j["sender"] is None else TypedKey(j["sender"]),
|
None if j["sender"] is None else TypedKey(j["sender"]),
|
||||||
urlsafe_b64decode_no_pad(j["message"]),
|
urlsafe_b64decode_no_pad(j["message"]),
|
||||||
j["call_id"],
|
OperationId(j["call_id"]),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -817,7 +817,7 @@ pub fn table_db_transaction_store(id: u32, col: u32, key: String, value: String)
|
|||||||
tdbt.clone()
|
tdbt.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
tdbt.store(col, &key, &value);
|
tdbt.store(col, &key, &value)?;
|
||||||
APIRESULT_UNDEFINED
|
APIRESULT_UNDEFINED
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -836,7 +836,7 @@ pub fn table_db_transaction_delete(id: u32, col: u32, key: String) -> Promise {
|
|||||||
tdbt.clone()
|
tdbt.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
tdbt.delete(col, &key);
|
tdbt.delete(col, &key)?;
|
||||||
APIRESULT_UNDEFINED
|
APIRESULT_UNDEFINED
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user