more watchvalue

This commit is contained in:
Christien Rioux 2023-11-22 21:39:21 -05:00
parent 248f8dad06
commit a211c7cce3
11 changed files with 303 additions and 102 deletions

View File

@ -6,7 +6,7 @@ pub(crate) enum Destination {
/// Send to node directly
Direct {
/// The node to send to
target: NodeRef,
node: NodeRef,
/// Require safety route or not
safety_selection: SafetySelection,
},
@ -15,7 +15,7 @@ pub(crate) enum Destination {
/// The relay to send to
relay: NodeRef,
/// The final destination the relay should send to
target: NodeRef,
node: NodeRef,
/// Require safety route or not
safety_selection: SafetySelection,
},
@ -29,15 +29,15 @@ pub(crate) enum Destination {
}
impl Destination {
pub fn target(&self) -> Option<NodeRef> {
pub fn node(&self) -> Option<NodeRef> {
match self {
Destination::Direct {
target,
node: target,
safety_selection: _,
} => Some(target.clone()),
Destination::Relay {
relay: _,
target,
node: target,
safety_selection: _,
} => Some(target.clone()),
Destination::PrivateRoute {
@ -46,18 +46,18 @@ impl Destination {
} => None,
}
}
pub fn direct(target: NodeRef) -> Self {
let sequencing = target.sequencing();
pub fn direct(node: NodeRef) -> Self {
let sequencing = node.sequencing();
Self::Direct {
target,
node,
safety_selection: SafetySelection::Unsafe(sequencing),
}
}
pub fn relay(relay: NodeRef, target: NodeRef) -> Self {
let sequencing = relay.sequencing().max(target.sequencing());
pub fn relay(relay: NodeRef, node: NodeRef) -> Self {
let sequencing = relay.sequencing().max(node.sequencing());
Self::Relay {
relay,
target,
node,
safety_selection: SafetySelection::Unsafe(sequencing),
}
}
@ -71,19 +71,19 @@ impl Destination {
pub fn with_safety(self, safety_selection: SafetySelection) -> Self {
match self {
Destination::Direct {
target,
node,
safety_selection: _,
} => Self::Direct {
target,
node,
safety_selection,
},
Destination::Relay {
relay,
target,
node,
safety_selection: _,
} => Self::Relay {
relay,
target,
node,
safety_selection,
},
Destination::PrivateRoute {
@ -99,12 +99,12 @@ impl Destination {
pub fn get_safety_selection(&self) -> &SafetySelection {
match self {
Destination::Direct {
target: _,
node: _,
safety_selection,
} => safety_selection,
Destination::Relay {
relay: _,
target: _,
node: _,
safety_selection,
} => safety_selection,
Destination::PrivateRoute {
@ -113,13 +113,31 @@ impl Destination {
} => safety_selection,
}
}
pub fn get_target(&self) -> Target {
match self {
Destination::Direct {
node,
safety_selection: _,
}
| Destination::Relay {
relay: _,
node,
safety_selection: _,
} => Target::NodeId(node.best_node_id()),
Destination::PrivateRoute {
private_route,
safety_selection: _,
} => Target::PrivateRoute(private_route.public_key.value),
}
}
}
impl fmt::Display for Destination {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Destination::Direct {
target,
node,
safety_selection,
} => {
let sr = if matches!(safety_selection, SafetySelection::Safe(_)) {
@ -128,11 +146,11 @@ impl fmt::Display for Destination {
""
};
write!(f, "{}{}", target, sr)
write!(f, "{}{}", node, sr)
}
Destination::Relay {
relay,
target,
node,
safety_selection,
} => {
let sr = if matches!(safety_selection, SafetySelection::Safe(_)) {
@ -141,7 +159,7 @@ impl fmt::Display for Destination {
""
};
write!(f, "{}@{}{}", target, relay, sr)
write!(f, "{}@{}{}", node, relay, sr)
}
Destination::PrivateRoute {
private_route,
@ -160,6 +178,46 @@ impl fmt::Display for Destination {
}
impl RPCProcessor {
/// Convert a 'Target' into a 'Destination'
pub async fn resolve_target_to_destination(
&self,
target: Target,
safety_selection: SafetySelection,
sequencing: Sequencing,
) -> Result<rpc_processor::Destination, RPCError> {
match target {
Target::NodeId(node_id) => {
// Resolve node
let mut nr = match self.resolve_node(node_id, safety_selection).await? {
Some(nr) => nr,
None => {
return Err(RPCError::network("could not resolve node id"));
}
};
// Apply sequencing to match safety selection
nr.set_sequencing(sequencing);
Ok(rpc_processor::Destination::Direct {
node: nr,
safety_selection,
})
}
Target::PrivateRoute(rsid) => {
// Get remote private route
let rss = self.routing_table().route_spec_store();
let Some(private_route) = rss.best_remote_private_route(&rsid) else {
return Err(RPCError::network("could not get remote private route"));
};
Ok(rpc_processor::Destination::PrivateRoute {
private_route,
safety_selection,
})
}
}
}
/// Convert the 'Destination' into a 'RespondTo' for a response
pub(super) fn get_destination_respond_to(
&self,
@ -170,7 +228,7 @@ impl RPCProcessor {
match dest {
Destination::Direct {
target,
node: target,
safety_selection,
} => match safety_selection {
SafetySelection::Unsafe(_) => {
@ -198,7 +256,7 @@ impl RPCProcessor {
},
Destination::Relay {
relay,
target,
node: target,
safety_selection,
} => match safety_selection {
SafetySelection::Unsafe(_) => {

View File

@ -732,12 +732,12 @@ impl RPCProcessor {
// To where are we sending the request
match dest {
Destination::Direct {
target: ref node_ref,
node: ref node_ref,
safety_selection,
}
| Destination::Relay {
relay: ref node_ref,
target: _,
node: _,
safety_selection,
} => {
// Send to a node without a private route
@ -746,7 +746,7 @@ impl RPCProcessor {
// Get the actual destination node id accounting for relays
let (node_ref, destination_node_ref) = if let Destination::Relay {
relay: _,
ref target,
node: ref target,
safety_selection: _,
} = dest
{
@ -854,12 +854,12 @@ impl RPCProcessor {
let routing_table = self.routing_table();
let target = match dest {
Destination::Direct {
target,
node: target,
safety_selection: _,
} => target.clone(),
Destination::Relay {
relay: _,
target,
node: target,
safety_selection: _,
} => target.clone(),
Destination::PrivateRoute {

View File

@ -35,7 +35,7 @@ impl RPCProcessor {
) ->RPCNetworkResult<Answer<GetValueAnswer>> {
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
let Some(target) = dest.target() else {
let Some(target) = dest.node() else {
return Err(RPCError::internal(
"Never send get value requests over private routes",
));

View File

@ -39,7 +39,7 @@ impl RPCProcessor {
) ->RPCNetworkResult<Answer<SetValueAnswer>> {
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
let Some(target) = dest.target() else {
let Some(target) = dest.node() else {
return Err(RPCError::internal(
"Never send set value requests over private routes",
));

View File

@ -27,7 +27,7 @@ impl RPCProcessor {
SafetySelection::Unsafe(_) => {
let (opt_target_nr, routing_domain) = match &dest {
Destination::Direct {
target,
node: target,
safety_selection: _,
} => {
let routing_domain = match target.best_routing_domain() {
@ -52,7 +52,7 @@ impl RPCProcessor {
}
Destination::Relay {
relay,
target,
node: target,
safety_selection: _,
} => {
let routing_domain = match relay.best_routing_domain() {
@ -147,7 +147,7 @@ impl RPCProcessor {
let mut opt_sender_info = None;
match dest {
Destination::Direct {
target,
node: target,
safety_selection,
} => {
if matches!(safety_selection, SafetySelection::Unsafe(_)) {
@ -183,7 +183,7 @@ impl RPCProcessor {
}
Destination::Relay {
relay: _,
target: _,
node: _,
safety_selection: _,
}
| Destination::PrivateRoute {

View File

@ -2,13 +2,67 @@ use super::*;
impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err))]
pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled
let routing_table = self.routing_table();
let opi = routing_table.get_own_peer_info(msg.header.routing_domain());
if !opi.signed_node_info().node_info().has_capability(CAP_DHT) {
return Ok(NetworkResult::service_unavailable("dht is not available"));
// Sends a high level app message
// Can be sent via all methods including relays and routes
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self, message), fields(message.len = message.len()), err)
)]
pub async fn rpc_call_value_changed(
self,
dest: Destination,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
value: SignedValueData,
) -> RPCNetworkResult<()> {
let value_changed = RPCOperationValueChanged::new(key, subkeys, count, value);
let statement =
RPCStatement::new(RPCStatementDetail::ValueChanged(Box::new(value_changed)));
// Send the value changed request
self.statement(dest, statement).await
}
Err(RPCError::unimplemented("process_value_changed"))
pub(crate) async fn process_value_changed(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Get the statement
let (_, _, _, kind) = msg.operation.destructure();
let (key, subkeys, count, value) = match kind {
RPCOperationKind::Statement(s) => match s.destructure() {
RPCStatementDetail::ValueChanged(s) => s.destructure(),
_ => panic!("not a value changed statement"),
},
_ => panic!("not a statement"),
};
#[cfg(feature = "debug-dht")]
{
let debug_string_value = format!(
" len={} seq={} writer={}",
value.value_data().data().len(),
value.value_data().seq(),
value.value_data().writer(),
);
let debug_string_stmt = format!(
"IN <== ValueChanged({} #{:?}+{}{}) <= {}",
key,
subkeys,
count,
debug_string_value,
msg.header.direct_sender_node_id()
);
log_rpc!(debug "{}", debug_string_stmt);
}
// Save the subkey, creating a new record if necessary
let storage_manager = self.storage_manager();
storage_manager
.inbound_value_changed(key, subkeys, count, value)
.await
.map_err(RPCError::internal)?;
Ok(NetworkResult::value(()))
}
}

View File

@ -33,7 +33,7 @@ impl RPCProcessor {
) -> RPCNetworkResult<Answer<WatchValueAnswer>> {
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
let Some(target) = dest.target() else {
let Some(target) = dest.node() else {
return Err(RPCError::internal(
"Never send watch value requests over private routes",
));
@ -151,12 +151,101 @@ impl RPCProcessor {
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ensure this never came over a private route, safety route is okay though
match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
RPCMessageHeaderDetail::PrivateRouted(_) => {
return Ok(NetworkResult::invalid_message(
"not processing watch value request over private route",
))
}
}
// Ignore if disabled
let routing_table = self.routing_table();
let opi = routing_table.get_own_peer_info(msg.header.routing_domain());
if !opi.signed_node_info().node_info().has_capability(CAP_DHT) {
return Ok(NetworkResult::service_unavailable("dht is not available"));
}
Err(RPCError::unimplemented("process_watch_value_q"))
// Get the question
let kind = msg.operation.kind().clone();
let watch_value_q = match kind {
RPCOperationKind::Question(q) => match q.destructure() {
(_, RPCQuestionDetail::WatchValueQ(q)) => q,
_ => panic!("not a watchvalue question"),
},
_ => panic!("not a question"),
};
// Destructure
let (key, subkeys, expiration, count, opt_watch_signature) = watch_value_q.destructure();
let opt_watcher = opt_watch_signature.map(|ws| ws.0);
// Get target for ValueChanged notifications
let dest = network_result_try!(self.get_respond_to_destination(&msg));
let target = dest.get_target();
#[cfg(feature = "debug-dht")]
{
let debug_string = format!(
"IN <=== WatchValueQ({} {}#{:?}@{}+{}) <== {}",
key,
if opt_watcher.is_some() { "+W " } else { "" },
subkeys,
expiration,
count,
msg.header.direct_sender_node_id()
);
log_rpc!(debug "{}", debug_string);
}
// See if we have this record ourselves, if so, accept the watch
let storage_manager = self.storage_manager();
let ret_expiration = network_result_try!(storage_manager
.inbound_watch_value(
key,
subkeys,
Timestamp::new(expiration),
count,
target,
opt_watcher
)
.await
.map_err(RPCError::internal)?);
// Get the nodes that we know about that are closer to the the key than our own node
let routing_table = self.routing_table();
let closer_to_key_peers = if ret_expiration.as_u64() == 0 {
network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]))
} else {
vec![]
};
#[cfg(feature = "debug-dht")]
{
let debug_string_answer = format!(
"IN ===> WatchValueA({} #{} expiration={} peers={}) ==> {}",
key,
subkeys,
ret_expiration,
closer_to_key_peers.len(),
msg.header.direct_sender_node_id()
);
log_rpc!(debug "{}", debug_string_answer);
}
// Make WatchValue answer
let watch_value_a =
RPCOperationWatchValueA::new(ret_expiration.as_u64(), closer_to_key_peers)?;
// Send GetValue answer
self.answer(
msg,
RPCAnswer::new(RPCAnswerDetail::WatchValueA(Box::new(watch_value_a))),
)
.await
}
}

View File

@ -243,18 +243,26 @@ impl StorageManager {
want_descriptor: bool,
) -> VeilidAPIResult<NetworkResult<SubkeyResult>> {
let mut inner = self.lock().await?;
let res = match inner
.handle_get_remote_value(key, subkey, want_descriptor)
.await
{
Ok(res) => res,
Err(VeilidAPIError::Internal { message }) => {
apibail_internal!(message);
// See if this is a remote or local value
let (_is_local, last_subkey_result) = {
// See if the subkey we are getting has a last known local value
let mut last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
// If this is local, it must have a descriptor already
if last_subkey_result.descriptor.is_some() {
if !want_descriptor {
last_subkey_result.descriptor = None;
}
Err(e) => {
return Ok(NetworkResult::invalid_message(e));
(true, last_subkey_result)
} else {
// See if the subkey we are getting has a last known remote value
let last_subkey_result = inner
.handle_get_remote_value(key, subkey, want_descriptor)
.await?;
(false, last_subkey_result)
}
};
Ok(NetworkResult::value(res))
Ok(NetworkResult::value(last_subkey_result))
}
}

View File

@ -172,21 +172,39 @@ impl StorageManager {
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
// xxx more here
) -> VeilidAPIResult<NetworkResult<SubkeyResult>> {
target: Target,
opt_watcher: Option<CryptoKey>,
) -> VeilidAPIResult<NetworkResult<Timestamp>> {
let mut inner = self.lock().await?;
let res = match inner
.handle_watch_remote_value(key, subkeys, expiration, count)
.await
{
Ok(res) => res,
Err(VeilidAPIError::Internal { message }) => {
apibail_internal!(message);
}
Err(e) => {
return Ok(NetworkResult::invalid_message(e));
// See if this is a remote or local value
let (_is_local, opt_expiration_ts) = {
// See if the subkey we are watching has a local value
let opt_expiration_ts = inner
.handle_watch_local_value(key, subkeys, expiration, count, target, opt_watcher)
.await?;
if opt_expiration_ts.is_some() {
(true, opt_expiration_ts)
} else {
// See if the subkey we are watching is a remote value
let opt_expiration_ts = inner
.handle_watch_remote_value(key, subkeys, expiration, count, target, opt_watcher)
.await?;
(false, opt_expiration_ts)
}
};
Ok(NetworkResult::value(res))
Ok(NetworkResult::value(opt_expiration_ts.unwrap_or_default()))
}
/// Handle a received 'Value Changed' statement
pub async fn inbound_value_changed(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
value: SignedValueData,
) -> VeilidAPIResult<()> {
//
}
}

View File

@ -888,7 +888,7 @@ impl VeilidAPI {
match &dest {
Destination::Direct {
target,
node: target,
safety_selection: _,
} => Ok(format!(
"Destination: {:#?}\nTarget Entry:\n{}\n",
@ -897,7 +897,7 @@ impl VeilidAPI {
)),
Destination::Relay {
relay,
target,
node: target,
safety_selection: _,
} => Ok(format!(
"Destination: {:#?}\nTarget Entry:\n{}\nRelay Entry:\n{}\n",

View File

@ -123,40 +123,14 @@ impl RoutingContext {
async fn get_destination(&self, target: Target) -> VeilidAPIResult<rpc_processor::Destination> {
let rpc_processor = self.api.rpc_processor()?;
match target {
Target::NodeId(node_id) => {
// Resolve node
let mut nr = match rpc_processor
.resolve_node(node_id, self.unlocked_inner.safety_selection)
rpc_processor
.resolve_target_to_destination(
target,
self.unlocked_inner.safety_selection,
self.sequencing(),
)
.await
{
Ok(Some(nr)) => nr,
Ok(None) => apibail_invalid_target!("could not resolve node id"),
Err(e) => return Err(e.into()),
};
// Apply sequencing to match safety selection
nr.set_sequencing(self.sequencing());
Ok(rpc_processor::Destination::Direct {
target: nr,
safety_selection: self.unlocked_inner.safety_selection,
})
}
Target::PrivateRoute(rsid) => {
// Get remote private route
let rss = self.api.routing_table()?.route_spec_store();
let Some(private_route) = rss.best_remote_private_route(&rsid) else {
apibail_invalid_target!("could not get remote private route");
};
Ok(rpc_processor::Destination::PrivateRoute {
private_route,
safety_selection: self.unlocked_inner.safety_selection,
})
}
}
.map_err(VeilidAPIError::invalid_target)
}
////////////////////////////////////////////////////////////////