more semantics cleanup

This commit is contained in:
Christien Rioux 2024-03-05 16:49:23 -05:00
parent 9b4e490994
commit 119a5668ac
4 changed files with 20 additions and 12 deletions

View File

@ -353,7 +353,7 @@ struct OperationSetValueA @0x9378d0732dc95be2 {
struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
key @0 :TypedKey; # key for value to watch
subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), if empty, watch everything
subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), can not be empty, to watch everything use 0..=UINT32_MAX
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max)
count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous)
watchId @4 :UInt64; # if 0, request a new watch. if >0, existing watch id
@ -370,7 +370,7 @@ struct OperationWatchValueA @0xa726cab7064ba893 {
struct OperationValueChanged @0xd1c59ebdd8cc1bf6 {
key @0 :TypedKey; # key for value that changed
subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time)
subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time, this can be empty if this is a watch expiration notice)
count @2 :UInt32; # remaining changes left (0 means watch has expired)
watchId @3 :UInt64; # watch id this value change came from
value @4 :SignedValueData; # first value that changed (the rest can be gotten with getvalue)

View File

@ -131,6 +131,8 @@ impl RPCProcessor {
// Validate accepted requests
if accepted {
xxx does this make sense?
// Verify returned answer watch id is the same as the question watch id if it exists
if let Some(question_watch_id) = question_watch_id {
if question_watch_id != watch_id {

View File

@ -566,7 +566,8 @@ impl StorageManager {
Ok(None)
}
/// Add or change a watch to a DHT value
/// Add or change an outbound watch to a DHT value
xxx decide if empty subkey range is valid. should probably reject that everywhere and use a default range of 'full'
pub async fn watch_values(
&self,
key: TypedKey,
@ -604,7 +605,7 @@ impl StorageManager {
// Drop the lock for network access
drop(inner);
xxx continue here, make sure watch value semantics respect the 'accepted' flag and appropriate return values everywhere
// Use the safety selection we opened the record with
// Use the writer we opened with as the 'watcher' as well
let opt_owvresult = self
@ -649,7 +650,7 @@ xxx continue here, make sure watch value semantics respect the 'accepted' flag a
expiration.as_u64()
};
// If the expiration time is less than our minimum expiration time consider this watch cancelled
// If the expiration time is less than our minimum expiration time (or zero) consider this watch cancelled
let mut expiration_ts = owvresult.expiration_ts;
if expiration_ts.as_u64() < min_expiration_ts {
return Ok(Timestamp::new(0));
@ -662,6 +663,10 @@ xxx continue here, make sure watch value semantics respect the 'accepted' flag a
// If we requested a cancellation, then consider this watch cancelled
if count == 0 {
// Expiration returned should be zero if we requested a cancellation
if expiration_ts.as_u64() != 0 {
log_stor!(debug "got unexpired watch despite asking for a cancellation");
}
return Ok(Timestamp::new(0));
}

View File

@ -36,12 +36,12 @@ impl StorageManager {
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'WatchValue', some of which are the same for 'WatchValue' operations
// Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
let (key_count, timeout_us) = {
let c = self.unlocked_inner.config.get();
(
c.network.dht.max_find_node_count as usize,
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
)
};
@ -53,7 +53,8 @@ impl StorageManager {
inner.get_value_nodes(key)?.unwrap_or_default()
};
// Get the appropriate watcher key
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
// which lives for the duration of the app's runtime
let watcher = opt_watcher.unwrap_or_else(|| {
self.unlocked_inner
.anonymous_watch_keys
@ -89,10 +90,10 @@ impl StorageManager {
);
// Keep answer if we got one
if wva.answer.expiration_ts.as_u64() > 0 {
if count > 0 {
// If we asked for a nonzero notification count, then this is an accepted watch
log_stor!(debug "Watch accepted: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
if wva.answer.accepted {
if expiration_ts.as_u64() > 0 {
// If the expiration time is greater than zero this watch is active
log_stor!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
} else {
// If we asked for a zero notification count, then this is a cancelled watch
log_stor!(debug "Watch cancelled: id={}", wva.answer.watch_id);