wasm support for inspect and watchvalue

(needs tests)
This commit is contained in:
Christien Rioux 2024-03-14 00:08:08 -04:00
parent cfce0a35b4
commit ff28273a59
9 changed files with 217 additions and 54 deletions

View File

@ -155,7 +155,8 @@ impl StorageManager {
// Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers betwen that and what we asked for
if answer.seqs.len() != descriptor_info.subkeys.len() {
#[allow(clippy::unnecessary_cast)]
if answer.seqs.len() != descriptor_info.subkeys.len() as usize {
// Not the right number of sequence numbers
// Move to the next node
return Ok(NetworkResult::invalid_message(format!(

View File

@ -766,10 +766,13 @@ impl StorageManager {
.handle_inspect_local_value(key, subkeys.clone(), true)
.await?;
assert!(
local_inspect_result.subkeys.len() == local_inspect_result.seqs.len(),
"mismatch between local subkeys returned and sequence number list returned"
);
#[allow(clippy::unnecessary_cast)]
{
assert!(
local_inspect_result.subkeys.len() as usize == local_inspect_result.seqs.len(),
"mismatch between local subkeys returned and sequence number list returned"
);
}
assert!(
local_inspect_result.subkeys.is_subset(&subkeys),
"more subkeys returned locally than requested"
@ -816,11 +819,14 @@ impl StorageManager {
.await?;
// Sanity check before zip
assert_eq!(
result.inspect_result.subkeys.len(),
result.fanout_results.len(),
"mismatch between subkeys returned and fanout results returned"
);
#[allow(clippy::unnecessary_cast)]
{
assert_eq!(
result.inspect_result.subkeys.len() as usize,
result.fanout_results.len(),
"mismatch between subkeys returned and fanout results returned"
);
}
if !local_inspect_result.subkeys.is_empty() && !result.inspect_result.subkeys.is_empty() {
assert_eq!(
result.inspect_result.subkeys.len(),

View File

@ -832,7 +832,8 @@ where
}
// Build sequence number list to return
let mut seqs = Vec::with_capacity(subkeys.len());
#[allow(clippy::unnecessary_cast)]
let mut seqs = Vec::with_capacity(subkeys.len() as usize);
for subkey in subkeys.iter() {
let stk = SubkeyTableKey { key, subkey };
let seq = if let Some(record_data) = self.subkey_cache.peek(&stk) {

View File

@ -58,7 +58,11 @@ impl fmt::Debug for DHTRecordReport {
#[derive(
Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,
)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi, namespace))]
#[cfg_attr(
target_arch = "wasm32",
derive(Tsify),
tsify(from_wasm_abi, into_wasm_abi, namespace)
)]
pub enum DHTReportScope {
/// Return only the local copy sequence numbers
/// Useful for seeing what subkeys you have locally and which ones have not been retrieved

View File

@ -5,6 +5,11 @@ use range_set_blaze::*;
#[derive(
Clone, Default, Hash, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize, JsonSchema,
)]
#[cfg_attr(
target_arch = "wasm32",
derive(Tsify),
tsify(from_wasm_abi, into_wasm_abi)
)]
#[serde(transparent)]
pub struct ValueSubkeyRangeSet {
#[serde(with = "serialize_range_set_blaze")]
@ -80,6 +85,45 @@ impl ValueSubkeyRangeSet {
}
}
// impl TryFrom<Box<[Box<[ValueSubkey]>]>> for ValueSubkeyRangeSet {
// type Error = VeilidAPIError;
// fn try_from(value: Box<[Box<[ValueSubkey]>]>) -> Result<Self, Self::Error> {
// let mut data = RangeSetBlaze::<ValueSubkey>::new();
// let mut last = None;
// for r in value.iter() {
// if r.len() != 2 {
// apibail_generic!("not a pair");
// }
// let start = r[0];
// let end = r[1];
// if let Some(last) = last {
// if start >= last {
// apibail_generic!("pair out of order");
// }
// }
// if start > end {
// apibail_generic!("invalid pair");
// }
// last = Some(end);
// data.ranges_insert(start..=end);
// }
// Ok(Self::new_with_data(data))
// }
// }
// impl From<ValueSubkeyRangeSet> for Box<[Box<[ValueSubkey]>]> {
// fn from(value: ValueSubkeyRangeSet) -> Self {
// value
// .ranges()
// .map(|r| Box::new([*r.start(), *r.end()]) as Box<[ValueSubkey]>)
// .collect()
// }
// }
impl FromStr for ValueSubkeyRangeSet {
type Err = VeilidAPIError;

View File

@ -716,6 +716,37 @@ pub fn routing_context_cancel_dht_watch(id: u32, key: String, subkeys: String) -
})
}
#[wasm_bindgen()]
pub fn routing_context_inspect_dht_record(
id: u32,
key: String,
subkeys: String,
scope: String,
) -> Promise {
let key: veilid_core::TypedKey = veilid_core::deserialize_json(&key).unwrap();
let subkeys: veilid_core::ValueSubkeyRangeSet =
veilid_core::deserialize_json(&subkeys).unwrap();
let scope: veilid_core::DHTReportScope = veilid_core::deserialize_json(&scope).unwrap();
wrap_api_future_json(async move {
let routing_context = {
let rc = (*ROUTING_CONTEXTS).borrow();
let Some(routing_context) = rc.get(&id) else {
return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(
"routing_context_inspect_dht_record",
"id",
id,
));
};
routing_context.clone()
};
let res = routing_context
.inspect_dht_record(key, subkeys, scope)
.await?;
APIResult::Ok(res)
})
}
#[wasm_bindgen()]
pub fn new_private_route() -> Promise {
wrap_api_future_json(async move {

View File

@ -195,6 +195,11 @@ impl VeilidClient {
APIRESULT_UNDEFINED
}
/// Get the current timestamp, in string format
pub fn now() -> String {
veilid_core::get_aligned_timestamp().as_u64().to_string()
}
/// Execute an 'internal debug command'.
pub async fn debug(command: String) -> APIResult<String> {
let veilid_api = get_veilid_api()?;

View File

@ -265,13 +265,13 @@ impl VeilidRoutingContext {
pub async fn getDhtValue(
&self,
key: String,
subKey: u32,
subkey: u32,
forceRefresh: bool,
) -> APIResult<Option<ValueData>> {
let key = TypedKey::from_str(&key)?;
let routing_context = self.getRoutingContext()?;
let res = routing_context
.get_dht_value(key, subKey, forceRefresh)
.get_dht_value(key, subkey, forceRefresh)
.await?;
APIResult::Ok(res)
}
@ -283,7 +283,7 @@ impl VeilidRoutingContext {
pub async fn setDhtValue(
&self,
key: String,
subKey: u32,
subkey: u32,
data: Box<[u8]>,
writer: Option<String>,
) -> APIResult<Option<ValueData>> {
@ -295,49 +295,120 @@ impl VeilidRoutingContext {
let routing_context = self.getRoutingContext()?;
let res = routing_context
.set_dht_value(key, subKey, data, writer)
.set_dht_value(key, subkey, data, writer)
.await?;
APIResult::Ok(res)
}
// pub async fn watchDhtValues(
// &self,
// key: String,
// subKeys: ValueSubkeyRangeSet,
// expiration: Timestamp,
// count: u32,
// ) -> APIResult<String> {
// let key: veilid_core::TypedKey = veilid_core::deserialize_json(&key).unwrap();
// let subkeys: veilid_core::ValueSubkeyRangeSet =
// veilid_core::deserialize_json(&subkeys).unwrap();
// let expiration = veilid_core::Timestamp::from_str(&expiration).unwrap();
/// Add or update a watch to a DHT value that informs the user via an VeilidUpdate::ValueChange callback when the record has subkeys change.
/// One remote node will be selected to perform the watch and it will offer an expiration time based on a suggestion, and make an attempt to
/// continue to report changes via the callback. Nodes that agree to doing watches will be put on our 'ping' list to ensure they are still around
/// otherwise the watch will be cancelled and will have to be re-watched.
///
/// There is only one watch permitted per record. If a change to a watch is desired, the previous one will be overwritten.
/// * `key` is the record key to watch. it must first be opened for reading or writing.
/// * `subkeys` is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys.
/// * `expiration` is the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.
/// * `count` is the number of times the watch will be sent, maximum. A zero value here is equivalent to a cancellation.
///
/// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future,
/// and the returned timestamp will be no later than the requested expiration, but -may- be before the requested expiration.
/// If the returned timestamp is zero it indicates that the watch creation or update has failed. In the case of a faild update, the watch is considered cancelled.
///
/// DHT watches are accepted with the following conditions:
/// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record
/// * If a member (either the owner or a SMPL schema member) has opened the key for writing (even if no writing is performed) then the watch will be signed and guaranteed network.dht.member_watch_limit per writer
///
/// Members can be specified via the SMPL schema and do not need to allocate writable subkeys in order to offer a member watch capability.
pub async fn watchDhtValues(
&self,
key: String,
subkeys: ValueSubkeyRangeSet,
expiration: String,
count: u32,
) -> APIResult<String> {
let key = TypedKey::from_str(&key)?;
let expiration =
veilid_core::Timestamp::from_str(&expiration).map_err(VeilidAPIError::generic)?;
// let routing_context = {
// let rc = (*ROUTING_CONTEXTS).borrow();
// let Some(routing_context) = rc.get(&id) else {
// return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("routing_context_watch_dht_values", "id", self.id));
// };
// routing_context.clone()
// };
// let res = routing_context
// .watch_dht_values(key, subkeys, expiration, count)
// .await?;
// APIResult::Ok(res.to_string())
// }
let routing_context = self.getRoutingContext()?;
let res = routing_context
.watch_dht_values(key, subkeys, expiration, count)
.await?;
APIResult::Ok(res.to_string())
}
// pub async fn cancelDhtWatch(id: u32, key: String, subkeys: String) -> Promise {
// let key: veilid_core::TypedKey = veilid_core::deserialize_json(&key).unwrap();
// let subkeys: veilid_core::ValueSubkeyRangeSet =
// veilid_core::deserialize_json(&subkeys).unwrap();
/// Cancels a watch early
///
/// This is a convenience function that cancels watching all subkeys in a range. The subkeys specified here
/// are subtracted from the watched subkey range. If no range is specified, this is equivalent to cancelling the entire range of subkeys.
/// Only the subkey range is changed, the expiration and count remain the same.
/// If no subkeys remain, the watch is entirely cancelled and will receive no more updates.
/// Returns true if there is any remaining watch for this record
/// Returns false if the entire watch has been cancelled
pub async fn cancelDhtWatch(
&self,
key: String,
subkeys: ValueSubkeyRangeSet,
) -> APIResult<bool> {
let key = TypedKey::from_str(&key)?;
// let routing_context = {
// let rc = (*ROUTING_CONTEXTS).borrow();
// let Some(routing_context) = rc.get(&id) else {
// return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument("routing_context_cancel_dht_watch", "id", self.id));
// };
// routing_context.clone()
// };
// let res = routing_context.cancel_dht_watch(key, subkeys).await?;
// APIResult::Ok(res)
// }
let routing_context = self.getRoutingContext()?;
let res = routing_context.cancel_dht_watch(key, subkeys).await?;
APIResult::Ok(res)
}
/// Inspects a DHT record for subkey state.
/// This is useful for checking if you should push new subkeys to the network, or retrieve the current state of a record from the network
/// to see what needs updating locally.
///
/// * `key` is the record key to watch. it must first be opened for reading or writing.
/// * `subkeys` is the the range of subkeys to inspect. The range must not exceed 512 discrete non-overlapping or adjacent subranges.
/// If no range is specified, this is equivalent to inspecting the entire range of subkeys. In total, the list of subkeys returned will be truncated at 512 elements.
/// * `scope` is what kind of range the inspection has:
///
/// - DHTReportScope::Local
/// Results will be only for a locally stored record.
/// Useful for seeing what subkeys you have locally and which ones have not been retrieved
///
/// - DHTReportScope::SyncGet
/// Return the local sequence numbers and the network sequence numbers with GetValue fanout parameters
/// Provides an independent view of both the local sequence numbers and the network sequence numbers for nodes that
/// would be reached as if the local copy did not exist locally.
/// Useful for determining if the current local copy should be updated from the network.
///
/// - DHTReportScope::SyncSet
/// Return the local sequence numbers and the network sequence numbers with SetValue fanout parameters
/// Provides an independent view of both the local sequence numbers and the network sequence numbers for nodes that
/// would be reached as if the local copy did not exist locally.
/// Useful for determining if the unchanged local copy should be pushed to the network.
///
/// - DHTReportScope::UpdateGet
/// Return the local sequence numbers and the network sequence numbers with GetValue fanout parameters
/// Provides an view of both the local sequence numbers and the network sequence numbers for nodes that
/// would be reached as if a GetValue operation were being performed, including accepting newer values from the network.
/// Useful for determining which subkeys would change with a GetValue operation
///
/// - DHTReportScope::UpdateSet
/// Return the local sequence numbers and the network sequence numbers with SetValue fanout parameters
/// Provides an view of both the local sequence numbers and the network sequence numbers for nodes that
/// would be reached as if a SetValue operation were being performed, including accepting newer values from the network.
/// This simulates a SetValue with the initial sequence number incremented by 1, like a real SetValue would when updating.
/// Useful for determine which subkeys would change with an SetValue operation
///
/// Returns a DHTRecordReport with the subkey ranges that were returned that overlapped the schema, and sequence numbers for each of the subkeys in the range.
pub async fn inspectDhtRecord(
&self,
key: String,
subkeys: ValueSubkeyRangeSet,
scope: DHTReportScope,
) -> APIResult<DHTRecordReport> {
let key = TypedKey::from_str(&key)?;
let routing_context = self.getRoutingContext()?;
let res = routing_context
.inspect_dht_record(key, subkeys, scope)
.await?;
APIResult::Ok(res)
}
}

View File

@ -108,7 +108,7 @@ describe('VeilidRoutingContext', () => {
});
after('free dht record', async () => {
await routingContext.closeDhtRecord(dhtRecord.key);
await routingContext.deleteDhtRecord(dhtRecord.key);
});
it('should set value', async () => {