watchvalue recordkeeping

Christien Rioux 2023-11-21 19:39:27 -05:00
parent 9d9a76e45c
commit 248f8dad06
18 changed files with 300 additions and 87 deletions

View File

@ -361,7 +361,7 @@ struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
}
struct OperationWatchValueA @0xa726cab7064ba893 {
expiration @0 :UInt64; # timestamp when this watch will expire in usec since epoch (0 if watch failed)
expiration @0 :UInt64; # timestamp when this watch will expire in usec since epoch (0 if watch was rejected). if watch is being cancelled (with count = 0), this will be the non-zero former expiration time.
peers @1 :List(PeerInfo); # returned list of other nodes to ask that could propagate watches
}
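
Read together with the count requested in OperationWatchValueQ, the new expiration semantics can be sketched roughly as follows (hypothetical Rust helper for illustration only; not part of this commit):

enum WatchOutcome {
    Rejected,
    Accepted { expiration_us: u64 },
    Cancelled { former_expiration_us: u64 },
}

// Hypothetical interpretation of OperationWatchValueA.expiration, given the
// count that was requested in the corresponding OperationWatchValueQ.
fn interpret_watch_answer(requested_count: u32, expiration_us: u64) -> WatchOutcome {
    if expiration_us == 0 {
        // Zero always means the watch was rejected.
        WatchOutcome::Rejected
    } else if requested_count == 0 {
        // A cancellation request echoes back the former (non-zero) expiration.
        WatchOutcome::Cancelled { former_expiration_us: expiration_us }
    } else {
        // A non-zero expiration with a non-zero count is an accepted watch.
        WatchOutcome::Accepted { expiration_us }
    }
}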

View File

@ -3,7 +3,7 @@ use super::*;
/// The core representation of the RouteSpecStore that can be serialized
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub(super) struct RouteSpecStoreContent {
/// All of the route sets we have allocated so far indexed by key
/// All of the route sets we have allocated so far indexed by key (many to one)
id_by_key: HashMap<PublicKey, RouteId>,
/// All of the route sets we have allocated so far
details: HashMap<RouteId, RouteSetSpecDetail>,

View File

@ -219,7 +219,10 @@ where
Ok(())
}
pub async fn run(self: Arc<Self>) -> TimeoutOr<Result<Option<R>, RPCError>> {
pub async fn run(
self: Arc<Self>,
opt_init_fanout_queue: Option<Vec<NodeRef>>,
) -> TimeoutOr<Result<Option<R>, RPCError>> {
// Get timeout in milliseconds
let timeout_ms = match us_to_ms(self.timeout_us.as_u64()).map_err(RPCError::internal) {
Ok(v) => v,
@ -229,7 +232,9 @@ where
};
// Initialize closest nodes list
if let Err(e) = self.clone().init_closest_nodes() {
if let Some(init_fanout_queue) = opt_init_fanout_queue {
self.clone().add_to_fanout_queue(&init_fanout_queue);
} else if let Err(e) = self.clone().init_closest_nodes() {
return TimeoutOr::value(Err(e));
}

View File

@ -196,12 +196,16 @@ struct WaitableReply {
#[derive(Clone, Debug, Default)]
pub struct Answer<T> {
pub latency: TimestampDuration, // how long it took to get this answer
pub answer: T, // the answer itself
/// How long it took to get this answer
pub latency: TimestampDuration,
/// The private route requested to receive the reply
pub reply_private_route: Option<PublicKey>,
/// The answer itself
pub answer: T,
}
impl<T> Answer<T> {
pub fn new(latency: TimestampDuration, answer: T) -> Self {
Self { latency, answer }
pub fn new(latency: TimestampDuration, reply_private_route: Option<PublicKey>, answer: T) -> Self {
Self { latency, reply_private_route, answer }
}
}
@ -512,7 +516,7 @@ impl RPCProcessor {
check_done,
);
fanout_call.run().await
fanout_call.run(None).await
}
/// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference

View File

@ -23,6 +23,9 @@ impl RPCProcessor {
// Send the app call question
let waitable_reply = network_result_try!(self.question(dest, question, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@ -45,7 +48,11 @@ impl RPCProcessor {
tracing::Span::current().record("ret.latency", latency.as_u64());
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.len", a_message.len());
Ok(NetworkResult::value(Answer::new(latency, a_message)))
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
a_message,
)))
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]

View File

@ -43,6 +43,9 @@ impl RPCProcessor {
// Send the find_node request
let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@ -74,7 +77,11 @@ impl RPCProcessor {
}
}
Ok(NetworkResult::value(Answer::new(latency, peers)))
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
peers,
)))
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]

View File

@ -82,6 +82,9 @@ impl RPCProcessor {
.await?
);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@ -156,6 +159,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
GetValueAnswer {
value,
peers,

View File

@ -96,6 +96,8 @@ impl RPCProcessor {
.await?
);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
@ -174,6 +176,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
SetValueAnswer { set, value, peers },
)))
}

View File

@ -113,6 +113,9 @@ impl RPCProcessor {
// Note what kind of ping this was and to what peer scope
let send_data_method = waitable_reply.send_data_method.clone();
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@ -190,7 +193,11 @@ impl RPCProcessor {
// sender info is irrelevant over relays and routes
}
};
Ok(NetworkResult::value(Answer::new(latency, opt_sender_info)))
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
opt_sender_info,
)))
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]

View File

@ -77,6 +77,9 @@ impl RPCProcessor {
let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@ -138,6 +141,7 @@ impl RPCProcessor {
Ok(NetworkResult::value(Answer::new(
latency,
reply_private_route,
WatchValueAnswer {
expiration_ts: Timestamp::new(expiration),
peers,

View File

@ -174,7 +174,7 @@ impl StorageManager {
check_done,
);
match fanout_call.run().await {
match fanout_call.run(None).await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => {
// Return the best answer we've got

View File

@ -255,19 +255,52 @@ impl StorageManager {
/// Close an opened local record
pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
let mut inner = self.lock().await?;
inner.close_record(key)
let (opened_record, opt_rpc_processor) = {
let mut inner = self.lock().await?;
(inner.close_record(key)?, inner.rpc_processor.clone())
};
// Send a one-time cancel request for the watch if we have one and we're online
if let Some(active_watch) = opened_record.active_watch() {
if let Some(rpc_processor) = opt_rpc_processor {
// Use the safety selection we opened the record with
// Use the writer we opened with as the 'watcher' as well
let opt_owvresult = self
.outbound_watch_value(
rpc_processor,
key,
ValueSubkeyRangeSet::full(),
Timestamp::new(0),
0,
opened_record.safety_selection(),
opened_record.writer().cloned(),
Some(active_watch.watch_node),
)
.await?;
if let Some(owvresult) = opt_owvresult {
if owvresult.expiration_ts.as_u64() != 0 {
log_stor!(debug
"close record watch cancel got unexpected expiration: {}",
owvresult.expiration_ts
);
}
} else {
log_stor!(debug "close record watch cancel unsuccessful");
}
} else {
log_stor!(debug "skipping last-ditch watch cancel because we are offline");
}
}
Ok(())
}
/// Delete a local record
pub async fn delete_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
let mut inner = self.lock().await?;
// Ensure the record is closed
if inner.opened_records.contains_key(&key) {
inner.close_record(key)?;
}
self.close_record(key).await?;
let mut inner = self.lock().await?;
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
@ -482,14 +515,22 @@ impl StorageManager {
) -> VeilidAPIResult<Timestamp> {
let inner = self.lock().await?;
// Rewrite subkey range if empty to full
let subkeys = if subkeys.is_empty() {
ValueSubkeyRangeSet::full()
} else {
subkeys
};
// Get the safety selection and the writer we opened this record with
let (safety_selection, opt_writer) = {
let (safety_selection, opt_writer, opt_watch_node) = {
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
(
opened_record.safety_selection(),
opened_record.writer().cloned(),
opened_record.active_watch().map(|aw| aw.watch_node.clone()),
)
};
@ -502,60 +543,112 @@ impl StorageManager {
drop(inner);
// Use the safety selection we opened the record with
let expiration_ts = self
// Use the writer we opened with as the 'watcher' as well
let opt_owvresult = self
.outbound_watch_value(
rpc_processor,
key,
subkeys,
subkeys.clone(),
expiration,
count,
safety_selection,
opt_writer,
opt_watch_node,
)
.await?;
Ok(expiration_ts)
// If we did not get a valid response return a zero timestamp
let Some(owvresult) = opt_owvresult else {
return Ok(Timestamp::new(0));
};
// Clear any existing watch if the watch succeeded or got cancelled
let mut inner = self.lock().await?;
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
apibail_generic!("record not open");
};
opened_record.clear_active_watch();
// Get the minimum expiration timestamp we will accept
let rpc_timeout_us = {
let c = self.unlocked_inner.config.get();
TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms))
};
let cur_ts = get_timestamp();
let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
// If the expiration time is less than our minimum expiration time or greater than the requested time, consider this watch cancelled
if owvresult.expiration_ts.as_u64() < min_expiration_ts
|| owvresult.expiration_ts.as_u64() > expiration.as_u64()
{
// Don't set the watch so we ignore any stray valuechanged messages
return Ok(Timestamp::new(0));
}
// If we requested a cancellation, then consider this watch cancelled
if count == 0 {
return Ok(Timestamp::new(0));
}
// Keep a record of the watch
opened_record.set_active_watch(ActiveWatch {
expiration_ts: owvresult.expiration_ts,
watch_node: owvresult.watch_node,
opt_value_changed_route: owvresult.opt_value_changed_route,
subkeys,
count,
});
Ok(owvresult.expiration_ts)
}
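
For clarity, the acceptance check above can be condensed into a small standalone predicate (hypothetical helper mirroring the logic in this hunk; not part of the commit):

// A returned expiration only counts as an active watch if it lands at or
// after now + the RPC timeout, is no later than what was requested, and the
// request was not itself a cancellation (count == 0).
fn watch_is_active(
    returned_expiration_us: u64,
    requested_expiration_us: u64,
    cur_ts_us: u64,
    rpc_timeout_us: u64,
    requested_count: u32,
) -> bool {
    let min_expiration_us = cur_ts_us + rpc_timeout_us;
    requested_count > 0
        && returned_expiration_us >= min_expiration_us
        && returned_expiration_us <= requested_expiration_us
}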
// pub async fn cancel_watch_values(
// &self,
// key: TypedKey,
// subkeys: ValueSubkeyRangeSet,
// ) -> VeilidAPIResult<bool> {
// let inner = self.lock().await?;
pub async fn cancel_watch_values(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
) -> VeilidAPIResult<bool> {
let (subkeys, active_watch) = {
let inner = self.lock().await?;
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
// // // Get the safety selection and the writer we opened this record with
// // let (safety_selection, opt_writer) = {
// // let Some(opened_record) = inner.opened_records.get(&key) else {
// // apibail_generic!("record not open");
// // };
// // (
// // opened_record.safety_selection(),
// // opened_record.writer().cloned(),
// // )
// // };
// See what watch we have currently if any
let Some(active_watch) = opened_record.active_watch() else {
// If we didn't have an active watch, then we can just return false because there's nothing to do here
return Ok(false);
};
// // // Get rpc processor and drop mutex so we don't block while requesting the watch from the network
// // let Some(rpc_processor) = inner.rpc_processor.clone() else {
// // apibail_try_again!("offline, try again later");
// // };
// Rewrite subkey range if empty to full
let subkeys = if subkeys.is_empty() {
ValueSubkeyRangeSet::full()
} else {
subkeys
};
// // // Drop the lock for network access
// // drop(inner);
// Reduce the subkey range
let new_subkeys = active_watch.subkeys.difference(&subkeys);
// // // Use the safety selection we opened the record with
// // let expiration_ts = self
// // .outbound_watch_value(
// // rpc_processor,
// // key,
// // subkeys,
// // expiration,
// // count,
// // safety_selection,
// // opt_writer,
// // )
// // .await?;
(new_subkeys, active_watch)
};
// // Ok(expiration_ts)
// }
// If we have no subkeys left, then set the count to zero to indicate a full cancellation
let count = if subkeys.is_empty() {
0
} else {
active_watch.count
};
// Update the watch
let expiration_ts = self
.watch_values(key, subkeys, active_watch.expiration_ts, count)
.await?;
// A zero expiration time means the watch is done or nothing is left, and the watch is no longer active
if expiration_ts.as_u64() == 0 {
return Ok(false);
}
Ok(true)
}
}

View File

@ -165,7 +165,7 @@ impl StorageManager {
check_done,
);
match fanout_call.run().await {
match fanout_call.run(None).await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => {
// Return the best answer we've got

View File

@ -409,9 +409,9 @@ impl StorageManagerInner {
Ok(descriptor)
}
pub fn get_value_nodes(&mut self, key: TypedKey) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
pub fn get_value_nodes(&self, key: TypedKey) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
let Some(local_record_store) = self.local_record_store.as_ref() else {
apibail_not_initialized!();
};
@ -456,11 +456,11 @@ impl StorageManagerInner {
Ok(())
}
pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult<()> {
let Some(_opened_record) = self.opened_records.remove(&key) else {
pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult<OpenedRecord> {
let Some(opened_record) = self.opened_records.remove(&key) else {
apibail_generic!("record not open");
};
Ok(())
Ok(opened_record)
}
pub async fn handle_get_local_value(

View File

@ -1,5 +1,19 @@
use super::*;
#[derive(Clone, Debug)]
pub struct ActiveWatch {
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch
pub watch_node: NodeRef,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
/// Which subkeys we are watching
pub subkeys: ValueSubkeyRangeSet,
/// How many notifications are left
pub count: u32,
}
/// The state associated with a local record when it is opened
/// This is not serialized to storage as it is ephemeral for the lifetime of the opened record
#[derive(Clone, Debug, Default)]
@ -11,6 +25,9 @@ pub struct OpenedRecord {
/// The safety selection in current use
safety_selection: SafetySelection,
/// Active watch we have on this record
active_watch: Option<ActiveWatch>,
}
impl OpenedRecord {
@ -18,6 +35,7 @@ impl OpenedRecord {
Self {
writer,
safety_selection,
active_watch: None,
}
}
@ -28,4 +46,16 @@ impl OpenedRecord {
pub fn safety_selection(&self) -> SafetySelection {
self.safety_selection
}
pub fn set_active_watch(&mut self, active_watch: ActiveWatch) {
self.active_watch = Some(active_watch);
}
pub fn clear_active_watch(&mut self) {
self.active_watch = None;
}
pub fn active_watch(&self) -> Option<ActiveWatch> {
self.active_watch.clone()
}
}

View File

@ -2,8 +2,19 @@ use super::*;
/// The context of the outbound_watch_value operation
struct OutboundWatchValueContext {
/// The timestamp for the expiration of the watch we successfully got
pub opt_expiration_ts: Option<Timestamp>,
/// A successful watch
pub opt_watch_value_result: Option<OutboundWatchValueResult>,
}
/// The result of the outbound_watch_value operation
#[derive(Debug, Clone)]
struct OutboundWatchValueResult {
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch
pub watch_node: NodeRef,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
impl StorageManager {
@ -17,26 +28,30 @@ impl StorageManager {
count: u32,
safety_selection: SafetySelection,
opt_watcher: Option<KeyPair>,
) -> VeilidAPIResult<Timestamp> {
opt_watch_node: Option<NodeRef>,
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'WatchValue'
let (key_count, timeout_us, rpc_timeout_us) = {
let (key_count, timeout_us) = {
let c = self.unlocked_inner.config.get();
(
c.network.dht.max_find_node_count as usize,
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
)
};
// Get the minimum expiration timestamp we will accept
let cur_ts = get_timestamp();
let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
// Get the nodes we know are caching this value to seed the fanout
let opt_init_fanout_queue = if let Some(watch_node) = opt_watch_node {
Some(vec![watch_node])
} else {
let inner = self.inner.lock().await;
inner.get_value_nodes(key)?
};
// Make do-watch-value answer context
let context = Arc::new(Mutex::new(OutboundWatchValueContext {
opt_expiration_ts: None,
opt_watch_value_result: None,
}));
// Routine to call to generate fanout
@ -59,11 +74,21 @@ impl StorageManager {
.await?
);
// Keep the expiration_ts if we got one
if wva.answer.expiration_ts.as_u64() >= min_expiration_ts {
log_stor!(debug "Got expiration back: expiration_ts={}", wva.answer.expiration_ts);
// Keep answer if we got one
if wva.answer.expiration_ts.as_u64() > 0 {
if count > 0 {
// If we asked for a nonzero notification count, then this is an accepted watch
log_stor!(debug "Watch accepted: expiration_ts={}", wva.answer.expiration_ts);
} else {
// If we asked for a zero notification count, then this is a cancelled watch
log_stor!(debug "Watch cancelled");
}
let mut ctx = context.lock();
ctx.opt_expiration_ts = Some(wva.answer.expiration_ts);
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
expiration_ts: wva.answer.expiration_ts,
watch_node: next_node.clone(),
opt_value_changed_route: wva.reply_private_route,
});
}
// Return peers if we have some
@ -78,7 +103,7 @@ impl StorageManager {
let check_done = |_closest_nodes: &[NodeRef]| {
// If a watch has succeeded, return done
let ctx = context.lock();
if ctx.opt_expiration_ts.is_some() {
if ctx.opt_watch_value_result.is_some() {
return Some(());
}
None
@ -97,39 +122,39 @@ impl StorageManager {
check_done,
);
match fanout_call.run().await {
match fanout_call.run(opt_init_fanout_queue).await {
// If we don't finish in the timeout (too much time passed without a successful watch)
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_expiration_ts.is_some() {
if ctx.opt_watch_value_result.is_some() {
log_stor!(debug "WatchValue Fanout Timeout Success");
} else {
log_stor!(debug "WatchValue Fanout Timeout Failure");
}
Ok(ctx.opt_expiration_ts.unwrap_or_default())
Ok(ctx.opt_watch_value_result.clone())
}
// If we finished with done
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_expiration_ts.is_some() {
if ctx.opt_watch_value_result.is_some() {
log_stor!(debug "WatchValue Fanout Success");
} else {
log_stor!(debug "WatchValue Fanout Failure");
}
Ok(ctx.opt_expiration_ts.unwrap_or_default())
Ok(ctx.opt_watch_value_result.clone())
}
// If we ran out of nodes
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_expiration_ts.is_some() {
if ctx.opt_watch_value_result.is_some() {
log_stor!(debug "WatchValue Fanout Exhausted Success");
} else {
log_stor!(debug "WatchValue Fanout Exhausted Failure");
}
Ok(ctx.opt_expiration_ts.unwrap_or_default())
Ok(ctx.opt_watch_value_result.clone())
}
// Failed
TimeoutOr::Value(Err(e)) => {

View File

@ -350,6 +350,8 @@ impl RoutingContext {
/// Cancels a watch early
///
/// This is a convenience function that cancels watching all subkeys in a range
/// Returns Ok(true) if there is any remaining watch for this record
/// Returns Ok(false) if the entire watch has been cancelled
pub async fn cancel_dht_watch(
&self,
key: TypedKey,

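A hedged usage sketch of the clarified return value (assuming the signature continues with a subkeys: ValueSubkeyRangeSet parameter as in the other watch APIs, and that the veilid_core types are in scope):

// Cancelling the full subkey range tears down the whole watch, so false
// ("no watch remaining") is the expected result; cancelling only part of the
// watched range would return true instead.
async fn cancel_whole_watch(rc: &RoutingContext, key: TypedKey) -> VeilidAPIResult<()> {
    let still_watching = rc
        .cancel_dht_watch(key, ValueSubkeyRangeSet::full())
        .await?;
    debug_assert!(!still_watching);
    Ok(())
}
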
View File

@ -16,6 +16,11 @@ impl ValueSubkeyRangeSet {
data: Default::default(),
}
}
pub fn full() -> Self {
let mut data = RangeSetBlaze::new();
data.ranges_insert(u32::MIN..=u32::MAX);
Self { data }
}
pub fn new_with_data(data: RangeSetBlaze<ValueSubkey>) -> Self {
Self { data }
}
@ -24,6 +29,23 @@ impl ValueSubkeyRangeSet {
data.insert(value);
Self { data }
}
pub fn intersect(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
Self::new_with_data(&self.data & &other.data)
}
pub fn difference(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
Self::new_with_data(&self.data - &other.data)
}
pub fn union(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
Self::new_with_data(&self.data | &other.data)
}
pub fn data(&self) -> RangeSetBlaze<ValueSubkey> {
self.data.clone()
}
pub fn into_data(self) -> RangeSetBlaze<ValueSubkey> {
self.data
}
}
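
A brief sketch of how the new set operations compose, for example when shrinking a watch after a partial cancellation (illustrative only; assumes the ValueSubkeyRangeSet type above and range_set_blaze are in scope):

fn reduce_watch_example() {
    // Cancel subkeys 0..=9 out of a full-range watch.
    let mut cancelled_data = RangeSetBlaze::new();
    cancelled_data.ranges_insert(0..=9);
    let cancelled = ValueSubkeyRangeSet::new_with_data(cancelled_data);
    let watched = ValueSubkeyRangeSet::full();

    // Subkeys still watched after the partial cancel:
    let remaining = watched.difference(&cancelled);
    // Subkeys covered by both sets:
    let overlap = watched.intersect(&cancelled);
    // Recombining the two partitions covers the original range again:
    let _recombined = remaining.union(&overlap);
}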
impl FromStr for ValueSubkeyRangeSet {