Mirror of https://gitlab.com/veilid/veilid.git (synced 2025-04-19 15:25:54 -04:00)

Commit 4e9e1e7204 (parent 1b96a4e3ea): change watch_dht_values return type; fix delete_dht_record regression
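In short: RoutingContext::watch_dht_values now takes its subkeys, expiration, and count arguments as Option values and returns VeilidAPIResult<bool> (whether a watch is still active for the record) instead of an expiration Timestamp. The JSON API, Dart, Python, and WASM bindings are updated to match, and close_record_inner/delete_record no longer return early when a record is not currently open, which is the delete_dht_record regression named in the commit message.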
@@ -105,6 +105,7 @@ impl VeilidComponentRegistry {
         self.namespace
     }
 
+    #[allow(dead_code)]
     pub fn program_name(&self) -> &'static str {
         self.program_name
     }
@@ -60,10 +60,11 @@ impl RPCProcessor {
         };
 
         let debug_string = format!(
-            "OUT ==> SetValueQ({} #{} len={} writer={}{}) => {}",
+            "OUT ==> SetValueQ({} #{} len={} seq={} writer={}{}) => {}",
             key,
             subkey,
             value.value_data().data().len(),
+            value.value_data().seq(),
             value.value_data().writer(),
             if send_descriptor { " +senddesc" } else { "" },
             dest
@@ -125,8 +126,9 @@ impl RPCProcessor {
             .as_ref()
             .map(|v| {
                 format!(
-                    " len={} writer={}",
+                    " len={} seq={} writer={}",
                     v.value_data().data().len(),
+                    v.value_data().seq(),
                     v.value_data().writer(),
                 )
             })
@@ -552,10 +552,7 @@ impl StorageManager {
     pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
         // Attempt to close the record, returning the opened record if it wasn't already closed
         let mut inner = self.inner.lock().await;
-        let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else {
-            return Ok(());
-        };
-
+        Self::close_record_inner(&mut inner, key)?;
         Ok(())
     }
@@ -566,9 +563,7 @@ impl StorageManager {
         let mut inner = self.inner.lock().await;
         let keys = inner.opened_records.keys().copied().collect::<Vec<_>>();
         for key in keys {
-            let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else {
-                return Ok(());
-            };
+            Self::close_record_inner(&mut inner, key)?;
         }
 
         Ok(())
@@ -579,9 +574,7 @@ impl StorageManager {
     pub async fn delete_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
         // Ensure the record is closed
         let mut inner = self.inner.lock().await;
-        let Some(_opened_record) = Self::close_record_inner(&mut inner, key)? else {
-            return Ok(());
-        };
+        Self::close_record_inner(&mut inner, key)?;
 
         // Get record from the local store
         let Some(local_record_store) = inner.local_record_store.as_mut() else {
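This appears to be the delete_dht_record regression fix referenced in the commit message: previously close_record_inner returned the OpenedRecord (None if the record was not currently open), and delete_record bailed out with Ok(()) in the None case, so deleting a record that had not been opened silently did nothing. With close_record_inner now returning VeilidAPIResult<()>, the deletion path above always proceeds to the local record store.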
@@ -855,7 +848,7 @@ impl StorageManager {
         out
     }
 
-    /// Create,update or cancel an outbound watch to a DHT value
+    /// Create, update or cancel an outbound watch to a DHT value
     #[instrument(level = "trace", target = "stor", skip_all)]
     pub async fn watch_values(
         &self,
@@ -863,7 +856,7 @@ impl StorageManager {
         subkeys: ValueSubkeyRangeSet,
         expiration: Timestamp,
         count: u32,
-    ) -> VeilidAPIResult<Timestamp> {
+    ) -> VeilidAPIResult<bool> {
         // Obtain the watch change lock
         // (may need to wait for background operations to complete on the watch)
         let watch_lock = self.outbound_watch_lock_table.lock_tag(key).await;
@@ -872,14 +865,14 @@ impl StorageManager {
         .await
     }
 
-    //#[instrument(level = "trace", target = "stor", skip_all)]
+    #[instrument(level = "trace", target = "stor", skip_all)]
     async fn watch_values_inner(
         &self,
         watch_lock: AsyncTagLockGuard<TypedKey>,
         subkeys: ValueSubkeyRangeSet,
         expiration: Timestamp,
         count: u32,
-    ) -> VeilidAPIResult<Timestamp> {
+    ) -> VeilidAPIResult<bool> {
         let key = watch_lock.tag();
 
         // Obtain the inner state lock
@@ -960,7 +953,7 @@ impl StorageManager {
             inner.outbound_watch_manager.outbound_watches.get_mut(&key)
         else {
             // Watch is gone
-            return Ok(Timestamp::new(0));
+            return Ok(false);
         };
         self.get_next_outbound_watch_operation(
             key,
@@ -975,12 +968,7 @@ impl StorageManager {
             op_fut.await;
         }
 
-        let inner = self.inner.lock().await;
-        let expiration_ts = inner
-            .outbound_watch_manager
-            .get_min_expiration(key)
-            .unwrap_or_default();
-        Ok(expiration_ts)
+        Ok(true)
     }
 
     #[instrument(level = "trace", target = "stor", skip_all)]
@@ -1041,20 +1029,10 @@ impl StorageManager {
             (new_subkeys, count, desired.expiration_ts)
         };
 
-        // Update the watch. This just calls through to the above watch_values() function
+        // Update the watch. This just calls through to the above watch_values_inner() function
         // This will update the active_watch so we don't need to do that in this routine.
-        let expiration_ts =
-            pin_future!(self.watch_values_inner(watch_lock, subkeys, expiration_ts, count)).await?;
-
-        // A zero expiration time returned from watch_value() means the watch is done
-        // or no subkeys are left, and the watch is no longer active
-        if expiration_ts.as_u64() == 0 {
-            // Return false indicating the watch is completely gone
-            return Ok(false);
-        }
-
-        // Return true because the the watch was changed, but is not completely gone
-        Ok(true)
+        self.watch_values_inner(watch_lock, subkeys, expiration_ts, count)
+            .await
     }
 
     /// Inspect an opened DHT record for its subkey sequence numbers
@@ -1640,22 +1618,21 @@ impl StorageManager {
         });
     }
 
-    fn close_record_inner(
-        inner: &mut StorageManagerInner,
-        key: TypedKey,
-    ) -> VeilidAPIResult<Option<OpenedRecord>> {
+    fn close_record_inner(inner: &mut StorageManagerInner, key: TypedKey) -> VeilidAPIResult<()> {
         let Some(local_record_store) = inner.local_record_store.as_mut() else {
             apibail_not_initialized!();
         };
         if local_record_store.peek_record(key, |_| {}).is_none() {
-            return Err(VeilidAPIError::key_not_found(key));
+            apibail_key_not_found!(key);
         }
 
-        // Set the watch to cancelled if we have one
-        // Will process cancellation in the background
-        inner.outbound_watch_manager.set_desired_watch(key, None);
+        if inner.opened_records.remove(&key).is_some() {
+            // Set the watch to cancelled if we have one
+            // Will process cancellation in the background
+            inner.outbound_watch_manager.set_desired_watch(key, None);
+        }
 
-        Ok(inner.opened_records.remove(&key))
+        Ok(())
     }
 
     #[instrument(level = "trace", target = "stor", skip_all, err)]
@@ -116,12 +116,6 @@ impl OutboundWatchManager {
         }
     }
 
-    pub fn get_min_expiration(&self, record_key: TypedKey) -> Option<Timestamp> {
-        self.outbound_watches
-            .get(&record_key)
-            .and_then(|x| x.state().map(|y| y.min_expiration_ts()))
-    }
-
     /// Iterate all per-node watches and remove ones with dead nodes from outbound watches
     /// This may trigger reconciliation to increase the number of active per-node watches
     /// for an outbound watch that is still alive
@@ -1796,13 +1796,14 @@ impl VeilidAPI {
                 get_subkeys,
             )
             .ok()
+            .map(Some)
             .unwrap_or_else(|| {
                 rest_defaults = true;
-                Default::default()
+                None
             });
 
-        let expiration = if rest_defaults {
-            Default::default()
+        let opt_expiration = if rest_defaults {
+            None
         } else {
             get_debug_argument_at(
                 &args,
@@ -1812,14 +1813,20 @@ impl VeilidAPI {
                 parse_duration,
             )
             .ok()
-            .map(|dur| if dur == 0 { 0 } else { dur + get_timestamp() })
+            .map(|dur| {
+                if dur == 0 {
+                    None
+                } else {
+                    Some(Timestamp::new(dur + get_timestamp()))
+                }
+            })
             .unwrap_or_else(|| {
                 rest_defaults = true;
-                Default::default()
+                None
            })
        };
        let count = if rest_defaults {
-            u32::MAX
+            None
        } else {
            get_debug_argument_at(
                &args,
@@ -1829,15 +1836,16 @@ impl VeilidAPI {
                get_number,
            )
            .ok()
+            .map(Some)
            .unwrap_or_else(|| {
                rest_defaults = true;
-                u32::MAX
+                Some(u32::MAX)
            })
        };

        // Do a record watch
-        let ts = match rc
-            .watch_dht_values(key, subkeys, Timestamp::new(expiration), count)
+        let active = match rc
+            .watch_dht_values(key, subkeys, opt_expiration, count)
            .await
        {
            Err(e) => {
@@ -1845,10 +1853,10 @@ impl VeilidAPI {
            }
            Ok(v) => v,
        };
-        if ts.as_u64() == 0 {
+        if !active {
            return Ok("Failed to watch value".to_owned());
        }
-        Ok(format!("Success: expiration={:?}", display_ts(ts.as_u64())))
+        Ok("Success".to_owned())
    }

    async fn debug_record_cancel(&self, args: Vec<String>) -> VeilidAPIResult<String> {
@@ -78,9 +78,9 @@ pub enum RoutingContextRequestOp {
     WatchDhtValues {
         #[schemars(with = "String")]
         key: TypedKey,
-        subkeys: ValueSubkeyRangeSet,
-        expiration: Timestamp,
-        count: u32,
+        subkeys: Option<ValueSubkeyRangeSet>,
+        expiration: Option<Timestamp>,
+        count: Option<u32>,
     },
     CancelDhtWatch {
         #[schemars(with = "String")]
@@ -149,7 +149,7 @@ pub enum RoutingContextResponseOp {
     },
     WatchDhtValues {
         #[serde(flatten)]
-        result: ApiResult<Timestamp>,
+        result: ApiResult<bool>,
     },
     CancelDhtWatch {
         #[serde(flatten)]
@@ -398,13 +398,18 @@ impl RoutingContext {
     ///
     /// There is only one watch permitted per record. If a change to a watch is desired, the previous one will be overwritten.
     /// * `key` is the record key to watch. it must first be opened for reading or writing.
-    /// * `subkeys` is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys.
-    /// * `expiration` is the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.
-    /// * `count` is the number of times the watch will be sent, maximum. A zero value here is equivalent to a cancellation.
+    /// * `subkeys`:
+    ///   - None: specifies watching the entire range of subkeys.
+    ///   - Some(range): the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges.
+    /// * `expiration`:
+    ///   - None: specifies a watch with no expiration
+    ///   - Some(timestamp): the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.
+    /// * `count`:
+    ///   - None: specifies a watch count of u32::MAX
+    ///   - Some(count): the number of times the watch will be sent, maximum. A zero value here is equivalent to a cancellation.
     ///
-    /// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future,
-    /// and the returned timestamp will be no later than the requested expiration, but -may- be before the requested expiration.
-    /// If the returned timestamp is zero it indicates that the watch is considered cancelled, either from a failed update or due to `count` being zero
+    /// Returns Ok(true) if a watch is active for this record.
+    /// Returns Ok(false) if the entire watch has been cancelled.
     ///
     /// DHT watches are accepted with the following conditions:
     /// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record.
@@ -415,12 +420,15 @@ impl RoutingContext {
     pub async fn watch_dht_values(
         &self,
         key: TypedKey,
-        subkeys: ValueSubkeyRangeSet,
-        expiration: Timestamp,
-        count: u32,
-    ) -> VeilidAPIResult<Timestamp> {
+        subkeys: Option<ValueSubkeyRangeSet>,
+        expiration: Option<Timestamp>,
+        count: Option<u32>,
+    ) -> VeilidAPIResult<bool> {
         veilid_log!(self debug
-            "RoutingContext::watch_dht_values(self: {:?}, key: {:?}, subkeys: {:?}, expiration: {}, count: {})", self, key, subkeys, expiration, count);
+            "RoutingContext::watch_dht_values(self: {:?}, key: {:?}, subkeys: {:?}, expiration: {:?}, count: {:?})", self, key, subkeys, expiration, count);
+        let subkeys = subkeys.unwrap_or_default();
+        let expiration = expiration.unwrap_or_default();
+        let count = count.unwrap_or(u32::MAX);
 
         Crypto::validate_crypto_kind(key.kind)?;
 
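For illustration, a minimal caller-side sketch of the new signature (not part of the diff; `rc` and `record_key` are assumed to come from earlier attach/routing-context and open_dht_record calls):

    use veilid_core::{RoutingContext, TypedKey, VeilidAPIResult};

    // Illustrative only: watch every subkey of an already-opened record with no
    // explicit expiration and the default (u32::MAX) change count.
    async fn example_watch(rc: &RoutingContext, record_key: TypedKey) -> VeilidAPIResult<()> {
        let active: bool = rc.watch_dht_values(record_key, None, None, None).await?;
        if !active {
            // The watch is not active for this record; no value-change updates will arrive.
        }
        Ok(())
    }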
@@ -431,11 +439,12 @@ impl RoutingContext {
     /// Cancels a watch early.
     ///
     /// This is a convenience function that cancels watching all subkeys in a range. The subkeys specified here
-    /// are subtracted from the watched subkey range. If no range is specified, this is equivalent to cancelling the entire range of subkeys.
+    /// are subtracted from the currently-watched subkey range.
+    /// If no range is specified, this is equivalent to cancelling the entire range of subkeys.
     /// Only the subkey range is changed, the expiration and count remain the same.
     /// If no subkeys remain, the watch is entirely cancelled and will receive no more updates.
     ///
-    /// Returns Ok(true) if there is any remaining watch for this record.
+    /// Returns Ok(true) if a watch is active for this record.
     /// Returns Ok(false) if the entire watch has been cancelled.
     #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
     pub async fn cancel_dht_watch(
@@ -301,7 +301,7 @@ abstract class VeilidRoutingContext {
       {bool forceRefresh = false});
   Future<ValueData?> setDHTValue(TypedKey key, int subkey, Uint8List data,
       {KeyPair? writer});
-  Future<Timestamp> watchDHTValues(TypedKey key,
+  Future<bool> watchDHTValues(TypedKey key,
       {List<ValueSubkeyRange>? subkeys, Timestamp? expiration, int? count});
   Future<bool> cancelDHTWatch(TypedKey key, {List<ValueSubkeyRange>? subkeys});
   Future<DHTRecordReport> inspectDHTRecord(TypedKey key,
@@ -703,7 +703,7 @@ class VeilidRoutingContextFFI extends VeilidRoutingContext {
   }
 
   @override
-  Future<Timestamp> watchDHTValues(TypedKey key,
+  Future<bool> watchDHTValues(TypedKey key,
       {List<ValueSubkeyRange>? subkeys,
       Timestamp? expiration,
       int? count}) async {
@@ -720,9 +720,8 @@ class VeilidRoutingContextFFI extends VeilidRoutingContext {
     final sendPort = recvPort.sendPort;
     _ctx.ffi._routingContextWatchDHTValues(sendPort.nativePort, _ctx.id!,
         nativeKey, nativeSubkeys, nativeExpiration, count);
-    final actualExpiration = Timestamp(
-        value: BigInt.from(await processFuturePlain<int>(recvPort.first)));
-    return actualExpiration;
+    final active = await processFuturePlain<bool>(recvPort.first);
+    return active;
   }
 
   @override
@@ -738,8 +737,8 @@ class VeilidRoutingContextFFI extends VeilidRoutingContext {
     final sendPort = recvPort.sendPort;
     _ctx.ffi._routingContextCancelDHTWatch(
         sendPort.nativePort, _ctx.id!, nativeKey, nativeSubkeys);
-    final cancelled = await processFuturePlain<bool>(recvPort.first);
-    return cancelled;
+    final active = await processFuturePlain<bool>(recvPort.first);
+    return active;
   }
 
   @override
@@ -206,7 +206,7 @@ class VeilidRoutingContextJS extends VeilidRoutingContext {
   }
 
   @override
-  Future<Timestamp> watchDHTValues(TypedKey key,
+  Future<bool> watchDHTValues(TypedKey key,
       {List<ValueSubkeyRange>? subkeys,
       Timestamp? expiration,
       int? count}) async {
@@ -215,7 +215,7 @@ class VeilidRoutingContextJS extends VeilidRoutingContext {
     count ??= 0xFFFFFFFF;
 
     final id = _ctx.requireId();
-    final ts = await _wrapApiPromise<String>(js_util.callMethod(
+    return _wrapApiPromise<bool>(js_util.callMethod(
         wasm, 'routing_context_watch_dht_values', [
       id,
       jsonEncode(key),
@@ -223,7 +223,6 @@ class VeilidRoutingContextJS extends VeilidRoutingContext {
       expiration.toString(),
      count
    ]));
-    return Timestamp.fromString(ts);
  }

  @override
@@ -821,9 +821,9 @@ pub extern "C" fn routing_context_watch_dht_values(
             let routing_context = get_routing_context(id, "routing_context_watch_dht_values")?;
 
             let res = routing_context
-                .watch_dht_values(key, subkeys, expiration, count)
+                .watch_dht_values(key, Some(subkeys), Some(expiration), Some(count))
                 .await?;
-            APIResult::Ok(res.as_u64())
+            APIResult::Ok(res)
         }
         .in_current_span(),
     );
@@ -297,8 +297,8 @@ async def test_watch_dht_values():
     await sync(rc0, [rec0])
 
     # Server 0: Make a watch on all the subkeys
-    ts = await rc0.watch_dht_values(rec0.key, [], Timestamp(0), 0xFFFFFFFF)
-    assert ts != 0
+    active = await rc0.watch_dht_values(rec0.key, [], Timestamp(0), 0xFFFFFFFF)
+    assert active
 
     # Server 1: Open the subkey
     rec1 = await rc1.open_dht_record(rec0.key, rec0.owner_key_pair())
@@ -345,8 +345,8 @@ async def test_watch_dht_values():
     assert upd.detail.value.data == b"BZORT"
 
     # Server 0: Cancel some subkeys we don't care about
-    still_active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(0), ValueSubkey(3))])
-    assert still_active
+    active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(0), ValueSubkey(3))])
+    assert active
 
     # Server 1: Now set multiple subkeys and trigger an update
     vd = await asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT BZORT")])
@@ -373,8 +373,8 @@ async def test_watch_dht_values():
     assert upd is None
 
     # Now cancel the update
-    still_active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(3), ValueSubkey(9))])
-    assert not still_active
+    active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(3), ValueSubkey(9))])
+    assert not active
 
     # Server 0: Wait for the cancellation update
     upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
@@ -462,8 +462,8 @@ async def test_watch_many_dht_values():
     assert vd is None
 
     # Server 0: Make a watch on all the subkeys
-    ts = await rc0.watch_dht_values(records[n].key, [], Timestamp(0), 0xFFFFFFFF)
-    assert ts != 0
+    active = await rc0.watch_dht_values(records[n].key, [], Timestamp(0), 0xFFFFFFFF)
+    assert active
 
     # Open and set all records
     missing_records = set()
@@ -92,17 +92,17 @@ class RoutingContext(ABC):
     async def watch_dht_values(
         self,
         key: types.TypedKey,
-        subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]],
+        subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]] = [],
         expiration: types.Timestamp = types.Timestamp(0),
         count: int = 0xFFFFFFFF,
-    ) -> types.Timestamp:
+    ) -> bool:
         pass
 
     @abstractmethod
     async def cancel_dht_watch(
         self,
         key: types.TypedKey,
-        subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]],
+        subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]] = [],
     ) -> bool:
         pass
 
@@ -740,10 +740,10 @@ class _JsonRoutingContext(RoutingContext):
     async def watch_dht_values(
         self,
         key: TypedKey,
-        subkeys: list[tuple[ValueSubkey, ValueSubkey]],
+        subkeys: list[tuple[ValueSubkey, ValueSubkey]] = [],
         expiration: Timestamp = Timestamp(0),
         count: int = 0xFFFFFFFF,
-    ) -> Timestamp:
+    ) -> bool:
         assert isinstance(key, TypedKey)
         assert isinstance(subkeys, list)
         for s in subkeys:
@@ -753,23 +753,22 @@ class _JsonRoutingContext(RoutingContext):
         assert isinstance(expiration, Timestamp)
         assert isinstance(count, int)
 
-        return Timestamp(
-            raise_api_result(
-                await self.api.send_ndjson_request(
-                    Operation.ROUTING_CONTEXT,
-                    validate=validate_rc_op,
-                    rc_id=self.rc_id,
-                    rc_op=RoutingContextOperation.WATCH_DHT_VALUES,
-                    key=key,
-                    subkeys=subkeys,
-                    expiration=str(expiration),
-                    count=count,
-                )
+        return raise_api_result(
+            await self.api.send_ndjson_request(
+                Operation.ROUTING_CONTEXT,
+                validate=validate_rc_op,
+                rc_id=self.rc_id,
+                rc_op=RoutingContextOperation.WATCH_DHT_VALUES,
+                key=key,
+                subkeys=subkeys,
+                expiration=str(expiration),
+                count=count,
             )
         )
 
     async def cancel_dht_watch(
-        self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]]
+        self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]] = []
     ) -> bool:
         assert isinstance(key, TypedKey)
         assert isinstance(subkeys, list)
@@ -858,7 +858,7 @@
       ],
       "properties": {
         "value": {
-          "type": "string"
+          "type": "boolean"
        }
      }
    },
@@ -469,20 +469,23 @@
        {
          "type": "object",
          "required": [
-            "count",
-            "expiration",
            "key",
-            "rc_op",
-            "subkeys"
+            "rc_op"
          ],
          "properties": {
            "count": {
-              "type": "integer",
+              "type": [
+                "integer",
+                "null"
+              ],
              "format": "uint32",
              "minimum": 0.0
            },
            "expiration": {
-              "type": "string"
+              "type": [
+                "string",
+                "null"
+              ]
            },
            "key": {
              "type": "string"
@@ -494,7 +497,10 @@
              ]
            },
            "subkeys": {
-              "type": "array",
+              "type": [
+                "array",
+                "null"
+              ],
              "items": {
                "type": "array",
                "items": [
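As an illustration of what the relaxed schema accepts (a hypothetical request body; only the fields shown in the hunks above are assumed, and the outer request envelope is omitted): a WatchDhtValues operation may now leave subkeys, expiration, and count out entirely, since only key and rc_op remain required:

    {
      "rc_op": "WatchDhtValues",
      "key": "VLD0:<record key>"
    }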
@@ -344,17 +344,22 @@ impl VeilidRoutingContext {
     ///
     /// There is only one watch permitted per record. If a change to a watch is desired, the previous one will be overwritten.
     /// * `key` is the record key to watch. it must first be opened for reading or writing.
-    /// * `subkeys` is the the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys.
-    /// * `expiration` is the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.
-    /// * `count` is the number of times the watch will be sent, maximum. A zero value here is equivalent to a cancellation.
+    /// * `subkeys`:
+    ///   - None: specifies watching the entire range of subkeys.
+    ///   - Some(range): the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges.
+    /// * `expiration`:
+    ///   - None: specifies a watch with no expiration
+    ///   - Some(timestamp): the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.
+    /// * `count`:
+    ///   - None: specifies a watch count of u32::MAX
+    ///   - Some(count): the number of times the watch will be sent, maximum. A zero value here is equivalent to a cancellation.
     ///
-    /// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future,
-    /// and the returned timestamp will be no later than the requested expiration, but -may- be before the requested expiration.
-    /// If the returned timestamp is zero it indicates that the watch creation or update has failed. In the case of a faild update, the watch is considered cancelled.
+    /// Returns Ok(true) if a watch is active for this record.
+    /// Returns Ok(false) if the entire watch has been cancelled.
     ///
     /// DHT watches are accepted with the following conditions:
-    /// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record
-    /// * If a member (either the owner or a SMPL schema member) has opened the key for writing (even if no writing is performed) then the watch will be signed and guaranteed network.dht.member_watch_limit per writer
+    /// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record.
+    /// * If a member (either the owner or a SMPL schema member) has opened the key for writing (even if no writing is performed) then the watch will be signed and guaranteed network.dht.member_watch_limit per writer.
     ///
     /// Members can be specified via the SMPL schema and do not need to allocate writable subkeys in order to offer a member watch capability.
     pub async fn watchDhtValues(
@@ -363,34 +368,32 @@ impl VeilidRoutingContext {
         subkeys: Option<ValueSubkeyRangeSet>,
         expiration: Option<String>,
         count: Option<u32>,
-    ) -> APIResult<String> {
+    ) -> APIResult<bool> {
         let key = TypedKey::from_str(&key)?;
-        let subkeys = subkeys.unwrap_or_default();
         let expiration = if let Some(expiration) = expiration {
-            veilid_core::Timestamp::new(
+            Some(veilid_core::Timestamp::new(
                 u64::from_str(&expiration).map_err(VeilidAPIError::generic)?,
-            )
+            ))
         } else {
-            veilid_core::Timestamp::default()
+            None
         };
-        let count = count.unwrap_or(u32::MAX);
 
         let routing_context = self.getRoutingContext()?;
         let res = routing_context
             .watch_dht_values(key, subkeys, expiration, count)
             .await?;
-        APIResult::Ok(res.as_u64().to_string())
+        APIResult::Ok(res)
     }
 
-    /// Cancels a watch early
+    /// Cancels a watch early.
     ///
     /// This is a convenience function that cancels watching all subkeys in a range. The subkeys specified here
-    /// are subtracted from the watched subkey range. If no range is specified, this is equivalent to cancelling the entire range of subkeys.
+    /// are subtracted from the currently-watched subkey range.
+    /// If no range is specified, this is equivalent to cancelling the entire range of subkeys.
     /// Only the subkey range is changed, the expiration and count remain the same.
     /// If no subkeys remain, the watch is entirely cancelled and will receive no more updates.
     ///
-    /// Returns true if there is any remaining watch for this record
-    /// Returns false if the entire watch has been cancelled
+    /// Returns Ok(true) if a watch is active for this record.
+    /// Returns Ok(false) if the entire watch has been cancelled.
     pub async fn cancelDhtWatch(
         &self,
         key: String,
veilid-wasm/tests/package-lock.json (generated; 2 lines changed)
@@ -21,7 +21,7 @@
     },
     "../pkg": {
       "name": "veilid-wasm",
-      "version": "0.4.3",
+      "version": "0.4.4",
       "dev": true,
       "license": "MPL-2.0"
     },
@@ -81,7 +81,7 @@ describe('VeilidRoutingContext', () => {
       const dhtRecord = await routingContext.createDhtRecord({ kind: 'DFLT', o_cnt: 1 });
       expect(dhtRecord.key).toBeDefined();
       expect(dhtRecord.owner).toBeDefined();
       expect(dhtRecord.owner_secret).toBeDefined();
       expect(dhtRecord.schema).toEqual({ kind: 'DFLT', o_cnt: 1 });
     });
 
@@ -236,9 +236,7 @@ describe('VeilidRoutingContext', () => {
         "0",
         0xFFFFFFFF,
       );
-      expect(watchValueRes).toBeDefined();
-      expect(watchValueRes).not.toEqual("");
-      expect(watchValueRes).not.toEqual("0");
+      expect(watchValueRes).toEqual(true);
 
       const cancelValueRes = await routingContext.cancelDhtWatch(
         dhtRecord.key,
@@ -261,9 +259,7 @@ describe('VeilidRoutingContext', () => {
       const watchValueRes = await routingContext.watchDhtValues(
         dhtRecord.key,
       );
-      expect(watchValueRes).toBeDefined();
-      expect(watchValueRes).not.toEqual("");
-      expect(watchValueRes).not.toEqual("0");
+      expect(watchValueRes).toEqual(true);
 
       const cancelValueRes = await routingContext.cancelDhtWatch(
         dhtRecord.key,