more watch value

Christien Rioux 2023-11-20 08:58:14 -05:00
parent 70e256a25a
commit 9d9a76e45c
2 changed files with 83 additions and 170 deletions

@@ -2,7 +2,8 @@ use super::*;
#[derive(Clone, Debug)]
pub struct WatchValueAnswer {
pub expiration_ts: Option<Timestamp>,
pub expiration_ts: Timestamp,
pub peers: Vec<PeerInfo>,
}
impl RPCProcessor {
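Context for the struct change in the hunk above: dropping the Option wrapper means a refused or failed watch is now represented by the zero default timestamp rather than None, which is what the storage manager below leans on with unwrap_or_default(). A minimal stand-alone sketch of that convention; the Timestamp stand-in and the watch_accepted helper are illustrative, not part of the codebase:

```rust
// Stand-in for veilid's Timestamp newtype (illustration only).
#[derive(Clone, Copy, Debug, Default, PartialEq, PartialOrd)]
pub struct Timestamp(u64);
impl Timestamp {
    pub fn new(v: u64) -> Self {
        Timestamp(v)
    }
    pub fn as_u64(&self) -> u64 {
        self.0
    }
}

// With the Option removed, "no watch was granted" is encoded as the zero default.
pub struct WatchValueAnswer {
    pub expiration_ts: Timestamp,
}

// Hypothetical helper: a watch counts as accepted only if the node promised to
// hold it at least until a caller-chosen minimum expiration.
pub fn watch_accepted(a: &WatchValueAnswer, min_expiration_ts: u64) -> bool {
    a.expiration_ts.as_u64() >= min_expiration_ts
}

fn main() {
    let refused = WatchValueAnswer { expiration_ts: Timestamp::default() };
    let granted = WatchValueAnswer { expiration_ts: Timestamp::new(1_000_000) };
    assert!(!watch_accepted(&refused, 1));
    assert!(watch_accepted(&granted, 1));
}
```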
@@ -16,8 +17,9 @@ impl RPCProcessor {
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self),
fields(ret.expiration_ts,
ret.latency
fields(ret.expiration,
ret.latency,
ret.peers.len
),err)
)]
pub async fn rpc_call_watch_value(
@@ -62,7 +64,7 @@ impl RPCProcessor {
expiration.as_u64(),
count,
opt_watcher,
vcrypto,
vcrypto.clone(),
)?;
let question = RPCQuestion::new(
network_result_try!(self.get_destination_respond_to(&dest)?),
@@ -74,7 +76,7 @@ impl RPCProcessor {
let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None).await?);
xxxxx continue here
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
@@ -83,36 +85,24 @@ xxxxx continue here
// Get the right answer type
let (_, _, _, kind) = msg.operation.destructure();
let get_value_a = match kind {
let watch_value_a = match kind {
RPCOperationKind::Answer(a) => match a.destructure() {
RPCAnswerDetail::GetValueA(a) => a,
_ => return Ok(NetworkResult::invalid_message("not a getvalue answer")),
RPCAnswerDetail::WatchValueA(a) => a,
_ => return Ok(NetworkResult::invalid_message("not a watchvalue answer")),
},
_ => return Ok(NetworkResult::invalid_message("not an answer")),
};
let (value, peers, descriptor) = get_value_a.destructure();
let (expiration, peers) = watch_value_a.destructure();
#[cfg(feature = "debug-dht")]
{
let debug_string_value = value
.as_ref()
.map(|v| {
format!(
" len={} seq={} writer={}",
v.value_data().data().len(),
v.value_data().seq(),
v.value_data().writer(),
)
})
.unwrap_or_default();
let debug_string_answer = format!(
"OUT <== GetValueA({} #{}{}{} peers={}) <= {}",
"OUT <== WatchValueA({} {}#{:?}@{} peers={}) <= {}",
key,
subkey,
debug_string_value,
if descriptor.is_some() { " +desc" } else { "" },
peers.len(),
if opt_watcher.is_some() { "+W " } else { "" },
subkeys,
expiration,
peers.len(),
dest
);
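The match above follows this file's usual answer-typing pattern: peel the operation down to the expected WatchValueA detail, and turn anything else into a soft invalid-message result so the fanout can simply move on to the next node. A reduced sketch of that shape, with toy enums and a plain Result standing in for the real RPC types and NetworkResult:

```rust
// Toy stand-ins; the real enums carry more variants and richer payloads.
enum RpcAnswerDetail {
    WatchValueA { expiration: u64, peers: Vec<String> },
    Other,
}
enum RpcOperationKind {
    Answer(RpcAnswerDetail),
    Question,
}

// Mirrors the control flow above: a wrong kind is a soft "invalid message"
// rather than a hard error, so the caller can try another node.
fn take_watch_value_answer(kind: RpcOperationKind) -> Result<(u64, Vec<String>), String> {
    match kind {
        RpcOperationKind::Answer(a) => match a {
            RpcAnswerDetail::WatchValueA { expiration, peers } => Ok((expiration, peers)),
            _ => Err("not a watchvalue answer".to_string()),
        },
        _ => Err("not an answer".to_string()),
    }
}

fn main() {
    let kind = RpcOperationKind::Answer(RpcAnswerDetail::WatchValueA {
        expiration: 42,
        peers: vec!["peer1".into()],
    });
    assert_eq!(take_watch_value_answer(kind).unwrap().0, 42);
}
```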
@@ -142,23 +132,15 @@ xxxxx continue here
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.latency", latency.as_u64());
#[cfg(feature = "verbose-tracing")]
if let Some(value) = &value {
tracing::Span::current().record("ret.value.data.len", value.value_data().data().len());
tracing::Span::current().record("ret.value.data.seq", value.value_data().seq());
tracing::Span::current().record(
"ret.value.data.writer",
value.value_data().writer().to_string(),
);
}
tracing::Span::current().record("ret.expiration", latency.as_u64());
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.peers.len", peers.len());
Ok(NetworkResult::value(Answer::new(
latency,
GetValueAnswer {
value,
WatchValueAnswer {
expiration_ts: Timestamp::new(expiration),
peers,
descriptor,
},
)))
}
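When the verbose-tracing feature is on, the tail of rpc_call_watch_value records the answer fields onto the current span, matching the fields(...) declared in the instrument attribute at the top of the function. A self-contained sketch of that declare-empty-then-record pattern using the tracing crate directly; it assumes tracing and tracing-subscriber as dependencies, and the recorded values are made up:

```rust
use tracing::field::Empty;

fn main() {
    tracing_subscriber::fmt().init();

    // Declare the fields as Empty up front so they can be filled in later,
    // mirroring fields(ret.expiration, ret.latency, ret.peers.len) above.
    let span = tracing::info_span!(
        "rpc_call_watch_value",
        ret.expiration = Empty,
        ret.latency = Empty,
        ret.peers.len = Empty
    );
    let _guard = span.enter();

    // ... the RPC would happen here ...

    // Record the answer fields once the reply arrives.
    tracing::Span::current().record("ret.expiration", 1_700_060_000_000_000u64);
    tracing::Span::current().record("ret.latency", 1_234u64);
    tracing::Span::current().record("ret.peers.len", 5u64);
}
```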

@@ -7,7 +7,6 @@ struct OutboundWatchValueContext {
}
impl StorageManager {
/// Perform a 'watch value' query on the network
pub async fn outbound_watch_value(
&self,
@@ -17,26 +16,25 @@ impl StorageManager {
expiration: Timestamp,
count: u32,
safety_selection: SafetySelection,
opt_writer: Option<KeyPair>,
opt_watcher: Option<KeyPair>,
) -> VeilidAPIResult<Timestamp> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'GetValue', some of which are the same for 'WatchValue' operations
let (key_count, fanout, timeout_us) = {
// Get the DHT parameters for 'WatchValue', some of which are the same as for 'GetValue' operations
let (key_count, timeout_us, rpc_timeout_us) = {
let c = self.unlocked_inner.config.get();
(
c.network.dht.max_find_node_count as usize,
c.network.dht.get_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
)
};
// Get the minimum expiration timestamp we will accept
let cur_ts = get_timestamp();
let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
// Make do-watch-value answer context
let schema = if let Some(d) = &last_subkey_result.descriptor {
Some(d.schema()?)
} else {
None
};
let context = Arc::new(Mutex::new(OutboundWatchValueContext {
opt_expiration_ts: None,
}));
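The new min_expiration_ts above guards against accepting a watch that would expire before a single RPC round trip could complete. A tiny sketch of that bound, assuming microsecond timestamps as elsewhere in the codebase; get_timestamp and the config value are mocked here:

```rust
// Mocked clock and config (the real code reads these from the Veilid config).
fn get_timestamp() -> u64 {
    1_700_000_000_000_000 // microseconds
}
fn ms_to_us(ms: u32) -> u64 {
    ms as u64 * 1_000
}

fn main() {
    let rpc_timeout_us = ms_to_us(5_000); // c.network.rpc.timeout_ms
    // Only accept a watch that survives at least one RPC timeout from now.
    let cur_ts = get_timestamp();
    let min_expiration_ts = cur_ts + rpc_timeout_us;
    let granted_expiration = cur_ts + 60_000_000; // node promises 60 seconds
    assert!(granted_expiration >= min_expiration_ts);
    println!("min acceptable expiration: {min_expiration_ts}");
}
```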
@@ -45,112 +43,54 @@ impl StorageManager {
let call_routine = |next_node: NodeRef| {
let rpc_processor = rpc_processor.clone();
let context = context.clone();
let last_descriptor = last_subkey_result.descriptor.clone();
let subkeys = subkeys.clone();
async move {
let gva = network_result_try!(
let wva = network_result_try!(
rpc_processor
.clone()
.rpc_call_watch_value(
Destination::direct(next_node.clone()).with_safety(safety_selection),
key,
subkey,
last_descriptor,
subkeys,
expiration,
count,
opt_watcher
)
.await?
);
// Keep the descriptor if we got one. If we had a last_descriptor it will
// already be validated by rpc_call_get_value
if let Some(descriptor) = gva.answer.descriptor {
// Keep the expiration_ts if we got one
if wva.answer.expiration_ts.as_u64() >= min_expiration_ts {
log_stor!(debug "Got expiration back: expiration_ts={}", wva.answer.expiration_ts);
let mut ctx = context.lock();
if ctx.descriptor.is_none() && ctx.schema.is_none() {
ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
ctx.descriptor = Some(descriptor);
}
}
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = gva.answer.value {
log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Got a value but no descriptor for it
// Move to the next node
return Ok(NetworkResult::invalid_message(
"Got value with no descriptor",
));
};
// Validate with schema
if !schema.check_subkey_value_data(
descriptor.owner(),
subkey,
value.value_data(),
) {
// Validation failed, ignore this value
// Move to the next node
return Ok(NetworkResult::invalid_message(format!(
"Schema validation failed on subkey {}",
subkey
)));
}
// If we have a prior value, see if this is a newer sequence number
if let Some(prior_value) = &ctx.value {
let prior_seq = prior_value.value_data().seq();
let new_seq = value.value_data().seq();
if new_seq == prior_seq {
// If sequence number is the same, the data should be the same
if prior_value.value_data() != value.value_data() {
// Move to the next node
return Ok(NetworkResult::invalid_message("value data mismatch"));
}
// Increase the consensus count for the existing value
ctx.value_count += 1;
} else if new_seq > prior_seq {
// If the sequence number is greater, start over with the new value
ctx.value = Some(value);
// One node has shown us this value so far
ctx.value_count = 1;
} else {
// If the sequence number is older, ignore it
}
} else {
// If we have no prior value, keep it
ctx.value = Some(value);
// One node has shown us this value so far
ctx.value_count = 1;
}
ctx.opt_expiration_ts = Some(wva.answer.expiration_ts);
}
// Return peers if we have some
#[cfg(feature = "network-result-extra")]
log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
log_stor!(debug "WatchValue fanout call returned peers {}", wva.answer.peers.len());
Ok(NetworkResult::value(gva.answer.peers))
Ok(NetworkResult::value(wva.answer.peers))
}
};
// Routine to call to check if we're done at each step
let check_done = |_closest_nodes: &[NodeRef]| {
// If we have reached sufficient consensus, return done
// If a watch has succeeded, return done
let ctx = context.lock();
if ctx.value.is_some() && ctx.descriptor.is_some() && ctx.value_count >= consensus_count
{
if ctx.opt_expiration_ts.is_some() {
return Some(());
}
None
};
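Unlike the GetValue fanout, which tallies answers until consensus_count nodes agree, this check_done reports done as soon as a single watch has been accepted, and the FanoutCall below is built with a concurrency of 1 so at most one watch is outstanding at a time. A reduced sketch of that first-success fanout shape; the fanout helper here is a stand-in, not the real FanoutCall API:

```rust
// Stand-in fanout loop: call nodes one at a time, stop on the first success.
fn fanout<T>(
    nodes: &[&str],
    mut call_routine: impl FnMut(&str) -> Option<T>,
    check_done: impl Fn(&Option<T>) -> bool,
) -> Option<T> {
    let mut result = None;
    for &node in nodes {
        if let Some(v) = call_routine(node) {
            result = Some(v);
        }
        if check_done(&result) {
            break; // first accepted watch ends the fanout, like check_done above
        }
    }
    result
}

fn main() {
    let nodes = ["node_a", "node_b", "node_c"];
    // Only node_b grants the watch, answering with an expiration timestamp.
    let got = fanout(
        &nodes,
        |n| if n == "node_b" { Some(123u64) } else { None },
        |r| r.is_some(), // done once opt_expiration_ts is set
    );
    assert_eq!(got, Some(123));
}
```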
// Call the fanout
// Use a fixed fanout concurrency of 1 because we only want one watch
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,
key_count,
fanout,
1,
timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]),
call_routine,
@@ -158,79 +98,70 @@ impl StorageManager {
);
match fanout_call.run().await {
// If we don't finish in the timeout (too much time passed checking for consensus)
// If we don't finish in the timeout (too much time passed without a successful watch)
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Timeout Consensus");
if ctx.opt_expiration_ts.is_some() {
log_stor!(debug "WatchValue Fanout Timeout Success");
} else {
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_count);
log_stor!(debug "WatchValue Fanout Timeout Failure");
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
})
Ok(ctx.opt_expiration_ts.unwrap_or_default())
}
// If we finished with consensus (enough nodes returning the same value)
// If we finished with done
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Consensus");
if ctx.opt_expiration_ts.is_some() {
log_stor!(debug "WatchValue Fanout Success");
} else {
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_count);
log_stor!(debug "WatchValue Fanout Failure");
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
})
Ok(ctx.opt_expiration_ts.unwrap_or_default())
}
// If we finished without consensus (ran out of nodes before getting consensus)
// If we ran out of nodes
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Exhausted Consensus");
if ctx.opt_expiration_ts.is_some() {
log_stor!(debug "WatchValue Fanout Exhausted Success");
} else {
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_count);
log_stor!(debug "WatchValue Fanout Exhausted Failure");
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
})
Ok(ctx.opt_expiration_ts.unwrap_or_default())
}
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "GetValue Fanout Error: {}", e);
log_stor!(debug "WatchValue Fanout Error: {}", e);
Err(e.into())
}
}
}
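All three non-error fanout outcomes above (timeout, done, and node exhaustion) now collapse to the same return value: the stored expiration if a watch was accepted, else the zero default. A compact sketch of that folding, with a local enum standing in for the real TimeoutOr:

```rust
// Local stand-in for veilid's TimeoutOr (illustration only).
enum TimeoutOr<T> {
    Timeout,
    Value(T),
}

// Mirrors the match above: timeout, done, and exhaustion all return the best
// expiration seen (zero if none), and only a hard error propagates.
fn fold_watch_outcome(
    outcome: TimeoutOr<Result<Option<()>, String>>,
    opt_expiration_ts: Option<u64>,
) -> Result<u64, String> {
    match outcome {
        TimeoutOr::Timeout
        | TimeoutOr::Value(Ok(Some(())))
        | TimeoutOr::Value(Ok(None)) => Ok(opt_expiration_ts.unwrap_or_default()),
        TimeoutOr::Value(Err(e)) => Err(e),
    }
}

fn main() {
    assert_eq!(fold_watch_outcome(TimeoutOr::Timeout, Some(99)), Ok(99));
    assert_eq!(fold_watch_outcome(TimeoutOr::Value(Ok(None)), None), Ok(0));
}
```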
/// Handle a received 'Get Value' query
// pub async fn inbound_get_value(
// &self,
// key: TypedKey,
// subkey: ValueSubkey,
// want_descriptor: bool,
// ) -> VeilidAPIResult<NetworkResult<SubkeyResult>> {
// let mut inner = self.lock().await?;
// let res = match inner
// .handle_get_remote_value(key, subkey, want_descriptor)
// .await
// {
// Ok(res) => res,
// Err(VeilidAPIError::Internal { message }) => {
// apibail_internal!(message);
// }
// Err(e) => {
// return Ok(NetworkResult::invalid_message(e));
// }
// };
// Ok(NetworkResult::value(res))
// }
/// Handle a received 'Watch Value' query
pub async fn inbound_watch_value(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
// xxx more here
) -> VeilidAPIResult<NetworkResult<SubkeyResult>> {
let mut inner = self.lock().await?;
let res = match inner
.handle_watch_remote_value(key, subkeys, expiration, count)
.await
{
Ok(res) => res,
Err(VeilidAPIError::Internal { message }) => {
apibail_internal!(message);
}
Err(e) => {
return Ok(NetworkResult::invalid_message(e));
}
};
Ok(NetworkResult::value(res))
}
}
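The new inbound_watch_value keeps the error policy of the other inbound DHT handlers: internal errors abort via apibail_internal!, while any other failure is downgraded to an invalid-message network result so the remote peer only ever sees a soft failure. A minimal sketch of that mapping with stand-in types:

```rust
// Stand-ins for the veilid error/result types (illustration only).
#[derive(Debug)]
enum ApiError {
    Internal(String),
    Other(String),
}
#[derive(Debug, PartialEq)]
enum NetworkResult<T> {
    Value(T),
    InvalidMessage(String),
}

// Mirrors inbound_watch_value: internal errors are local bugs and bubble up
// (the apibail_internal! arm); anything else becomes a soft invalid message.
fn map_inbound<T>(res: Result<T, ApiError>) -> Result<NetworkResult<T>, String> {
    match res {
        Ok(v) => Ok(NetworkResult::Value(v)),
        Err(ApiError::Internal(m)) => Err(m),
        Err(ApiError::Other(m)) => Ok(NetworkResult::InvalidMessage(m)),
    }
}

fn main() {
    assert_eq!(map_inbound(Ok(7)), Ok(NetworkResult::Value(7)));
    assert!(map_inbound::<u32>(Err(ApiError::Internal("bug".into()))).is_err());
}
```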