From ca65d12836e1293fbd3932fe7b3c98c2507989d5 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 8 Mar 2024 21:24:59 -0500 Subject: [PATCH] checkpoint, adding 'inspect value' --- veilid-core/proto/veilid.capnp | 35 +- veilid-core/proto/veilid_capnp.rs | 788 ++++++++++++++++-- veilid-core/src/rpc_processor/coders/mod.rs | 1 + .../rpc_processor/coders/operations/answer.rs | 11 + .../rpc_processor/coders/operations/mod.rs | 2 + .../coders/operations/operation_get_value.rs | 35 +- .../operations/operation_inspect_value.rs | 291 +++++++ .../operations/operation_value_changed.rs | 16 +- .../operations/operation_watch_value.rs | 16 +- .../coders/operations/question.rs | 11 + veilid-core/src/rpc_processor/mod.rs | 2 + .../src/rpc_processor/rpc_inspect_value.rs | 266 ++++++ .../src/storage_manager/inspect_value.rs | 267 ++++++ 13 files changed, 1641 insertions(+), 100 deletions(-) create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs create mode 100644 veilid-core/src/rpc_processor/rpc_inspect_value.rs create mode 100644 veilid-core/src/storage_manager/inspect_value.rs diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 46dc4f33..9ea6dea3 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -368,6 +368,18 @@ struct OperationWatchValueA @0xa726cab7064ba893 { watchId @3 :UInt64; # (0 = id not allocated if rejecting new watch) random id for watch instance on this node } +struct OperationInspectValueQ @0xdef712d2fd16f55a { + key @0 :TypedKey; # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] + subkeys @1 :List(SubkeyRange); # subkey range to inspect (up to 512 total subkeys), if empty this implies 0..=511 + wantDescriptor @2 :Bool; # whether or not to include the descriptor for the key +} + +struct OperationInspectValueA @0xb8b57faf960ee102 { + seqs @0 :List(ValueSeqNum); # the list of subkey value sequence numbers in ascending order for each subkey in the requested range. if a subkey has not been written to, it is given a value of UINT32_MAX. these are not signed, and may be immediately out of date, and must be verified by a GetValueQ request. + peers @1 :List(PeerInfo); # returned 'closer peer' information on either success or failure + descriptor @2 :SignedValueDescriptor; # optional: the descriptor if requested if the value is also returned +} + struct OperationValueChanged @0xd1c59ebdd8cc1bf6 { key @0 :TypedKey; # key for value that changed subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time, if empty this is a watch expiration notice) @@ -487,15 +499,17 @@ struct Question @0xd8510bc33492ef70 { getValueQ @5 :OperationGetValueQ; setValueQ @6 :OperationSetValueQ; watchValueQ @7 :OperationWatchValueQ; + inspectValueQ @8 :OperationInspectValueQ; + # #[cfg(feature="unstable-blockstore")] - # supplyBlockQ @8 :OperationSupplyBlockQ; - # findBlockQ @9 :OperationFindBlockQ; + # supplyBlockQ @9 :OperationSupplyBlockQ; + # findBlockQ @10 :OperationFindBlockQ; # Tunnel operations # #[cfg(feature="unstable-tunnels")] - # startTunnelQ @10 :OperationStartTunnelQ; - # completeTunnelQ @11 :OperationCompleteTunnelQ; - # cancelTunnelQ @12 :OperationCancelTunnelQ; + # startTunnelQ @11 :OperationStartTunnelQ; + # completeTunnelQ @12 :OperationCompleteTunnelQ; + # cancelTunnelQ @13 :OperationCancelTunnelQ; } } @@ -526,16 +540,17 @@ struct Answer @0xacacb8b6988c1058 { getValueA @3 :OperationGetValueA; setValueA @4 :OperationSetValueA; watchValueA @5 :OperationWatchValueA; + inspectValueA @6 :OperationInspectValueA; # #[cfg(feature="unstable-blockstore")] - #supplyBlockA @6 :OperationSupplyBlockA; - #findBlockA @7 :OperationFindBlockA; + #supplyBlockA @7 :OperationSupplyBlockA; + #findBlockA @8 :OperationFindBlockA; # Tunnel operations # #[cfg(feature="unstable-tunnels")] - # startTunnelA @8 :OperationStartTunnelA; - # completeTunnelA @9 :OperationCompleteTunnelA; - # cancelTunnelA @10 :OperationCancelTunnelA; + # startTunnelA @9 :OperationStartTunnelA; + # completeTunnelA @10 :OperationCompleteTunnelA; + # cancelTunnelA @11 :OperationCancelTunnelA; } } diff --git a/veilid-core/proto/veilid_capnp.rs b/veilid-core/proto/veilid_capnp.rs index 5a390421..64ed7421 100644 --- a/veilid-core/proto/veilid_capnp.rs +++ b/veilid-core/proto/veilid_capnp.rs @@ -14457,6 +14457,588 @@ pub mod operation_watch_value_a { } } +pub mod operation_inspect_value_q { + #[derive(Copy, Clone)] + pub struct Owned(()); + impl ::capnp::introspect::Introspect for Owned { fn introspect() -> ::capnp::introspect::Type { ::capnp::introspect::TypeVariant::Struct(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types, annotation_types: _private::get_annotation_types }).into() } } + impl ::capnp::traits::Owned for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; } + impl ::capnp::traits::OwnedStruct for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; } + impl ::capnp::traits::Pipelined for Owned { type Pipeline = Pipeline; } + + pub struct Reader<'a> { reader: ::capnp::private::layout::StructReader<'a> } + impl <'a,> ::core::marker::Copy for Reader<'a,> {} + impl <'a,> ::core::clone::Clone for Reader<'a,> { + fn clone(&self) -> Self { *self } + } + + impl <'a,> ::capnp::traits::HasTypeId for Reader<'a,> { + const TYPE_ID: u64 = _private::TYPE_ID; + } + impl <'a,> ::core::convert::From<::capnp::private::layout::StructReader<'a>> for Reader<'a,> { + fn from(reader: ::capnp::private::layout::StructReader<'a>) -> Self { + Self { reader, } + } + } + + impl <'a,> ::core::convert::From> for ::capnp::dynamic_value::Reader<'a> { + fn from(reader: Reader<'a,>) -> Self { + Self::Struct(::capnp::dynamic_struct::Reader::new(reader.reader, ::capnp::schema::StructSchema::new(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types::<>, annotation_types: _private::get_annotation_types::<>}))) + } + } + + impl <'a,> ::core::fmt::Debug for Reader<'a,> { + fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::result::Result<(), ::core::fmt::Error> { + core::fmt::Debug::fmt(&::core::convert::Into::<::capnp::dynamic_value::Reader<'_>>::into(*self), f) + } + } + + impl <'a,> ::capnp::traits::FromPointerReader<'a> for Reader<'a,> { + fn get_from_pointer(reader: &::capnp::private::layout::PointerReader<'a>, default: ::core::option::Option<&'a [::capnp::Word]>) -> ::capnp::Result { + ::core::result::Result::Ok(reader.get_struct(default)?.into()) + } + } + + impl <'a,> ::capnp::traits::IntoInternalStructReader<'a> for Reader<'a,> { + fn into_internal_struct_reader(self) -> ::capnp::private::layout::StructReader<'a> { + self.reader + } + } + + impl <'a,> ::capnp::traits::Imbue<'a> for Reader<'a,> { + fn imbue(&mut self, cap_table: &'a ::capnp::private::layout::CapTable) { + self.reader.imbue(::capnp::private::layout::CapTableReader::Plain(cap_table)) + } + } + + impl <'a,> Reader<'a,> { + pub fn reborrow(&self) -> Reader<'_,> { + Self { .. *self } + } + + pub fn total_size(&self) -> ::capnp::Result<::capnp::MessageSize> { + self.reader.total_size() + } + #[inline] + pub fn get_key(self) -> ::capnp::Result> { + ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(0), ::core::option::Option::None) + } + #[inline] + pub fn has_key(&self) -> bool { + !self.reader.get_pointer_field(0).is_null() + } + #[inline] + pub fn get_subkeys(self) -> ::capnp::Result<::capnp::struct_list::Reader<'a,crate::veilid_capnp::subkey_range::Owned>> { + ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(1), ::core::option::Option::None) + } + #[inline] + pub fn has_subkeys(&self) -> bool { + !self.reader.get_pointer_field(1).is_null() + } + #[inline] + pub fn get_want_descriptor(self) -> bool { + self.reader.get_bool_field(0) + } + } + + pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> } + impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> { + const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 1, pointers: 2 }; + } + impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> { + const TYPE_ID: u64 = _private::TYPE_ID; + } + impl <'a,> ::core::convert::From<::capnp::private::layout::StructBuilder<'a>> for Builder<'a,> { + fn from(builder: ::capnp::private::layout::StructBuilder<'a>) -> Self { + Self { builder, } + } + } + + impl <'a,> ::core::convert::From> for ::capnp::dynamic_value::Builder<'a> { + fn from(builder: Builder<'a,>) -> Self { + Self::Struct(::capnp::dynamic_struct::Builder::new(builder.builder, ::capnp::schema::StructSchema::new(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types::<>, annotation_types: _private::get_annotation_types::<>}))) + } + } + + impl <'a,> ::capnp::traits::ImbueMut<'a> for Builder<'a,> { + fn imbue_mut(&mut self, cap_table: &'a mut ::capnp::private::layout::CapTable) { + self.builder.imbue(::capnp::private::layout::CapTableBuilder::Plain(cap_table)) + } + } + + impl <'a,> ::capnp::traits::FromPointerBuilder<'a> for Builder<'a,> { + fn init_pointer(builder: ::capnp::private::layout::PointerBuilder<'a>, _size: u32) -> Self { + builder.init_struct(::STRUCT_SIZE).into() + } + fn get_from_pointer(builder: ::capnp::private::layout::PointerBuilder<'a>, default: ::core::option::Option<&'a [::capnp::Word]>) -> ::capnp::Result { + ::core::result::Result::Ok(builder.get_struct(::STRUCT_SIZE, default)?.into()) + } + } + + impl <'a,> ::capnp::traits::SetPointerBuilder for Reader<'a,> { + fn set_pointer_builder(mut pointer: ::capnp::private::layout::PointerBuilder<'_>, value: Self, canonicalize: bool) -> ::capnp::Result<()> { pointer.set_struct(&value.reader, canonicalize) } + } + + impl <'a,> Builder<'a,> { + pub fn into_reader(self) -> Reader<'a,> { + self.builder.into_reader().into() + } + pub fn reborrow(&mut self) -> Builder<'_,> { + Builder { builder: self.builder.reborrow() } + } + pub fn reborrow_as_reader(&self) -> Reader<'_,> { + self.builder.as_reader().into() + } + + pub fn total_size(&self) -> ::capnp::Result<::capnp::MessageSize> { + self.builder.as_reader().total_size() + } + #[inline] + pub fn get_key(self) -> ::capnp::Result> { + ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(0), ::core::option::Option::None) + } + #[inline] + pub fn set_key(&mut self, value: crate::veilid_capnp::typed_key::Reader<'_>) -> ::capnp::Result<()> { + ::capnp::traits::SetPointerBuilder::set_pointer_builder(self.builder.reborrow().get_pointer_field(0), value, false) + } + #[inline] + pub fn init_key(self, ) -> crate::veilid_capnp::typed_key::Builder<'a> { + ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(0), 0) + } + #[inline] + pub fn has_key(&self) -> bool { + !self.builder.is_pointer_field_null(0) + } + #[inline] + pub fn get_subkeys(self) -> ::capnp::Result<::capnp::struct_list::Builder<'a,crate::veilid_capnp::subkey_range::Owned>> { + ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(1), ::core::option::Option::None) + } + #[inline] + pub fn set_subkeys(&mut self, value: ::capnp::struct_list::Reader<'_,crate::veilid_capnp::subkey_range::Owned>) -> ::capnp::Result<()> { + ::capnp::traits::SetPointerBuilder::set_pointer_builder(self.builder.reborrow().get_pointer_field(1), value, false) + } + #[inline] + pub fn init_subkeys(self, size: u32) -> ::capnp::struct_list::Builder<'a,crate::veilid_capnp::subkey_range::Owned> { + ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(1), size) + } + #[inline] + pub fn has_subkeys(&self) -> bool { + !self.builder.is_pointer_field_null(1) + } + #[inline] + pub fn get_want_descriptor(self) -> bool { + self.builder.get_bool_field(0) + } + #[inline] + pub fn set_want_descriptor(&mut self, value: bool) { + self.builder.set_bool_field(0, value); + } + } + + pub struct Pipeline { _typeless: ::capnp::any_pointer::Pipeline } + impl ::capnp::capability::FromTypelessPipeline for Pipeline { + fn new(typeless: ::capnp::any_pointer::Pipeline) -> Self { + Self { _typeless: typeless, } + } + } + impl Pipeline { + pub fn get_key(&self) -> crate::veilid_capnp::typed_key::Pipeline { + ::capnp::capability::FromTypelessPipeline::new(self._typeless.get_pointer_field(0)) + } + } + mod _private { + pub static ENCODED_NODE: [::capnp::Word; 70] = [ + ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), + ::capnp::word(90, 245, 22, 253, 210, 18, 247, 222), + ::capnp::word(19, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), + ::capnp::word(2, 0, 7, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 82, 1, 0, 0), + ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(37, 0, 0, 0, 175, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(112, 114, 111, 116, 111, 47, 118, 101), + ::capnp::word(105, 108, 105, 100, 46, 99, 97, 112), + ::capnp::word(110, 112, 58, 79, 112, 101, 114, 97), + ::capnp::word(116, 105, 111, 110, 73, 110, 115, 112), + ::capnp::word(101, 99, 116, 86, 97, 108, 117, 101), + ::capnp::word(81, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(12, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(69, 0, 0, 0, 34, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(64, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(76, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(1, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(73, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(68, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(96, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(2, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(93, 0, 0, 0, 122, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(92, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(104, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(107, 101, 121, 0, 0, 0, 0, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(41, 27, 230, 241, 169, 103, 213, 226), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(115, 117, 98, 107, 101, 121, 115, 0), + ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(28, 23, 208, 164, 192, 218, 146, 245), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(119, 97, 110, 116, 68, 101, 115, 99), + ::capnp::word(114, 105, 112, 116, 111, 114, 0, 0), + ::capnp::word(1, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(1, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ]; + pub fn get_field_types(index: u16) -> ::capnp::introspect::Type { + match index { + 0 => ::introspect(), + 1 => <::capnp::struct_list::Owned as ::capnp::introspect::Introspect>::introspect(), + 2 => ::introspect(), + _ => panic!("invalid field index {}", index), + } + } + pub fn get_annotation_types(child_index: Option, index: u32) -> ::capnp::introspect::Type { + panic!("invalid annotation indices ({:?}, {}) ", child_index, index) + } + pub static RAW_SCHEMA: ::capnp::introspect::RawStructSchema = ::capnp::introspect::RawStructSchema { + encoded_node: &ENCODED_NODE, + nonunion_members: NONUNION_MEMBERS, + members_by_discriminant: MEMBERS_BY_DISCRIMINANT, + }; + pub static NONUNION_MEMBERS : &[u16] = &[0,1,2]; + pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[]; + pub const TYPE_ID: u64 = 0xdef7_12d2_fd16_f55a; + } +} + +pub mod operation_inspect_value_a { + #[derive(Copy, Clone)] + pub struct Owned(()); + impl ::capnp::introspect::Introspect for Owned { fn introspect() -> ::capnp::introspect::Type { ::capnp::introspect::TypeVariant::Struct(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types, annotation_types: _private::get_annotation_types }).into() } } + impl ::capnp::traits::Owned for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; } + impl ::capnp::traits::OwnedStruct for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; } + impl ::capnp::traits::Pipelined for Owned { type Pipeline = Pipeline; } + + pub struct Reader<'a> { reader: ::capnp::private::layout::StructReader<'a> } + impl <'a,> ::core::marker::Copy for Reader<'a,> {} + impl <'a,> ::core::clone::Clone for Reader<'a,> { + fn clone(&self) -> Self { *self } + } + + impl <'a,> ::capnp::traits::HasTypeId for Reader<'a,> { + const TYPE_ID: u64 = _private::TYPE_ID; + } + impl <'a,> ::core::convert::From<::capnp::private::layout::StructReader<'a>> for Reader<'a,> { + fn from(reader: ::capnp::private::layout::StructReader<'a>) -> Self { + Self { reader, } + } + } + + impl <'a,> ::core::convert::From> for ::capnp::dynamic_value::Reader<'a> { + fn from(reader: Reader<'a,>) -> Self { + Self::Struct(::capnp::dynamic_struct::Reader::new(reader.reader, ::capnp::schema::StructSchema::new(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types::<>, annotation_types: _private::get_annotation_types::<>}))) + } + } + + impl <'a,> ::core::fmt::Debug for Reader<'a,> { + fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::result::Result<(), ::core::fmt::Error> { + core::fmt::Debug::fmt(&::core::convert::Into::<::capnp::dynamic_value::Reader<'_>>::into(*self), f) + } + } + + impl <'a,> ::capnp::traits::FromPointerReader<'a> for Reader<'a,> { + fn get_from_pointer(reader: &::capnp::private::layout::PointerReader<'a>, default: ::core::option::Option<&'a [::capnp::Word]>) -> ::capnp::Result { + ::core::result::Result::Ok(reader.get_struct(default)?.into()) + } + } + + impl <'a,> ::capnp::traits::IntoInternalStructReader<'a> for Reader<'a,> { + fn into_internal_struct_reader(self) -> ::capnp::private::layout::StructReader<'a> { + self.reader + } + } + + impl <'a,> ::capnp::traits::Imbue<'a> for Reader<'a,> { + fn imbue(&mut self, cap_table: &'a ::capnp::private::layout::CapTable) { + self.reader.imbue(::capnp::private::layout::CapTableReader::Plain(cap_table)) + } + } + + impl <'a,> Reader<'a,> { + pub fn reborrow(&self) -> Reader<'_,> { + Self { .. *self } + } + + pub fn total_size(&self) -> ::capnp::Result<::capnp::MessageSize> { + self.reader.total_size() + } + #[inline] + pub fn get_seqs(self) -> ::capnp::Result<::capnp::primitive_list::Reader<'a,u32>> { + ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(0), ::core::option::Option::None) + } + #[inline] + pub fn has_seqs(&self) -> bool { + !self.reader.get_pointer_field(0).is_null() + } + #[inline] + pub fn get_peers(self) -> ::capnp::Result<::capnp::struct_list::Reader<'a,crate::veilid_capnp::peer_info::Owned>> { + ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(1), ::core::option::Option::None) + } + #[inline] + pub fn has_peers(&self) -> bool { + !self.reader.get_pointer_field(1).is_null() + } + #[inline] + pub fn get_descriptor(self) -> ::capnp::Result> { + ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(2), ::core::option::Option::None) + } + #[inline] + pub fn has_descriptor(&self) -> bool { + !self.reader.get_pointer_field(2).is_null() + } + } + + pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> } + impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> { + const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 0, pointers: 3 }; + } + impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> { + const TYPE_ID: u64 = _private::TYPE_ID; + } + impl <'a,> ::core::convert::From<::capnp::private::layout::StructBuilder<'a>> for Builder<'a,> { + fn from(builder: ::capnp::private::layout::StructBuilder<'a>) -> Self { + Self { builder, } + } + } + + impl <'a,> ::core::convert::From> for ::capnp::dynamic_value::Builder<'a> { + fn from(builder: Builder<'a,>) -> Self { + Self::Struct(::capnp::dynamic_struct::Builder::new(builder.builder, ::capnp::schema::StructSchema::new(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types::<>, annotation_types: _private::get_annotation_types::<>}))) + } + } + + impl <'a,> ::capnp::traits::ImbueMut<'a> for Builder<'a,> { + fn imbue_mut(&mut self, cap_table: &'a mut ::capnp::private::layout::CapTable) { + self.builder.imbue(::capnp::private::layout::CapTableBuilder::Plain(cap_table)) + } + } + + impl <'a,> ::capnp::traits::FromPointerBuilder<'a> for Builder<'a,> { + fn init_pointer(builder: ::capnp::private::layout::PointerBuilder<'a>, _size: u32) -> Self { + builder.init_struct(::STRUCT_SIZE).into() + } + fn get_from_pointer(builder: ::capnp::private::layout::PointerBuilder<'a>, default: ::core::option::Option<&'a [::capnp::Word]>) -> ::capnp::Result { + ::core::result::Result::Ok(builder.get_struct(::STRUCT_SIZE, default)?.into()) + } + } + + impl <'a,> ::capnp::traits::SetPointerBuilder for Reader<'a,> { + fn set_pointer_builder(mut pointer: ::capnp::private::layout::PointerBuilder<'_>, value: Self, canonicalize: bool) -> ::capnp::Result<()> { pointer.set_struct(&value.reader, canonicalize) } + } + + impl <'a,> Builder<'a,> { + pub fn into_reader(self) -> Reader<'a,> { + self.builder.into_reader().into() + } + pub fn reborrow(&mut self) -> Builder<'_,> { + Builder { builder: self.builder.reborrow() } + } + pub fn reborrow_as_reader(&self) -> Reader<'_,> { + self.builder.as_reader().into() + } + + pub fn total_size(&self) -> ::capnp::Result<::capnp::MessageSize> { + self.builder.as_reader().total_size() + } + #[inline] + pub fn get_seqs(self) -> ::capnp::Result<::capnp::primitive_list::Builder<'a,u32>> { + ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(0), ::core::option::Option::None) + } + #[inline] + pub fn set_seqs(&mut self, value: ::capnp::primitive_list::Reader<'_,u32>) -> ::capnp::Result<()> { + ::capnp::traits::SetPointerBuilder::set_pointer_builder(self.builder.reborrow().get_pointer_field(0), value, false) + } + #[inline] + pub fn init_seqs(self, size: u32) -> ::capnp::primitive_list::Builder<'a,u32> { + ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(0), size) + } + #[inline] + pub fn has_seqs(&self) -> bool { + !self.builder.is_pointer_field_null(0) + } + #[inline] + pub fn get_peers(self) -> ::capnp::Result<::capnp::struct_list::Builder<'a,crate::veilid_capnp::peer_info::Owned>> { + ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(1), ::core::option::Option::None) + } + #[inline] + pub fn set_peers(&mut self, value: ::capnp::struct_list::Reader<'_,crate::veilid_capnp::peer_info::Owned>) -> ::capnp::Result<()> { + ::capnp::traits::SetPointerBuilder::set_pointer_builder(self.builder.reborrow().get_pointer_field(1), value, false) + } + #[inline] + pub fn init_peers(self, size: u32) -> ::capnp::struct_list::Builder<'a,crate::veilid_capnp::peer_info::Owned> { + ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(1), size) + } + #[inline] + pub fn has_peers(&self) -> bool { + !self.builder.is_pointer_field_null(1) + } + #[inline] + pub fn get_descriptor(self) -> ::capnp::Result> { + ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(2), ::core::option::Option::None) + } + #[inline] + pub fn set_descriptor(&mut self, value: crate::veilid_capnp::signed_value_descriptor::Reader<'_>) -> ::capnp::Result<()> { + ::capnp::traits::SetPointerBuilder::set_pointer_builder(self.builder.reborrow().get_pointer_field(2), value, false) + } + #[inline] + pub fn init_descriptor(self, ) -> crate::veilid_capnp::signed_value_descriptor::Builder<'a> { + ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(2), 0) + } + #[inline] + pub fn has_descriptor(&self) -> bool { + !self.builder.is_pointer_field_null(2) + } + } + + pub struct Pipeline { _typeless: ::capnp::any_pointer::Pipeline } + impl ::capnp::capability::FromTypelessPipeline for Pipeline { + fn new(typeless: ::capnp::any_pointer::Pipeline) -> Self { + Self { _typeless: typeless, } + } + } + impl Pipeline { + pub fn get_descriptor(&self) -> crate::veilid_capnp::signed_value_descriptor::Pipeline { + ::capnp::capability::FromTypelessPipeline::new(self._typeless.get_pointer_field(2)) + } + } + mod _private { + pub static ENCODED_NODE: [::capnp::Word; 74] = [ + ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), + ::capnp::word(2, 225, 14, 150, 175, 127, 181, 184), + ::capnp::word(19, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(2, 171, 52, 55, 3, 232, 252, 143), + ::capnp::word(3, 0, 7, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 82, 1, 0, 0), + ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(37, 0, 0, 0, 175, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(112, 114, 111, 116, 111, 47, 118, 101), + ::capnp::word(105, 108, 105, 100, 46, 99, 97, 112), + ::capnp::word(110, 112, 58, 79, 112, 101, 114, 97), + ::capnp::word(116, 105, 111, 110, 73, 110, 115, 112), + ::capnp::word(101, 99, 116, 86, 97, 108, 117, 101), + ::capnp::word(65, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(12, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(69, 0, 0, 0, 42, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(64, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(92, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(1, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(89, 0, 0, 0, 50, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(84, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(112, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(2, 0, 0, 0, 2, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(109, 0, 0, 0, 90, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(108, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(120, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(115, 101, 113, 115, 0, 0, 0, 0), + ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(8, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(112, 101, 101, 114, 115, 0, 0, 0), + ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(203, 75, 60, 93, 45, 114, 45, 254), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(100, 101, 115, 99, 114, 105, 112, 116), + ::capnp::word(111, 114, 0, 0, 0, 0, 0, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(231, 176, 225, 249, 211, 28, 145, 231), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ]; + pub fn get_field_types(index: u16) -> ::capnp::introspect::Type { + match index { + 0 => <::capnp::primitive_list::Owned as ::capnp::introspect::Introspect>::introspect(), + 1 => <::capnp::struct_list::Owned as ::capnp::introspect::Introspect>::introspect(), + 2 => ::introspect(), + _ => panic!("invalid field index {}", index), + } + } + pub fn get_annotation_types(child_index: Option, index: u32) -> ::capnp::introspect::Type { + panic!("invalid annotation indices ({:?}, {}) ", child_index, index) + } + pub static RAW_SCHEMA: ::capnp::introspect::RawStructSchema = ::capnp::introspect::RawStructSchema { + encoded_node: &ENCODED_NODE, + nonunion_members: NONUNION_MEMBERS, + members_by_discriminant: MEMBERS_BY_DISCRIMINANT, + }; + pub static NONUNION_MEMBERS : &[u16] = &[0,1,2]; + pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[]; + pub const TYPE_ID: u64 = 0xb8b5_7faf_960e_e102; + } +} + pub mod operation_value_changed { #[derive(Copy, Clone)] pub struct Owned(()); @@ -18995,7 +19577,7 @@ pub mod question { } pub mod detail { - pub use self::Which::{StatusQ,FindNodeQ,AppCallQ,GetValueQ,SetValueQ,WatchValueQ}; + pub use self::Which::{StatusQ,FindNodeQ,AppCallQ,GetValueQ,SetValueQ,WatchValueQ,InspectValueQ}; #[derive(Copy, Clone)] pub struct Owned(()); @@ -19088,6 +19670,11 @@ pub mod question { !self.reader.get_pointer_field(1).is_null() } #[inline] + pub fn has_inspect_value_q(&self) -> bool { + if self.reader.get_data_field::(1) != 6 { return false; } + !self.reader.get_pointer_field(1).is_null() + } + #[inline] pub fn which(self) -> ::core::result::Result, ::capnp::NotInSchema> { match self.reader.get_data_field::(1) { 0 => { @@ -19120,6 +19707,11 @@ pub mod question { ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(1), ::core::option::Option::None) )) } + 6 => { + ::core::result::Result::Ok(InspectValueQ( + ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(1), ::core::option::Option::None) + )) + } x => ::core::result::Result::Err(::capnp::NotInSchema(x)) } } @@ -19268,6 +19860,21 @@ pub mod question { !self.builder.is_pointer_field_null(1) } #[inline] + pub fn set_inspect_value_q(&mut self, value: crate::veilid_capnp::operation_inspect_value_q::Reader<'_>) -> ::capnp::Result<()> { + self.builder.set_data_field::(1, 6); + ::capnp::traits::SetPointerBuilder::set_pointer_builder(self.builder.reborrow().get_pointer_field(1), value, false) + } + #[inline] + pub fn init_inspect_value_q(self, ) -> crate::veilid_capnp::operation_inspect_value_q::Builder<'a> { + self.builder.set_data_field::(1, 6); + ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(1), 0) + } + #[inline] + pub fn has_inspect_value_q(&self) -> bool { + if self.builder.get_data_field::(1) != 6 { return false; } + !self.builder.is_pointer_field_null(1) + } + #[inline] pub fn which(self) -> ::core::result::Result, ::capnp::NotInSchema> { match self.builder.get_data_field::(1) { 0 => { @@ -19300,6 +19907,11 @@ pub mod question { ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(1), ::core::option::Option::None) )) } + 6 => { + ::core::result::Result::Ok(InspectValueQ( + ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(1), ::core::option::Option::None) + )) + } x => ::core::result::Result::Err(::capnp::NotInSchema(x)) } } @@ -19314,17 +19926,17 @@ pub mod question { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 113] = [ + pub static ENCODED_NODE: [::capnp::Word; 129] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(178, 131, 145, 42, 21, 110, 131, 223), ::capnp::word(28, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(112, 239, 146, 52, 195, 11, 81, 216), - ::capnp::word(2, 0, 7, 0, 1, 0, 6, 0), + ::capnp::word(2, 0, 7, 0, 1, 0, 7, 0), ::capnp::word(1, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(21, 0, 0, 0, 26, 1, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(29, 0, 0, 0, 87, 1, 0, 0), + ::capnp::word(29, 0, 0, 0, 143, 1, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(112, 114, 111, 116, 111, 47, 118, 101), @@ -19332,49 +19944,56 @@ pub mod question { ::capnp::word(110, 112, 58, 81, 117, 101, 115, 116), ::capnp::word(105, 111, 110, 46, 100, 101, 116, 97), ::capnp::word(105, 108, 0, 0, 0, 0, 0, 0), - ::capnp::word(24, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(28, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 255, 255, 1, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(153, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(181, 0, 0, 0, 66, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(148, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(160, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(176, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(188, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(1, 0, 254, 255, 1, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 3, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(157, 0, 0, 0, 82, 0, 0, 0), + ::capnp::word(185, 0, 0, 0, 82, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(156, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(168, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(184, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(196, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(2, 0, 253, 255, 1, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 4, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(165, 0, 0, 0, 74, 0, 0, 0), + ::capnp::word(193, 0, 0, 0, 74, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(164, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(176, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(192, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(204, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(3, 0, 252, 255, 1, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 5, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(173, 0, 0, 0, 82, 0, 0, 0), + ::capnp::word(201, 0, 0, 0, 82, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(172, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(184, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(200, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(212, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(4, 0, 251, 255, 1, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 6, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(181, 0, 0, 0, 82, 0, 0, 0), + ::capnp::word(209, 0, 0, 0, 82, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(180, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(192, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(208, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(220, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(5, 0, 250, 255, 1, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(189, 0, 0, 0, 98, 0, 0, 0), + ::capnp::word(217, 0, 0, 0, 98, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(188, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(200, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(216, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(228, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(6, 0, 249, 255, 1, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 8, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(225, 0, 0, 0, 114, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(224, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(236, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(115, 116, 97, 116, 117, 115, 81, 0), ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(74, 136, 13, 167, 206, 128, 93, 134), @@ -19428,6 +20047,15 @@ pub mod question { ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(105, 110, 115, 112, 101, 99, 116, 86), + ::capnp::word(97, 108, 117, 101, 81, 0, 0, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(90, 245, 22, 253, 210, 18, 247, 222), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ]; pub fn get_field_types(index: u16) -> ::capnp::introspect::Type { match index { @@ -19437,6 +20065,7 @@ pub mod question { 3 => ::introspect(), 4 => ::introspect(), 5 => ::introspect(), + 6 => ::introspect(), _ => panic!("invalid field index {}", index), } } @@ -19449,19 +20078,20 @@ pub mod question { members_by_discriminant: MEMBERS_BY_DISCRIMINANT, }; pub static NONUNION_MEMBERS : &[u16] = &[]; - pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[0,1,2,3,4,5]; + pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[0,1,2,3,4,5,6]; pub const TYPE_ID: u64 = 0xdf83_6e15_2a91_83b2; } - pub enum Which { + pub enum Which { StatusQ(A0), FindNodeQ(A1), AppCallQ(A2), GetValueQ(A3), SetValueQ(A4), WatchValueQ(A5), + InspectValueQ(A6), } - pub type WhichReader<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>>; - pub type WhichBuilder<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>>; + pub type WhichReader<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>>; + pub type WhichBuilder<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>>; } } @@ -20317,7 +20947,7 @@ pub mod answer { } pub mod detail { - pub use self::Which::{StatusA,FindNodeA,AppCallA,GetValueA,SetValueA,WatchValueA}; + pub use self::Which::{StatusA,FindNodeA,AppCallA,GetValueA,SetValueA,WatchValueA,InspectValueA}; #[derive(Copy, Clone)] pub struct Owned(()); @@ -20410,6 +21040,11 @@ pub mod answer { !self.reader.get_pointer_field(0).is_null() } #[inline] + pub fn has_inspect_value_a(&self) -> bool { + if self.reader.get_data_field::(0) != 6 { return false; } + !self.reader.get_pointer_field(0).is_null() + } + #[inline] pub fn which(self) -> ::core::result::Result, ::capnp::NotInSchema> { match self.reader.get_data_field::(0) { 0 => { @@ -20442,6 +21077,11 @@ pub mod answer { ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(0), ::core::option::Option::None) )) } + 6 => { + ::core::result::Result::Ok(InspectValueA( + ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(0), ::core::option::Option::None) + )) + } x => ::core::result::Result::Err(::capnp::NotInSchema(x)) } } @@ -20590,6 +21230,21 @@ pub mod answer { !self.builder.is_pointer_field_null(0) } #[inline] + pub fn set_inspect_value_a(&mut self, value: crate::veilid_capnp::operation_inspect_value_a::Reader<'_>) -> ::capnp::Result<()> { + self.builder.set_data_field::(0, 6); + ::capnp::traits::SetPointerBuilder::set_pointer_builder(self.builder.reborrow().get_pointer_field(0), value, false) + } + #[inline] + pub fn init_inspect_value_a(self, ) -> crate::veilid_capnp::operation_inspect_value_a::Builder<'a> { + self.builder.set_data_field::(0, 6); + ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(0), 0) + } + #[inline] + pub fn has_inspect_value_a(&self) -> bool { + if self.builder.get_data_field::(0) != 6 { return false; } + !self.builder.is_pointer_field_null(0) + } + #[inline] pub fn which(self) -> ::core::result::Result, ::capnp::NotInSchema> { match self.builder.get_data_field::(0) { 0 => { @@ -20622,6 +21277,11 @@ pub mod answer { ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(0), ::core::option::Option::None) )) } + 6 => { + ::core::result::Result::Ok(InspectValueA( + ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(0), ::core::option::Option::None) + )) + } x => ::core::result::Result::Err(::capnp::NotInSchema(x)) } } @@ -20636,17 +21296,17 @@ pub mod answer { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 113] = [ + pub static ENCODED_NODE: [::capnp::Word; 129] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(181, 242, 159, 40, 61, 141, 102, 244), ::capnp::word(26, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(88, 16, 140, 152, 182, 184, 172, 172), - ::capnp::word(1, 0, 7, 0, 1, 0, 6, 0), + ::capnp::word(1, 0, 7, 0, 1, 0, 7, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(21, 0, 0, 0, 10, 1, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(29, 0, 0, 0, 87, 1, 0, 0), + ::capnp::word(29, 0, 0, 0, 143, 1, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(112, 114, 111, 116, 111, 47, 118, 101), @@ -20654,49 +21314,56 @@ pub mod answer { ::capnp::word(110, 112, 58, 65, 110, 115, 119, 101), ::capnp::word(114, 46, 100, 101, 116, 97, 105, 108), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(24, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(28, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 255, 255, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(153, 0, 0, 0, 66, 0, 0, 0), + ::capnp::word(181, 0, 0, 0, 66, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(148, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(160, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(176, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(188, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(1, 0, 254, 255, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 1, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(157, 0, 0, 0, 82, 0, 0, 0), + ::capnp::word(185, 0, 0, 0, 82, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(156, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(168, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(184, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(196, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(2, 0, 253, 255, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 2, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(165, 0, 0, 0, 74, 0, 0, 0), + ::capnp::word(193, 0, 0, 0, 74, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(164, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(176, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(192, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(204, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(3, 0, 252, 255, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 3, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(173, 0, 0, 0, 82, 0, 0, 0), + ::capnp::word(201, 0, 0, 0, 82, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(172, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(184, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(200, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(212, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(4, 0, 251, 255, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 4, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(181, 0, 0, 0, 82, 0, 0, 0), + ::capnp::word(209, 0, 0, 0, 82, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(180, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(192, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(208, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(220, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(5, 0, 250, 255, 0, 0, 0, 0), ::capnp::word(0, 0, 1, 0, 5, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(189, 0, 0, 0, 98, 0, 0, 0), + ::capnp::word(217, 0, 0, 0, 98, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(188, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(200, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(216, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(228, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(6, 0, 249, 255, 0, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 6, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(225, 0, 0, 0, 114, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(224, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(236, 0, 0, 0, 2, 0, 1, 0), ::capnp::word(115, 116, 97, 116, 117, 115, 65, 0), ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(85, 42, 129, 250, 7, 244, 6, 179), @@ -20750,6 +21417,15 @@ pub mod answer { ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(105, 110, 115, 112, 101, 99, 116, 86), + ::capnp::word(97, 108, 117, 101, 65, 0, 0, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(2, 225, 14, 150, 175, 127, 181, 184), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(16, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ]; pub fn get_field_types(index: u16) -> ::capnp::introspect::Type { match index { @@ -20759,6 +21435,7 @@ pub mod answer { 3 => ::introspect(), 4 => ::introspect(), 5 => ::introspect(), + 6 => ::introspect(), _ => panic!("invalid field index {}", index), } } @@ -20771,19 +21448,20 @@ pub mod answer { members_by_discriminant: MEMBERS_BY_DISCRIMINANT, }; pub static NONUNION_MEMBERS : &[u16] = &[]; - pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[0,1,2,3,4,5]; + pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[0,1,2,3,4,5,6]; pub const TYPE_ID: u64 = 0xf466_8d3d_289f_f2b5; } - pub enum Which { + pub enum Which { StatusA(A0), FindNodeA(A1), AppCallA(A2), GetValueA(A3), SetValueA(A4), WatchValueA(A5), + InspectValueA(A6), } - pub type WhichReader<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>>; - pub type WhichBuilder<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>>; + pub type WhichReader<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>>; + pub type WhichBuilder<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>>; } } @@ -21406,4 +22084,4 @@ pub mod operation { } } -//BUILDHASH:4ee16918900e322a0daa4bb0a11ca6094989c65936a653b0b78811e027d4d962 +//BUILDHASH:6df0786a4485a9d0c25c177959fe1b2070c2a98884d20267eb70e129be95f2b0 diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index 5b9c43b6..404ff922 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -63,6 +63,7 @@ use super::*; pub(in crate::rpc_processor) enum QuestionContext { GetValue(ValidateGetValueContext), SetValue(ValidateSetValueContext), + InspectValue(ValidateInspectValueContext), } #[derive(Clone)] diff --git a/veilid-core/src/rpc_processor/coders/operations/answer.rs b/veilid-core/src/rpc_processor/coders/operations/answer.rs index 9e3bc8da..257efba6 100644 --- a/veilid-core/src/rpc_processor/coders/operations/answer.rs +++ b/veilid-core/src/rpc_processor/coders/operations/answer.rs @@ -38,6 +38,7 @@ pub(in crate::rpc_processor) enum RPCAnswerDetail { GetValueA(Box), SetValueA(Box), WatchValueA(Box), + InspectValueA(Box), #[cfg(feature = "unstable-blockstore")] SupplyBlockA(Box), #[cfg(feature = "unstable-blockstore")] @@ -60,6 +61,7 @@ impl RPCAnswerDetail { RPCAnswerDetail::GetValueA(_) => "GetValueA", RPCAnswerDetail::SetValueA(_) => "SetValueA", RPCAnswerDetail::WatchValueA(_) => "WatchValueA", + RPCAnswerDetail::InspectValueA(_) => "InspectValueA", #[cfg(feature = "unstable-blockstore")] RPCAnswerDetail::SupplyBlockA(_) => "SupplyBlockA", #[cfg(feature = "unstable-blockstore")] @@ -80,6 +82,7 @@ impl RPCAnswerDetail { RPCAnswerDetail::GetValueA(r) => r.validate(validate_context), RPCAnswerDetail::SetValueA(r) => r.validate(validate_context), RPCAnswerDetail::WatchValueA(r) => r.validate(validate_context), + RPCAnswerDetail::InspectValueA(r) => r.validate(validate_context), #[cfg(feature = "unstable-blockstore")] RPCAnswerDetail::SupplyBlockA(r) => r.validate(validate_context), #[cfg(feature = "unstable-blockstore")] @@ -127,6 +130,11 @@ impl RPCAnswerDetail { let out = RPCOperationWatchValueA::decode(&op_reader)?; RPCAnswerDetail::WatchValueA(Box::new(out)) } + veilid_capnp::answer::detail::InspectValueA(r) => { + let op_reader = r.map_err(RPCError::protocol)?; + let out = RPCOperationInspectValueA::decode(&op_reader)?; + RPCAnswerDetail::InspectValueA(Box::new(out)) + } #[cfg(feature = "unstable-blockstore")] veilid_capnp::answer::detail::SupplyBlockA(r) => { let op_reader = r.map_err(RPCError::protocol)?; @@ -173,6 +181,9 @@ impl RPCAnswerDetail { RPCAnswerDetail::WatchValueA(d) => { d.encode(&mut builder.reborrow().init_watch_value_a()) } + RPCAnswerDetail::InspectValueA(d) => { + d.encode(&mut builder.reborrow().init_inspect_value_a()) + } #[cfg(feature = "unstable-blockstore")] RPCAnswerDetail::SupplyBlockA(d) => { d.encode(&mut builder.reborrow().init_supply_block_a()) diff --git a/veilid-core/src/rpc_processor/coders/operations/mod.rs b/veilid-core/src/rpc_processor/coders/operations/mod.rs index c73628d1..00d20f48 100644 --- a/veilid-core/src/rpc_processor/coders/operations/mod.rs +++ b/veilid-core/src/rpc_processor/coders/operations/mod.rs @@ -4,6 +4,7 @@ mod operation_app_call; mod operation_app_message; mod operation_find_node; mod operation_get_value; +mod operation_inspect_value; mod operation_return_receipt; mod operation_route; mod operation_set_value; @@ -35,6 +36,7 @@ pub(in crate::rpc_processor) use operation_app_call::*; pub(in crate::rpc_processor) use operation_app_message::*; pub(in crate::rpc_processor) use operation_find_node::*; pub(in crate::rpc_processor) use operation_get_value::*; +pub(in crate::rpc_processor) use operation_inspect_value::*; pub(in crate::rpc_processor) use operation_return_receipt::*; pub(in crate::rpc_processor) use operation_route::*; pub(in crate::rpc_processor) use operation_set_value::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index 0eb39514..b0740651 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -111,29 +111,30 @@ impl RPCOperationGetValueA { panic!("Wrong context type for GetValueA"); }; - if let Some(value) = &self.value { - // Get descriptor to validate with - let descriptor = if let Some(descriptor) = &self.descriptor { - if let Some(last_descriptor) = &get_value_context.last_descriptor { - if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal { - return Err(RPCError::protocol( - "getvalue descriptor does not match last descriptor", - )); - } - } - descriptor - } else { - let Some(descriptor) = &get_value_context.last_descriptor else { + // Validate descriptor + if let Some(descriptor) = &self.descriptor { + // Ensure descriptor matches last one + if let Some(last_descriptor) = &get_value_context.last_descriptor { + if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal { return Err(RPCError::protocol( - "no last descriptor, requires a descriptor", + "GetValue descriptor does not match last descriptor", )); - }; - descriptor - }; + } + } // Ensure the descriptor itself validates descriptor .validate(get_value_context.vcrypto.clone()) .map_err(RPCError::protocol)?; + } + + // Ensure the value validates + if let Some(value) = &self.value { + // Get descriptor to validate with + let Some(descriptor) = self.descriptor.or(get_value_context.last_descriptor) else { + return Err(RPCError::protocol( + "no last descriptor, requires a descriptor", + )); + }; // And the signed value data value diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs new file mode 100644 index 00000000..34340e65 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs @@ -0,0 +1,291 @@ +use super::*; +use crate::storage_manager::SignedValueDescriptor; + +const MAX_INSPECT_VALUE_Q_SUBKEYS_LEN: usize = 512; +const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = MAX_INSPECT_VALUE_Q_SUBKEYS_LEN; +const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20; + +#[derive(Clone)] +pub(in crate::rpc_processor) struct ValidateInspectValueContext { + pub last_descriptor: Option, + pub subkeys: ValueSubkeyRangeSet, + pub vcrypto: CryptoSystemVersion, +} + +impl fmt::Debug for ValidateInspectValueContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ValidateInspectValueContext") + .field("last_descriptor", &self.last_descriptor) + .field("vcrypto", &self.vcrypto.kind().to_string()) + .finish() + } +} + +#[derive(Debug, Clone)] +pub(in crate::rpc_processor) struct RPCOperationInspectValueQ { + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + want_descriptor: bool, +} + +impl RPCOperationInspectValueQ { + pub fn new( + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + want_descriptor: bool, + ) -> Result { + // Needed because RangeSetBlaze uses different types here all the time + #[allow(clippy::unnecessary_cast)] + let subkeys_len = subkeys.len() as usize; + if subkeys_len > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN { + return Err(RPCError::protocol("InspectValue subkeys length too long")); + } + Ok(Self { + key, + subkeys, + want_descriptor, + }) + } + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + Ok(()) + } + + // pub fn key(&self) -> &TypedKey { + // &self.key + // } + // pub fn subkeys(&self) -> &ValueSubkeyRangeSet { + // &self.subkeys + // } + // pub fn want_descriptor(&self) -> bool { + // self.want_descriptor + // } + pub fn destructure(self) -> (TypedKey, ValueSubkeyRangeSet, bool) { + (self.key, self.subkeys, self.want_descriptor) + } + + pub fn decode( + reader: &veilid_capnp::operation_inspect_value_q::Reader, + ) -> Result { + let k_reader = reader.reborrow().get_key().map_err(RPCError::protocol)?; + let key = decode_typed_key(&k_reader)?; + let sk_reader = reader.get_subkeys().map_err(RPCError::protocol)?; + // Maximum number of ranges that can hold the maximum number of subkeys is one subkey per range + if sk_reader.len() as usize > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN { + return Err(RPCError::protocol("InspectValueQ too many subkey ranges")); + } + let mut subkeys = ValueSubkeyRangeSet::new(); + for skr in sk_reader.iter() { + let vskr = (skr.get_start(), skr.get_end()); + if vskr.0 > vskr.1 { + return Err(RPCError::protocol("invalid subkey range")); + } + if let Some(lvskr) = subkeys.last() { + if lvskr >= vskr.0 { + return Err(RPCError::protocol( + "subkey range out of order or not merged", + )); + } + } + subkeys.ranges_insert(vskr.0..=vskr.1); + } + // Needed because RangeSetBlaze uses different types here all the time + #[allow(clippy::unnecessary_cast)] + if subkeys.len() as usize > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN { + return Err(RPCError::protocol("InspectValueQ too many subkey ranges")); + } + + let want_descriptor = reader.reborrow().get_want_descriptor(); + Ok(Self { + key, + subkeys, + want_descriptor, + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_inspect_value_q::Builder, + ) -> Result<(), RPCError> { + let mut k_builder = builder.reborrow().init_key(); + encode_typed_key(&self.key, &mut k_builder); + + let mut sk_builder = builder.reborrow().init_subkeys( + self.subkeys + .ranges_len() + .try_into() + .map_err(RPCError::map_internal("invalid subkey range list length"))?, + ); + for (i, skr) in self.subkeys.ranges().enumerate() { + let mut skr_builder = sk_builder.reborrow().get(i as u32); + skr_builder.set_start(*skr.start()); + skr_builder.set_end(*skr.end()); + } + builder.set_want_descriptor(self.want_descriptor); + Ok(()) + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Clone)] +pub(in crate::rpc_processor) struct RPCOperationInspectValueA { + seqs: Vec, + peers: Vec, + descriptor: Option, +} + +impl RPCOperationInspectValueA { + pub fn new( + seqs: Vec, + peers: Vec, + descriptor: Option, + ) -> Result { + if seqs.len() > MAX_INSPECT_VALUE_A_SEQS_LEN { + return Err(RPCError::protocol( + "encoded InspectValueA seqs length too long", + )); + } + if peers.len() > MAX_INSPECT_VALUE_A_PEERS_LEN { + return Err(RPCError::protocol( + "encoded InspectValueA peers length too long", + )); + } + Ok(Self { + seqs, + peers, + descriptor, + }) + } + + pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + let question_context = validate_context + .question_context + .as_ref() + .expect("InspectValueA requires question context"); + let QuestionContext::InspectValue(inspect_value_context) = question_context else { + panic!("Wrong context type for InspectValueA"); + }; + + // Ensure seqs returned does not exceeed subkeys requested + #[allow(clippy::unnecessary_cast)] + if self.seqs.len() > inspect_value_context.subkeys.len() as usize { + return Err(RPCError::protocol( + "InspectValue seqs length is greater than subkeys requested", + )); + } + + // Validate descriptor + if let Some(descriptor) = &self.descriptor { + // Ensure descriptor matches last one + if let Some(last_descriptor) = &inspect_value_context.last_descriptor { + if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal { + return Err(RPCError::protocol( + "InspectValue descriptor does not match last descriptor", + )); + } + } + // Ensure the descriptor itself validates + descriptor + .validate(inspect_value_context.vcrypto.clone()) + .map_err(RPCError::protocol)?; + } + + PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); + Ok(()) + } + + // pub fn seqs(&self) -> &[ValueSeqNum] { + // &self.seqs + // } + // pub fn peers(&self) -> &[PeerInfo] { + // &self.peers + // } + // pub fn descriptor(&self) -> Option<&SignedValueDescriptor> { + // self.descriptor.as_ref() + // } + pub fn destructure( + self, + ) -> ( + Vec, + Vec, + Option, + ) { + (self.seqs, self.peers, self.descriptor) + } + + pub fn decode( + reader: &veilid_capnp::operation_inspect_value_a::Reader, + ) -> Result { + let seqs = if reader.has_seqs() { + let seqs_reader = reader.get_seqs().map_err(RPCError::protocol)?; + let Some(seqs) = seqs_reader.as_slice().map(|s| s.to_vec()) else { + return Err(RPCError::protocol("invalid decoded InspectValueA seqs")); + }; + seqs + } else { + return Err(RPCError::protocol("missing decoded InspectValueA seqs")); + }; + + let peers_reader = reader.get_peers().map_err(RPCError::protocol)?; + if peers_reader.len() as usize > MAX_INSPECT_VALUE_A_PEERS_LEN { + return Err(RPCError::protocol( + "decoded InspectValueA peers length too long", + )); + } + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(RPCError::map_internal("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p)?; + peers.push(peer_info); + } + + let descriptor = if reader.has_descriptor() { + let d_reader = reader.get_descriptor().map_err(RPCError::protocol)?; + let descriptor = decode_signed_value_descriptor(&d_reader)?; + Some(descriptor) + } else { + None + }; + + Ok(Self { + seqs, + peers, + descriptor, + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_inspect_value_a::Builder, + ) -> Result<(), RPCError> { + let mut seqs_builder = builder.reborrow().init_seqs( + self.seqs + .len() + .try_into() + .map_err(RPCError::map_internal("invalid seqs list length"))?, + ); + for (i, seq) in self.seqs.iter().enumerate() { + seqs_builder.set(i as u32, *seq); + } + + let mut peers_builder = builder.reborrow().init_peers( + self.peers + .len() + .try_into() + .map_err(RPCError::map_internal("invalid peers list length"))?, + ); + for (i, peer) in self.peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + + if let Some(descriptor) = &self.descriptor { + let mut d_builder = builder.reborrow().init_descriptor(); + encode_signed_value_descriptor(descriptor, &mut d_builder)?; + } + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs index 121e27d4..393f29e8 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs @@ -1,7 +1,7 @@ use super::*; use crate::storage_manager::SignedValueData; -const MAX_VALUE_CHANGED_SUBKEYS_LEN: usize = 512; +const MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN: usize = 512; #[derive(Debug, Clone)] pub(in crate::rpc_processor) struct RPCOperationValueChanged { @@ -21,12 +21,12 @@ impl RPCOperationValueChanged { watch_id: u64, value: SignedValueData, ) -> Result { - // Needed because RangeSetBlaze uses different types here all the time - #[allow(clippy::unnecessary_cast)] let subkeys_len = subkeys.ranges_len() as usize; - if subkeys_len > MAX_VALUE_CHANGED_SUBKEYS_LEN { - return Err(RPCError::protocol("ValueChanged subkeys length too long")); + if subkeys_len > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN { + return Err(RPCError::protocol( + "ValueChanged subkey ranges length too long", + )); } if watch_id == 0 { @@ -93,8 +93,10 @@ impl RPCOperationValueChanged { let key = decode_typed_key(&k_reader)?; let sk_reader = reader.get_subkeys().map_err(RPCError::protocol)?; - if sk_reader.len() as usize > MAX_VALUE_CHANGED_SUBKEYS_LEN { - return Err(RPCError::protocol("ValueChanged subkeys length too long")); + if sk_reader.len() as usize > MAX_VALUE_CHANGED_SUBKEY_RANGES_LEN { + return Err(RPCError::protocol( + "ValueChanged subkey ranges length too long", + )); } let mut subkeys = ValueSubkeyRangeSet::new(); diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs index 62a45271..8e494ab3 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs @@ -1,6 +1,6 @@ use super::*; -const MAX_WATCH_VALUE_Q_SUBKEYS_LEN: usize = 512; +const MAX_WATCH_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512; const MAX_WATCH_VALUE_A_PEERS_LEN: usize = 20; #[derive(Debug, Clone)] @@ -25,11 +25,7 @@ impl RPCOperationWatchValueQ { watcher: KeyPair, vcrypto: CryptoSystemVersion, ) -> Result { - // Needed because RangeSetBlaze uses different types here all the time - #[allow(clippy::unnecessary_cast)] - let subkeys_len = subkeys.ranges_len() as usize; - - if subkeys_len > MAX_WATCH_VALUE_Q_SUBKEYS_LEN { + if subkeys.ranges_len() > MAX_WATCH_VALUE_Q_SUBKEY_RANGES_LEN { return Err(RPCError::protocol("WatchValueQ subkeys length too long")); } @@ -62,9 +58,7 @@ impl RPCOperationWatchValueQ { count: u32, watch_id: Option, ) -> Vec { - // Needed because RangeSetBlaze uses different types here all the time - #[allow(clippy::unnecessary_cast)] - let subkeys_len = subkeys.ranges_len() as usize; + let subkeys_len = subkeys.ranges_len(); let mut sig_data = Vec::with_capacity(PUBLIC_KEY_LENGTH + 4 + (subkeys_len * 8) + 8 + 8); sig_data.extend_from_slice(&key.kind.0); @@ -168,8 +162,8 @@ impl RPCOperationWatchValueQ { let key = decode_typed_key(&k_reader)?; let sk_reader = reader.get_subkeys().map_err(RPCError::protocol)?; - if sk_reader.len() as usize > MAX_WATCH_VALUE_Q_SUBKEYS_LEN { - return Err(RPCError::protocol("WatchValueQ subkeys length too long")); + if sk_reader.len() as usize > MAX_WATCH_VALUE_Q_SUBKEY_RANGES_LEN { + return Err(RPCError::protocol("WatchValueQ too many subkey ranges")); } let mut subkeys = ValueSubkeyRangeSet::new(); for skr in sk_reader.iter() { diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs index 7ed01907..bf36b35e 100644 --- a/veilid-core/src/rpc_processor/coders/operations/question.rs +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -50,6 +50,7 @@ pub(in crate::rpc_processor) enum RPCQuestionDetail { GetValueQ(Box), SetValueQ(Box), WatchValueQ(Box), + InspectValueQ(Box), #[cfg(feature = "unstable-blockstore")] SupplyBlockQ(Box), #[cfg(feature = "unstable-blockstore")] @@ -72,6 +73,7 @@ impl RPCQuestionDetail { RPCQuestionDetail::GetValueQ(_) => "GetValueQ", RPCQuestionDetail::SetValueQ(_) => "SetValueQ", RPCQuestionDetail::WatchValueQ(_) => "WatchValueQ", + RPCQuestionDetail::InspectValueQ(_) => "InspectValueQ", #[cfg(feature = "unstable-blockstore")] RPCQuestionDetail::SupplyBlockQ(_) => "SupplyBlockQ", #[cfg(feature = "unstable-blockstore")] @@ -92,6 +94,7 @@ impl RPCQuestionDetail { RPCQuestionDetail::GetValueQ(r) => r.validate(validate_context), RPCQuestionDetail::SetValueQ(r) => r.validate(validate_context), RPCQuestionDetail::WatchValueQ(r) => r.validate(validate_context), + RPCQuestionDetail::InspectValueQ(r) => r.validate(validate_context), #[cfg(feature = "unstable-blockstore")] RPCQuestionDetail::SupplyBlockQ(r) => r.validate(validate_context), #[cfg(feature = "unstable-blockstore")] @@ -140,6 +143,11 @@ impl RPCQuestionDetail { let out = RPCOperationWatchValueQ::decode(&op_reader)?; RPCQuestionDetail::WatchValueQ(Box::new(out)) } + veilid_capnp::question::detail::InspectValueQ(r) => { + let op_reader = r.map_err(RPCError::protocol)?; + let out = RPCOperationInspectValueQ::decode(&op_reader)?; + RPCQuestionDetail::InspectValueQ(Box::new(out)) + } #[cfg(feature = "unstable-blockstore")] veilid_capnp::question::detail::SupplyBlockQ(r) => { let op_reader = r.map_err(RPCError::protocol)?; @@ -186,6 +194,9 @@ impl RPCQuestionDetail { RPCQuestionDetail::WatchValueQ(d) => { d.encode(&mut builder.reborrow().init_watch_value_q()) } + RPCQuestionDetail::InspectValueQ(d) => { + d.encode(&mut builder.reborrow().init_inspect_value_q()) + } #[cfg(feature = "unstable-blockstore")] RPCQuestionDetail::SupplyBlockQ(d) => { d.encode(&mut builder.reborrow().init_supply_block_q()) diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index e51e431a..1060e28c 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -8,6 +8,7 @@ mod rpc_app_message; mod rpc_error; mod rpc_find_node; mod rpc_get_value; +mod rpc_inspect_value; mod rpc_return_receipt; mod rpc_route; mod rpc_set_value; @@ -1621,6 +1622,7 @@ impl RPCProcessor { RPCQuestionDetail::GetValueQ(_) => self.process_get_value_q(msg).await, RPCQuestionDetail::SetValueQ(_) => self.process_set_value_q(msg).await, RPCQuestionDetail::WatchValueQ(_) => self.process_watch_value_q(msg).await, + RPCQuestionDetail::InspectValueQ(_) => self.process_inspect_value_q(msg).await, #[cfg(feature = "unstable-blockstore")] RPCQuestionDetail::SupplyBlockQ(_) => self.process_supply_block_q(msg).await, #[cfg(feature = "unstable-blockstore")] diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs new file mode 100644 index 00000000..3cd98f55 --- /dev/null +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -0,0 +1,266 @@ +use super::*; +use crate::storage_manager::SignedValueDescriptor; + +#[derive(Clone, Debug)] +pub struct InspectValueAnswer { + pub seqs: Vec, + pub peers: Vec, + pub descriptor: Option, +} + +impl RPCProcessor { + /// Sends an inspect value request and wait for response + /// Can be sent via all methods including relays + /// Safety routes may be used, but never private routes. + /// Because this leaks information about the identity of the node itself, + /// replying to this request received over a private route will leak + /// the identity of the node and defeat the private route. + + #[cfg_attr( + feature = "verbose-tracing", + instrument(level = "trace", skip(self, last_descriptor), + fields(ret.value.data.len, + ret.seqs, + ret.peers.len, + ret.latency + ),err) + )] + pub async fn rpc_call_inspect_value( + self, + dest: Destination, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + last_descriptor: Option, + ) ->RPCNetworkResult> { + // Ensure destination never has a private route + // and get the target noderef so we can validate the response + let Some(target) = dest.node() else { + return Err(RPCError::internal( + "Never send get value requests over private routes", + )); + }; + + // Get the target node id + let Some(vcrypto) = self.crypto.get(key.kind) else { + return Err(RPCError::internal("unsupported cryptosystem")); + }; + let Some(target_node_id) = target.node_ids().get(key.kind) else { + return Err(RPCError::internal("No node id for crypto kind")); + }; + + let debug_string = format!( + "OUT ==> InspectValueQ({} #{}{}) => {}", + key, + &subkeys, + if last_descriptor.is_some() { + " +lastdesc" + } else { + "" + }, + dest + ); + + // Send the inspectvalue question + let inspect_value_q = RPCOperationInspectValueQ::new(key, subkeys.clone(), last_descriptor.is_none())?; + let question = RPCQuestion::new( + network_result_try!(self.get_destination_respond_to(&dest)?), + RPCQuestionDetail::InspectValueQ(Box::new(inspect_value_q)), + ); + + let question_context = QuestionContext::InspectValue(ValidateInspectValueContext { + last_descriptor, + subkeys, + vcrypto: vcrypto.clone(), + }); + + log_dht!(debug "{}", debug_string); + + let waitable_reply = network_result_try!( + self.question(dest.clone(), question, Some(question_context)) + .await? + ); + + // Keep the reply private route that was used to return with the answer + let reply_private_route = waitable_reply.reply_private_route; + + // Wait for reply + let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { + TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), + TimeoutOr::Value(v) => v, + }; + + // Get the right answer type + let (_, _, _, kind) = msg.operation.destructure(); + let inspect_value_a = match kind { + RPCOperationKind::Answer(a) => match a.destructure() { + RPCAnswerDetail::InspectValueA(a) => a, + _ => return Ok(NetworkResult::invalid_message("not an inspectvalue answer")), + }, + _ => return Ok(NetworkResult::invalid_message("not an answer")), + }; + + let (seqs, peers, descriptor) = inspect_value_a.destructure(); + if debug_target_enabled!("dht") { + let debug_string_answer = format!( + "OUT <== InspectValueA({} {:?}{} peers={}) <= {}", + key, + seqs, + if descriptor.is_some() { + " +desc" + } else { + "" + }, + peers.len(), + dest + ); + + log_dht!(debug "{}", debug_string_answer); + + let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); + log_dht!(debug "Peers: {:#?}", peer_ids); + } + + // Validate peers returned are, in fact, closer to the key than the node we sent this to + let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { + Ok(v) => v, + Err(e) => { + return Ok(NetworkResult::invalid_message(format!( + "missing cryptosystem in peers node ids: {}", + e + ))); + } + }; + if !valid { + return Ok(NetworkResult::invalid_message("non-closer peers returned")); + } + + #[cfg(feature = "verbose-tracing")] + tracing::Span::current().record("ret.latency", latency.as_u64()); + #[cfg(feature = "verbose-tracing")] + tracing::Span::current().record("ret.seqs", seqs); + #[cfg(feature = "verbose-tracing")] + tracing::Span::current().record("ret.peers.len", peers.len()); + + Ok(NetworkResult::value(Answer::new( + latency, + reply_private_route, + InspectValueAnswer { + seqs, + peers, + descriptor, + }, + ))) + } + + //////////////////////////////////////////////////////////////////////////////////////////////// + + #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] + pub(crate) async fn process_inspect_value_q( + &self, + msg: RPCMessage, + ) -> RPCNetworkResult<()> { + + // Ensure this never came over a private route, safety route is okay though + match &msg.header.detail { + RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {} + RPCMessageHeaderDetail::PrivateRouted(_) => { + return Ok(NetworkResult::invalid_message( + "not processing inspect value request over private route", + )) + } + } + // Ignore if disabled + let routing_table = self.routing_table(); + let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); + if !opi + .signed_node_info() + .node_info() + .has_capability(CAP_DHT) + { + return Ok(NetworkResult::service_unavailable( + "dht is not available", + )); + } + + // Get the question + let kind = msg.operation.kind().clone(); + let inspect_value_q = match kind { + RPCOperationKind::Question(q) => match q.destructure() { + (_, RPCQuestionDetail::InspectValueQ(q)) => q, + _ => panic!("not a inspectvalue question"), + }, + _ => panic!("not a question"), + }; + + // Destructure + let (key, subkeys, want_descriptor) = inspect_value_q.destructure(); + + // Get the nodes that we know about that are closer to the the key than our own node + let routing_table = self.routing_table(); + let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT, CAP_DHT_WATCH])); + + if debug_target_enabled!("dht") { + let debug_string = format!( + "IN <=== InspectValueQ({} {}{}) <== {}", + key, + subkeys, + if want_descriptor { + " +wantdesc" + } else { + "" + }, + msg.header.direct_sender_node_id() + ); + + log_dht!(debug "{}", debug_string); + } + + // See if we would have accepted this as a set + let set_value_count = { + let c = self.config.get(); + c.network.dht.set_value_count as usize + }; + let (inspect_result_seqs, inspect_result_descriptor) = if closer_to_key_peers.len() >= set_value_count { + // Not close enough + (Vec::new(), None) + } else { + // Close enough, lets get it + + // See if we have this record ourselves + let storage_manager = self.storage_manager(); + let inspect_result = network_result_try!(storage_manager + .inbound_inspect_value(key, subkeys, want_descriptor) + .await + .map_err(RPCError::internal)?); + (inspect_result.seqs, inspect_result.descriptor) + }; + + if debug_target_enabled!("dht") { + let debug_string_answer = format!( + "IN ===> InspectValueA({} {:?}{} peers={}) ==> {}", + key, + inspect_result_seqs, + if inspect_result_descriptor.is_some() { + " +desc" + } else { + "" + }, + closer_to_key_peers.len(), + msg.header.direct_sender_node_id() + ); + + log_dht!(debug "{}", debug_string_answer); + } + + // Make GetValue answer + let inspect_value_a = RPCOperationInspectValueA::new( + inspect_result_seqs, + closer_to_key_peers, + inspect_result_descriptor.map(|x| (*x).clone()), + )?; + + // Send GetValue answer + self.answer(msg, RPCAnswer::new(RPCAnswerDetail::InspectValueA(Box::new(inspect_value_a)))) + .await + } +} diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs new file mode 100644 index 00000000..8ab19361 --- /dev/null +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -0,0 +1,267 @@ +use super::*; + +/// The context of the outbound_get_value operation +struct OutboundGxxx continue here etValueContext { + /// The latest value of the subkey, may be the value passed in + pub value: Option>, + /// The nodes that have returned the value so far (up to the consensus count) + pub value_nodes: Vec, + /// The descriptor if we got a fresh one or empty if no descriptor was needed + pub descriptor: Option>, + /// The parsed schema from the descriptor if we have one + pub schema: Option, +} + +/// The result of the outbound_get_value operation +pub(super) struct OutboundGetValueResult { + /// The subkey that was retrieved + pub subkey_result: SubkeyResult, + /// And where it was retrieved from + pub value_nodes: Vec, +} + +impl StorageManager { + /// Perform a 'inspect value' query on the network + pub(super) async fn outbound_get_value( + &self, + rpc_processor: RPCProcessor, + key: TypedKey, + subkey: ValueSubkey, + safety_selection: SafetySelection, + last_subkey_result: SubkeyResult, + ) -> VeilidAPIResult { + let routing_table = rpc_processor.routing_table(); + + // Get the DHT parameters for 'GetValue' + let (key_count, consensus_count, fanout, timeout_us) = { + let c = self.unlocked_inner.config.get(); + ( + c.network.dht.max_find_node_count as usize, + c.network.dht.get_value_count as usize, + c.network.dht.get_value_fanout as usize, + TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)), + ) + }; + + // Make do-get-value answer context + let schema = if let Some(d) = &last_subkey_result.descriptor { + Some(d.schema()?) + } else { + None + }; + let context = Arc::new(Mutex::new(OutboundGetValueContext { + value: last_subkey_result.value, + value_nodes: vec![], + descriptor: last_subkey_result.descriptor.clone(), + schema, + })); + + // Routine to call to generate fanout + let call_routine = |next_node: NodeRef| { + let rpc_processor = rpc_processor.clone(); + let context = context.clone(); + let last_descriptor = last_subkey_result.descriptor.clone(); + async move { + let gva = network_result_try!( + rpc_processor + .clone() + .rpc_call_get_value( + Destination::direct(next_node.clone()).with_safety(safety_selection), + key, + subkey, + last_descriptor.map(|x| (*x).clone()), + ) + .await? + ); + + // Keep the descriptor if we got one. If we had a last_descriptor it will + // already be validated by rpc_call_get_value + if let Some(descriptor) = gva.answer.descriptor { + let mut ctx = context.lock(); + if ctx.descriptor.is_none() && ctx.schema.is_none() { + ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?); + ctx.descriptor = Some(Arc::new(descriptor)); + } + } + + // Keep the value if we got one and it is newer and it passes schema validation + if let Some(value) = gva.answer.value { + log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq()); + let mut ctx = context.lock(); + + // Ensure we have a schema and descriptor + let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { + // Got a value but no descriptor for it + // Move to the next node + return Ok(NetworkResult::invalid_message( + "Got value with no descriptor", + )); + }; + + // Validate with schema + if !schema.check_subkey_value_data( + descriptor.owner(), + subkey, + value.value_data(), + ) { + // Validation failed, ignore this value + // Move to the next node + return Ok(NetworkResult::invalid_message(format!( + "Schema validation failed on subkey {}", + subkey + ))); + } + + // If we have a prior value, see if this is a newer sequence number + if let Some(prior_value) = &ctx.value { + let prior_seq = prior_value.value_data().seq(); + let new_seq = value.value_data().seq(); + + if new_seq == prior_seq { + // If sequence number is the same, the data should be the same + if prior_value.value_data() != value.value_data() { + // Move to the next node + return Ok(NetworkResult::invalid_message("value data mismatch")); + } + // Increase the consensus count for the existing value + ctx.value_nodes.push(next_node); + } else if new_seq > prior_seq { + // If the sequence number is greater, start over with the new value + ctx.value = Some(Arc::new(value)); + // One node has shown us this value so far + ctx.value_nodes = vec![next_node]; + } else { + // If the sequence number is older, ignore it + } + } else { + // If we have no prior value, keep it + ctx.value = Some(Arc::new(value)); + // One node has shown us this value so far + ctx.value_nodes = vec![next_node]; + } + } + + // Return peers if we have some + log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); + + Ok(NetworkResult::value(gva.answer.peers)) + } + }; + + // Routine to call to check if we're done at each step + let check_done = |_closest_nodes: &[NodeRef]| { + // If we have reached sufficient consensus, return done + let ctx = context.lock(); + if ctx.value.is_some() + && ctx.descriptor.is_some() + && ctx.value_nodes.len() >= consensus_count + { + return Some(()); + } + None + }; + + // Call the fanout + let fanout_call = FanoutCall::new( + routing_table.clone(), + key, + key_count, + fanout, + timeout_us, + capability_fanout_node_info_filter(vec![CAP_DHT, CAP_DHT_WATCH]), + call_routine, + check_done, + ); + + match fanout_call.run(vec![]).await { + // If we don't finish in the timeout (too much time passed checking for consensus) + TimeoutOr::Timeout => { + // Return the best answer we've got + let ctx = context.lock(); + if ctx.value_nodes.len() >= consensus_count { + log_stor!(debug "GetValue Fanout Timeout Consensus"); + } else { + log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len()); + } + Ok(OutboundGetValueResult { + subkey_result: SubkeyResult { + value: ctx.value.clone(), + descriptor: ctx.descriptor.clone(), + }, + value_nodes: ctx.value_nodes.clone(), + }) + } + // If we finished with consensus (enough nodes returning the same value) + TimeoutOr::Value(Ok(Some(()))) => { + // Return the best answer we've got + let ctx = context.lock(); + if ctx.value_nodes.len() >= consensus_count { + log_stor!(debug "GetValue Fanout Consensus"); + } else { + log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len()); + } + Ok(OutboundGetValueResult { + subkey_result: SubkeyResult { + value: ctx.value.clone(), + descriptor: ctx.descriptor.clone(), + }, + value_nodes: ctx.value_nodes.clone(), + }) + } + // If we finished without consensus (ran out of nodes before getting consensus) + TimeoutOr::Value(Ok(None)) => { + // Return the best answer we've got + let ctx = context.lock(); + if ctx.value_nodes.len() >= consensus_count { + log_stor!(debug "GetValue Fanout Exhausted Consensus"); + } else { + log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len()); + } + Ok(OutboundGetValueResult { + subkey_result: SubkeyResult { + value: ctx.value.clone(), + descriptor: ctx.descriptor.clone(), + }, + value_nodes: ctx.value_nodes.clone(), + }) + } + // Failed + TimeoutOr::Value(Err(e)) => { + // If we finished with an error, return that + log_stor!(debug "GetValue Fanout Error: {}", e); + Err(e.into()) + } + } + } + + /// Handle a received 'Get Value' query + pub async fn inbound_get_value( + &self, + key: TypedKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> VeilidAPIResult> { + let mut inner = self.lock().await?; + + // See if this is a remote or local value + let (_is_local, last_subkey_result) = { + // See if the subkey we are getting has a last known local value + let mut last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + // If this is local, it must have a descriptor already + if last_subkey_result.descriptor.is_some() { + if !want_descriptor { + last_subkey_result.descriptor = None; + } + (true, last_subkey_result) + } else { + // See if the subkey we are getting has a last known remote value + let last_subkey_result = inner + .handle_get_remote_value(key, subkey, want_descriptor) + .await?; + (false, last_subkey_result) + } + }; + + Ok(NetworkResult::value(last_subkey_result)) + } +}