clean up watch logic

This commit is contained in:
Christien Rioux 2024-03-04 10:15:35 -05:00
parent 30f2879827
commit 7e5d0d1204
5 changed files with 98 additions and 28 deletions

View File

@ -470,6 +470,11 @@ impl RPCProcessor {
) -> TimeoutOr<Result<Option<NodeRef>, RPCError>> {
let routing_table = self.routing_table();
// Ignore own node
if routing_table.matches_own_node_id(&[node_id]) {
return TimeoutOr::Value(Err(RPCError::network("can't search for own node id")));
}
// Routine to call to generate fanout
let call_routine = |next_node: NodeRef| {
let this = self.clone();

View File

@ -16,6 +16,12 @@ impl RPCProcessor {
count: u32,
value: SignedValueData,
) -> RPCNetworkResult<()> {
// Ensure destination is never using a safety route
if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) {
return Err(RPCError::internal(
"Never send value changes over safety routes",
));
}
let value_changed = RPCOperationValueChanged::new(key, subkeys, count, value)?;
let statement =
RPCStatement::new(RPCStatementDetail::ValueChanged(Box::new(value_changed)));
@ -35,6 +41,24 @@ impl RPCProcessor {
_ => panic!("not a statement"),
};
// Get the inbound node if if this came in directly
// If this was received over just a safety route, ignore it
// It this was received over a private route, the inbound node id could be either the actual
// node id, or a safety route (can't tell if a stub was used).
// Try it as the node if, and the storage manager will reject the
// value change if it doesn't match the active watch's node id
let inbound_node_id = match &msg.header.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::SafetyRouted(_) => {
return Ok(NetworkResult::invalid_message(
"not processing value change over safety route",
));
}
RPCMessageHeaderDetail::PrivateRouted(p) => {
TypedKey::new(p.direct.envelope.get_crypto_kind(), p.remote_safety_route)
}
};
#[cfg(feature = "debug-dht")]
{
let debug_string_value = format!(
@ -59,7 +83,7 @@ impl RPCProcessor {
// Save the subkey, creating a new record if necessary
let storage_manager = self.storage_manager();
storage_manager
.inbound_value_changed(key, subkeys, count, Arc::new(value))
.inbound_value_changed(key, subkeys, count, Arc::new(value), inbound_node_id)
.await
.map_err(RPCError::internal)?;

View File

@ -35,7 +35,7 @@ struct WatchedRecordWatch {
#[derive(Debug, Default, Clone)]
/// A record being watched for changes
struct WatchedRecord {
pub(super) struct WatchedRecord {
/// The list of active watchers
watchers: Vec<WatchedRecordWatch>,
}
@ -279,6 +279,16 @@ where
log_stor!(error "dead record found in index: {:?}", dr.key);
}
// Record should have no watches now
if self.watched_records.contains_key(&dr.key) {
log_stor!(error "dead record found in watches: {:?}", dr.key);
}
// Record should have no watch changes now
if self.changed_watched_values.contains(&dr.key) {
log_stor!(error "dead record found in watch changes: {:?}", dr.key);
}
// Delete record
if let Err(e) = rt_xact.delete(0, &dr.key.bytes()) {
log_stor!(error "record could not be deleted: {}", e);
@ -399,8 +409,14 @@ where
apibail_key_not_found!(key);
};
self.add_dead_record(rtk, record);
// Remove watches
self.watched_records.remove(&rtk);
// Remove watch changes
self.changed_watched_values.remove(&rtk);
// Remove from table store immediately
self.add_dead_record(rtk, record);
self.purge_dead_records(false).await;
Ok(())
@ -880,6 +896,24 @@ where
Ok(ret_timestamp)
}
/// Move watches from one store to another
pub fn move_watches(
&mut self,
key: TypedKey,
in_watch: Option<(WatchedRecord, bool)>,
) -> Option<(WatchedRecord, bool)> {
let rtk = RecordTableKey { key };
let out = self.watched_records.remove(&rtk);
if let Some(in_watch) = in_watch {
self.watched_records.insert(rtk, in_watch.0);
if in_watch.1 {
self.changed_watched_values.insert(rtk);
}
}
let is_watched = self.changed_watched_values.remove(&rtk);
out.map(|r| (r, is_watched))
}
pub async fn take_value_changes(&mut self, changes: &mut Vec<ValueChangedInfo>) {
// ValueChangedInfo but without the subkey data that requires a double mutable borrow to get
struct EarlyValueChangedInfo {

View File

@ -295,6 +295,9 @@ impl StorageManagerInner {
.await?;
}
// Move watches
local_record_store.move_watches(key, remote_record_store.move_watches(key, None));
// Delete remote record from store
remote_record_store.delete_record(key).await?;

View File

@ -191,14 +191,7 @@ impl StorageManager {
let (_is_local, opt_expiration_ts) = {
// See if the subkey we are watching has a local value
let opt_expiration_ts = inner
.handle_watch_local_value(
key,
subkeys.clone(),
expiration,
count,
target,
watcher,
)
.handle_watch_local_value(key, subkeys.clone(), expiration, count, target, watcher)
.await?;
if opt_expiration_ts.is_some() {
(true, opt_expiration_ts)
@ -221,33 +214,30 @@ impl StorageManager {
subkeys: ValueSubkeyRangeSet,
mut count: u32,
value: Arc<SignedValueData>,
inbound_node_id: TypedKey,
) -> VeilidAPIResult<()> {
// Update local record store with new value
let (res, opt_update_callback) = {
let mut inner = self.lock().await?;
let res = if let Some(first_subkey) = subkeys.first() {
inner
.handle_set_local_value(
key,
first_subkey,
value.clone(),
WatchUpdateMode::NoUpdate,
)
.await
} else {
VeilidAPIResult::Ok(())
};
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
// Don't send update or update the ActiveWatch if this record is closed
return res;
// Don't process update if the record is closed
return Ok(());
};
let Some(mut active_watch) = opened_record.active_watch() else {
// No active watch means no callback
return res;
return Ok(());
};
if !active_watch
.watch_node
.node_ids()
.contains(&inbound_node_id)
{
// If the reporting node is not the same as our watch, don't process the value change
return Ok(());
}
if count > active_watch.count {
// If count is greater than our requested count then this is invalid, cancel the watch
log_stor!(debug "watch count went backward: {}: {}/{}", key, count, active_watch.count);
@ -259,7 +249,7 @@ impl StorageManager {
log_stor!(debug "watch count finished: {}", key);
opened_record.clear_active_watch();
} else {
log_stor!(
log_stor!(debug
"watch count decremented: {}: {}/{}",
key,
count,
@ -269,6 +259,20 @@ impl StorageManager {
opened_record.set_active_watch(active_watch);
}
// Set the local value
let res = if let Some(first_subkey) = subkeys.first() {
inner
.handle_set_local_value(
key,
first_subkey,
value.clone(),
WatchUpdateMode::NoUpdate,
)
.await
} else {
VeilidAPIResult::Ok(())
};
(res, inner.update_callback.clone())
};