mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-01-15 09:17:24 -05:00
don't fan out for watch value changes or cancels
fix fanout pop ordering
This commit is contained in:
parent
39c47dbd66
commit
ee040b32b9
@ -72,7 +72,7 @@ impl FanoutQueue {
|
||||
|
||||
// Return next fanout candidate
|
||||
pub fn next(&mut self) -> Option<NodeRef> {
|
||||
let cn = self.current_nodes.pop_front()?;
|
||||
let cn = self.current_nodes.pop_back()?;
|
||||
self.current_nodes.make_contiguous();
|
||||
let key = cn.node_ids().get(self.crypto_kind).unwrap();
|
||||
|
||||
|
@ -304,16 +304,14 @@ impl StorageManager {
|
||||
// Use the safety selection we opened the record with
|
||||
// Use the writer we opened with as the 'watcher' as well
|
||||
let opt_owvresult = self
|
||||
.outbound_watch_value(
|
||||
.outbound_watch_value_cancel(
|
||||
rpc_processor,
|
||||
key,
|
||||
ValueSubkeyRangeSet::full(),
|
||||
Timestamp::new(0),
|
||||
0,
|
||||
opened_record.safety_selection(),
|
||||
opened_record.writer().cloned(),
|
||||
Some(active_watch.id),
|
||||
Some(active_watch.watch_node),
|
||||
active_watch.id,
|
||||
active_watch.watch_node,
|
||||
)
|
||||
.await?;
|
||||
if let Some(owvresult) = opt_owvresult {
|
||||
|
@ -20,6 +20,113 @@ pub(super) struct OutboundWatchValueResult {
|
||||
}
|
||||
|
||||
impl StorageManager {
|
||||
/// Perform a 'watch value cancel' on the network without fanout
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn outbound_watch_value_cancel(
|
||||
&self,
|
||||
rpc_processor: RPCProcessor,
|
||||
key: TypedKey,
|
||||
subkeys: ValueSubkeyRangeSet,
|
||||
safety_selection: SafetySelection,
|
||||
opt_watcher: Option<KeyPair>,
|
||||
watch_id: u64,
|
||||
watch_node: NodeRef,
|
||||
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
|
||||
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
|
||||
// which lives for the duration of the app's runtime
|
||||
let watcher = opt_watcher.unwrap_or_else(|| {
|
||||
self.unlocked_inner
|
||||
.anonymous_watch_keys
|
||||
.get(key.kind)
|
||||
.unwrap()
|
||||
.value
|
||||
});
|
||||
|
||||
let wva = VeilidAPIError::from_network_result(
|
||||
rpc_processor
|
||||
.clone()
|
||||
.rpc_call_watch_value(
|
||||
Destination::direct(watch_node.clone()).with_safety(safety_selection),
|
||||
key,
|
||||
subkeys,
|
||||
Timestamp::default(),
|
||||
0,
|
||||
watcher,
|
||||
Some(watch_id),
|
||||
)
|
||||
.await?,
|
||||
)?;
|
||||
|
||||
if wva.answer.accepted {
|
||||
Ok(Some(OutboundWatchValueResult {
|
||||
expiration_ts: wva.answer.expiration_ts,
|
||||
watch_id: wva.answer.watch_id,
|
||||
watch_node,
|
||||
opt_value_changed_route: wva.reply_private_route,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform a 'watch value change' on the network without fanout
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn outbound_watch_value_change(
|
||||
&self,
|
||||
rpc_processor: RPCProcessor,
|
||||
key: TypedKey,
|
||||
subkeys: ValueSubkeyRangeSet,
|
||||
expiration: Timestamp,
|
||||
count: u32,
|
||||
safety_selection: SafetySelection,
|
||||
opt_watcher: Option<KeyPair>,
|
||||
watch_id: u64,
|
||||
watch_node: NodeRef,
|
||||
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
|
||||
if count == 0 {
|
||||
apibail_internal!("cancel should be done with outbound_watch_value_cancel");
|
||||
}
|
||||
if watch_id == 0 {
|
||||
apibail_internal!("watch id should not be zero when changing watch");
|
||||
}
|
||||
|
||||
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
|
||||
// which lives for the duration of the app's runtime
|
||||
let watcher = opt_watcher.unwrap_or_else(|| {
|
||||
self.unlocked_inner
|
||||
.anonymous_watch_keys
|
||||
.get(key.kind)
|
||||
.unwrap()
|
||||
.value
|
||||
});
|
||||
|
||||
let wva = VeilidAPIError::from_network_result(
|
||||
rpc_processor
|
||||
.clone()
|
||||
.rpc_call_watch_value(
|
||||
Destination::direct(watch_node.clone()).with_safety(safety_selection),
|
||||
key,
|
||||
subkeys,
|
||||
expiration,
|
||||
count,
|
||||
watcher,
|
||||
Some(watch_id),
|
||||
)
|
||||
.await?,
|
||||
)?;
|
||||
|
||||
if wva.answer.accepted {
|
||||
Ok(Some(OutboundWatchValueResult {
|
||||
expiration_ts: wva.answer.expiration_ts,
|
||||
watch_id: wva.answer.watch_id,
|
||||
watch_node,
|
||||
opt_value_changed_route: wva.reply_private_route,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform a 'watch value' query on the network using fanout
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn outbound_watch_value(
|
||||
@ -34,6 +141,56 @@ impl StorageManager {
|
||||
opt_watch_id: Option<u64>,
|
||||
opt_watch_node: Option<NodeRef>,
|
||||
) -> VeilidAPIResult<Option<OutboundWatchValueResult>> {
|
||||
// if the count is zero, we are cancelling
|
||||
if count == 0 {
|
||||
// Ensure watch id is specified
|
||||
let Some(watch_id) = opt_watch_id else {
|
||||
apibail_internal!("Must specify a watch id in order to cancel it");
|
||||
};
|
||||
// Ensure watch node is specified
|
||||
let Some(watch_node) = opt_watch_node else {
|
||||
apibail_internal!("Must specify a watch node in order to cancel it");
|
||||
};
|
||||
return self
|
||||
.outbound_watch_value_cancel(
|
||||
rpc_processor,
|
||||
key,
|
||||
subkeys,
|
||||
safety_selection,
|
||||
opt_watcher,
|
||||
watch_id,
|
||||
watch_node,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// if the watch id and watch node are specified, then we're trying to change an existing watch
|
||||
// first try to do that, then fall back to fanout for a new watch id
|
||||
if let Some(watch_id) = opt_watch_id {
|
||||
let Some(watch_node) = opt_watch_node else {
|
||||
apibail_internal!("Must specify a watch node in order to change it");
|
||||
};
|
||||
if let Some(res) = self
|
||||
.outbound_watch_value_change(
|
||||
rpc_processor.clone(),
|
||||
key,
|
||||
subkeys.clone(),
|
||||
expiration,
|
||||
count,
|
||||
safety_selection,
|
||||
opt_watcher,
|
||||
watch_id,
|
||||
watch_node,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
// If a change was successful then return immediately
|
||||
return Ok(Some(res));
|
||||
}
|
||||
|
||||
// Otherwise, treat this like a new watch
|
||||
}
|
||||
|
||||
let routing_table = rpc_processor.routing_table();
|
||||
|
||||
// Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
|
||||
@ -45,23 +202,6 @@ impl StorageManager {
|
||||
)
|
||||
};
|
||||
|
||||
// Get the nodes we know are caching this value to seed the fanout
|
||||
let init_fanout_queue = if let Some(watch_node) = opt_watch_node {
|
||||
vec![watch_node]
|
||||
} else {
|
||||
let inner = self.inner.lock().await;
|
||||
inner
|
||||
.get_value_nodes(key)?
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter(|x| {
|
||||
x.node_info(RoutingDomain::PublicInternet)
|
||||
.map(|ni| ni.has_capability(CAP_DHT_WATCH))
|
||||
.unwrap_or_default()
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
|
||||
// which lives for the duration of the app's runtime
|
||||
let watcher = opt_watcher.unwrap_or_else(|| {
|
||||
@ -72,6 +212,21 @@ impl StorageManager {
|
||||
.value
|
||||
});
|
||||
|
||||
// Get the nodes we know are caching this value to seed the fanout
|
||||
let init_fanout_queue = {
|
||||
let inner = self.inner.lock().await;
|
||||
inner
|
||||
.get_value_nodes(key)?
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter(|x| {
|
||||
x.node_info(RoutingDomain::PublicInternet)
|
||||
.map(|ni| ni.has_capabilities(&[CAP_DHT, CAP_DHT_WATCH]))
|
||||
.unwrap_or_default()
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
// Make do-watch-value answer context
|
||||
let context = Arc::new(Mutex::new(OutboundWatchValueContext {
|
||||
opt_watch_value_result: None,
|
||||
@ -82,6 +237,7 @@ impl StorageManager {
|
||||
let rpc_processor = rpc_processor.clone();
|
||||
let context = context.clone();
|
||||
let subkeys = subkeys.clone();
|
||||
|
||||
async move {
|
||||
let wva = network_result_try!(
|
||||
rpc_processor
|
||||
@ -93,27 +249,32 @@ impl StorageManager {
|
||||
expiration,
|
||||
count,
|
||||
watcher,
|
||||
opt_watch_id
|
||||
None
|
||||
)
|
||||
.await?
|
||||
);
|
||||
|
||||
// Keep answer if we got one
|
||||
// (accepted means the node could provide an answer, not that the watch is active)
|
||||
if wva.answer.accepted {
|
||||
let mut done = false;
|
||||
if wva.answer.expiration_ts.as_u64() > 0 {
|
||||
// If the expiration time is greater than zero this watch is active
|
||||
log_dht!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
|
||||
done = true;
|
||||
} else {
|
||||
// If the returned expiration time is zero, this watch was cancelled, or inactive
|
||||
log_dht!(debug "Watch inactive: id={}", wva.answer.watch_id);
|
||||
// If the returned expiration time is zero, this watch was cancelled or rejected
|
||||
// If we are asking to cancel then check_done will stop after the first node
|
||||
}
|
||||
if done {
|
||||
let mut ctx = context.lock();
|
||||
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
|
||||
expiration_ts: wva.answer.expiration_ts,
|
||||
watch_id: wva.answer.watch_id,
|
||||
watch_node: next_node.clone(),
|
||||
opt_value_changed_route: wva.reply_private_route,
|
||||
});
|
||||
}
|
||||
let mut ctx = context.lock();
|
||||
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
|
||||
expiration_ts: wva.answer.expiration_ts,
|
||||
watch_id: wva.answer.watch_id,
|
||||
watch_node: next_node.clone(),
|
||||
opt_value_changed_route: wva.reply_private_route,
|
||||
});
|
||||
}
|
||||
|
||||
// Return peers if we have some
|
||||
|
Loading…
Reference in New Issue
Block a user