mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-02-17 13:02:50 -05:00
refactor checkpoint
This commit is contained in:
parent
7f909a06b9
commit
74168e96be
1704
Cargo.lock
generated
1704
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -65,9 +65,7 @@ rtnetlink = { version = "^0", default-features = false, optional = true }
|
|||||||
async-std-resolver = { version = "^0", optional = true }
|
async-std-resolver = { version = "^0", optional = true }
|
||||||
trust-dns-resolver = { version = "^0", optional = true }
|
trust-dns-resolver = { version = "^0", optional = true }
|
||||||
keyvaluedb = { path = "../external/keyvaluedb/keyvaluedb" }
|
keyvaluedb = { path = "../external/keyvaluedb/keyvaluedb" }
|
||||||
#rkyv = { version = "^0", default_features = false, features = ["std", "alloc", "strict", "size_32", "validation"] }
|
rkyv = { version = "^0", default_features = false, features = ["std", "alloc", "strict", "size_32", "validation"] }
|
||||||
rkyv = { git = "https://github.com/rkyv/rkyv.git", rev = "57e2a8d", default_features = false, features = ["std", "alloc", "strict", "size_32", "validation"] }
|
|
||||||
bytecheck = "^0"
|
|
||||||
data-encoding = { version = "^2" }
|
data-encoding = { version = "^2" }
|
||||||
weak-table = "0.3.2"
|
weak-table = "0.3.2"
|
||||||
|
|
||||||
@ -91,7 +89,7 @@ rustls = "^0.19"
|
|||||||
rustls-pemfile = "^0.2"
|
rustls-pemfile = "^0.2"
|
||||||
futures-util = { version = "^0", default-features = false, features = ["async-await", "sink", "std", "io"] }
|
futures-util = { version = "^0", default-features = false, features = ["async-await", "sink", "std", "io"] }
|
||||||
keyvaluedb-sqlite = { path = "../external/keyvaluedb/keyvaluedb-sqlite" }
|
keyvaluedb-sqlite = { path = "../external/keyvaluedb/keyvaluedb-sqlite" }
|
||||||
socket2 = "^0"
|
socket2 = { version = "^0", features = ["all"] }
|
||||||
bugsalot = "^0"
|
bugsalot = "^0"
|
||||||
chrono = "^0"
|
chrono = "^0"
|
||||||
libc = "^0"
|
libc = "^0"
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use crate::*;
|
use crate::*;
|
||||||
use data_encoding::BASE64URL_NOPAD;
|
use data_encoding::BASE64URL_NOPAD;
|
||||||
use keyring_manager::*;
|
use keyring_manager::*;
|
||||||
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
|
use rkyv::{bytecheck::CheckBytes, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
pub struct ProtectedStoreInner {
|
pub struct ProtectedStoreInner {
|
||||||
@ -175,7 +175,7 @@ impl ProtectedStore {
|
|||||||
K: AsRef<str> + fmt::Debug,
|
K: AsRef<str> + fmt::Debug,
|
||||||
T: RkyvArchive,
|
T: RkyvArchive,
|
||||||
<T as RkyvArchive>::Archived:
|
<T as RkyvArchive>::Archived:
|
||||||
for<'t> bytecheck::CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
||||||
<T as RkyvArchive>::Archived:
|
<T as RkyvArchive>::Archived:
|
||||||
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
||||||
{
|
{
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
use crate::*;
|
use crate::*;
|
||||||
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
|
use rkyv::{
|
||||||
|
bytecheck::CheckBytes, Archive as RkyvArchive, Deserialize as RkyvDeserialize,
|
||||||
|
Serialize as RkyvSerialize,
|
||||||
|
};
|
||||||
|
|
||||||
cfg_if! {
|
cfg_if! {
|
||||||
if #[cfg(target_arch = "wasm32")] {
|
if #[cfg(target_arch = "wasm32")] {
|
||||||
@ -127,7 +130,7 @@ impl TableDB {
|
|||||||
where
|
where
|
||||||
T: RkyvArchive,
|
T: RkyvArchive,
|
||||||
<T as RkyvArchive>::Archived:
|
<T as RkyvArchive>::Archived:
|
||||||
for<'t> bytecheck::CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
||||||
<T as RkyvArchive>::Archived:
|
<T as RkyvArchive>::Archived:
|
||||||
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
||||||
{
|
{
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
use crate::*;
|
use crate::*;
|
||||||
use data_encoding::BASE64URL_NOPAD;
|
use data_encoding::BASE64URL_NOPAD;
|
||||||
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
|
use rkyv::{
|
||||||
|
bytecheck::CheckBytes, Archive as RkyvArchive, Deserialize as RkyvDeserialize,
|
||||||
|
Serialize as RkyvSerialize,
|
||||||
|
};
|
||||||
|
|
||||||
use web_sys::*;
|
use web_sys::*;
|
||||||
|
|
||||||
@ -155,7 +158,7 @@ impl ProtectedStore {
|
|||||||
K: AsRef<str> + fmt::Debug,
|
K: AsRef<str> + fmt::Debug,
|
||||||
T: RkyvArchive,
|
T: RkyvArchive,
|
||||||
<T as RkyvArchive>::Archived:
|
<T as RkyvArchive>::Archived:
|
||||||
for<'t> bytecheck::CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
||||||
<T as RkyvArchive>::Archived:
|
<T as RkyvArchive>::Archived:
|
||||||
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
||||||
{
|
{
|
||||||
|
@ -30,7 +30,6 @@ mod routing_table;
|
|||||||
mod rpc_processor;
|
mod rpc_processor;
|
||||||
mod storage_manager;
|
mod storage_manager;
|
||||||
mod veilid_api;
|
mod veilid_api;
|
||||||
#[macro_use]
|
|
||||||
mod veilid_config;
|
mod veilid_config;
|
||||||
mod veilid_layer_filter;
|
mod veilid_layer_filter;
|
||||||
|
|
||||||
@ -41,6 +40,13 @@ pub use self::veilid_config::*;
|
|||||||
pub use self::veilid_layer_filter::*;
|
pub use self::veilid_layer_filter::*;
|
||||||
pub use veilid_tools as tools;
|
pub use veilid_tools as tools;
|
||||||
|
|
||||||
|
use enumset::*;
|
||||||
|
use rkyv::{
|
||||||
|
bytecheck, bytecheck::CheckBytes, Archive as RkyvArchive, Deserialize as RkyvDeserialize,
|
||||||
|
Serialize as RkyvSerialize,
|
||||||
|
};
|
||||||
|
use serde::*;
|
||||||
|
|
||||||
pub mod veilid_capnp {
|
pub mod veilid_capnp {
|
||||||
include!(concat!(env!("OUT_DIR"), "/proto/veilid_capnp.rs"));
|
include!(concat!(env!("OUT_DIR"), "/proto/veilid_capnp.rs"));
|
||||||
}
|
}
|
||||||
|
@ -20,3 +20,32 @@ pub enum SignalInfo {
|
|||||||
},
|
},
|
||||||
// XXX: WebRTC
|
// XXX: WebRTC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl SignalInfo {
|
||||||
|
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
||||||
|
match self {
|
||||||
|
SignalInfo::HolePunch { receipt, peer_info } => {
|
||||||
|
if receipt.len() < MIN_RECEIPT_SIZE {
|
||||||
|
return Err(RPCError::protocol("SignalInfo HolePunch receipt too short"));
|
||||||
|
}
|
||||||
|
if receipt.len() > MAX_RECEIPT_SIZE {
|
||||||
|
return Err(RPCError::protocol("SignalInfo HolePunch receipt too long"));
|
||||||
|
}
|
||||||
|
peer_info.validate(crypto).map_err(RPCError::protocol)
|
||||||
|
}
|
||||||
|
SignalInfo::ReverseConnect { receipt, peer_info } => {
|
||||||
|
if receipt.len() < MIN_RECEIPT_SIZE {
|
||||||
|
return Err(RPCError::protocol(
|
||||||
|
"SignalInfo ReverseConnect receipt too short",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if receipt.len() > MAX_RECEIPT_SIZE {
|
||||||
|
return Err(RPCError::protocol(
|
||||||
|
"SignalInfo ReverseConnect receipt too long",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
peer_info.validate(crypto).map_err(RPCError::protocol)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -12,7 +12,7 @@ mod stats_accounting;
|
|||||||
mod tasks;
|
mod tasks;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
use crate::*;
|
use super::*;
|
||||||
|
|
||||||
use crate::crypto::*;
|
use crate::crypto::*;
|
||||||
use crate::network_manager::*;
|
use crate::network_manager::*;
|
||||||
|
@ -25,13 +25,7 @@ impl RouteNode {
|
|||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), VeilidAPIError> {
|
pub fn validate(&self, crypto: Crypto) -> Result<(), VeilidAPIError> {
|
||||||
match self {
|
match self {
|
||||||
RouteNode::NodeId(_) => Ok(()),
|
RouteNode::NodeId(_) => Ok(()),
|
||||||
RouteNode::PeerInfo(pi) => {
|
RouteNode::PeerInfo(pi) => pi.validate(crypto),
|
||||||
let validated_node_ids = pi.validate(crypto)?;
|
|
||||||
if validated_node_ids.is_empty() {
|
|
||||||
apibail_generic!("no validated node ids for route node");
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ impl RoutingDomainDetailCommon {
|
|||||||
.and_then(|rn| {
|
.and_then(|rn| {
|
||||||
let opt_relay_pi = rn.locked(rti).make_peer_info(self.routing_domain);
|
let opt_relay_pi = rn.locked(rti).make_peer_info(self.routing_domain);
|
||||||
if let Some(relay_pi) = opt_relay_pi {
|
if let Some(relay_pi) = opt_relay_pi {
|
||||||
let (relay_ids, relay_sni) = relay_pi.into_fields();
|
let (relay_ids, relay_sni) = relay_pi.destructure();
|
||||||
match relay_sni {
|
match relay_sni {
|
||||||
SignedNodeInfo::Direct(d) => Some((relay_ids, d)),
|
SignedNodeInfo::Direct(d) => Some((relay_ids, d)),
|
||||||
SignedNodeInfo::Relayed(_) => {
|
SignedNodeInfo::Relayed(_) => {
|
||||||
|
@ -835,8 +835,9 @@ impl RoutingTableInner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.create_node_ref(outer_self, peer_info.node_ids(), |_rti, e| {
|
let (node_ids, signed_node_info) = peer_info.destructure();
|
||||||
e.update_signed_node_info(routing_domain, peer_info.into_signed_node_info());
|
self.create_node_ref(outer_self, &node_ids, |_rti, e| {
|
||||||
|
e.update_signed_node_info(routing_domain, signed_node_info);
|
||||||
})
|
})
|
||||||
.map(|mut nr| {
|
.map(|mut nr| {
|
||||||
nr.set_filter(Some(
|
nr.set_filter(Some(
|
||||||
|
@ -16,8 +16,13 @@ impl PeerInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<TypedKeySet, VeilidAPIError> {
|
pub fn validate(&self, crypto: Crypto) -> Result<(), VeilidAPIError> {
|
||||||
self.signed_node_info.validate(&self.node_ids, crypto)
|
let validated_node_ids = self.signed_node_info.validate(&self.node_ids, crypto)?;
|
||||||
|
if validated_node_ids.is_empty() {
|
||||||
|
// Shouldn't get here because signed node info validation also checks this
|
||||||
|
apibail_generic!("no validated node ids");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn node_ids(&self) -> &TypedKeySet {
|
pub fn node_ids(&self) -> &TypedKeySet {
|
||||||
@ -26,10 +31,19 @@ impl PeerInfo {
|
|||||||
pub fn signed_node_info(&self) -> &SignedNodeInfo {
|
pub fn signed_node_info(&self) -> &SignedNodeInfo {
|
||||||
&self.signed_node_info
|
&self.signed_node_info
|
||||||
}
|
}
|
||||||
pub fn into_signed_node_info(self) -> SignedNodeInfo {
|
pub fn destructure(self) -> (TypedKeySet, SignedNodeInfo) {
|
||||||
self.signed_node_info
|
|
||||||
}
|
|
||||||
pub fn into_fields(self) -> (TypedKeySet, SignedNodeInfo) {
|
|
||||||
(self.node_ids, self.signed_node_info)
|
(self.node_ids, self.signed_node_info)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn validate_vec(peer_info_vec: &mut Vec<PeerInfo>, crypto: Crypto) {
|
||||||
|
let mut n = 0usize;
|
||||||
|
while n < peer_info_vec.len() {
|
||||||
|
let pi = peer_info_vec.get(n).unwrap();
|
||||||
|
if pi.validate(crypto.clone()).is_err() {
|
||||||
|
peer_info_vec.remove(n);
|
||||||
|
} else {
|
||||||
|
n += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,3 +57,19 @@ pub use typed_signature::*;
|
|||||||
pub use value_detail::*;
|
pub use value_detail::*;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct DecodeContext {
|
||||||
|
config: VeilidConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum QuestionContext {
|
||||||
|
GetValue(ValidateGetValueContext),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct RPCValidateContext {
|
||||||
|
crypto: Crypto,
|
||||||
|
question_context: Option<QuestionContext>,
|
||||||
|
}
|
||||||
|
@ -9,15 +9,15 @@ impl RPCAnswer {
|
|||||||
pub fn new(detail: RPCAnswerDetail) -> Self {
|
pub fn new(detail: RPCAnswerDetail) -> Self {
|
||||||
Self { detail }
|
Self { detail }
|
||||||
}
|
}
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
self.detail.validate(crypto)
|
self.detail.validate(validate_context)
|
||||||
}
|
|
||||||
pub fn into_detail(self) -> RPCAnswerDetail {
|
|
||||||
self.detail
|
|
||||||
}
|
}
|
||||||
pub fn desc(&self) -> &'static str {
|
pub fn desc(&self) -> &'static str {
|
||||||
self.detail.desc()
|
self.detail.desc()
|
||||||
}
|
}
|
||||||
|
pub fn destructure(self) -> RPCAnswerDetail {
|
||||||
|
self.detail
|
||||||
|
}
|
||||||
pub fn decode(reader: &veilid_capnp::answer::Reader) -> Result<RPCAnswer, RPCError> {
|
pub fn decode(reader: &veilid_capnp::answer::Reader) -> Result<RPCAnswer, RPCError> {
|
||||||
let d_reader = reader.get_detail();
|
let d_reader = reader.get_detail();
|
||||||
let detail = RPCAnswerDetail::decode(&d_reader)?;
|
let detail = RPCAnswerDetail::decode(&d_reader)?;
|
||||||
@ -60,19 +60,19 @@ impl RPCAnswerDetail {
|
|||||||
RPCAnswerDetail::CancelTunnelA(_) => "CancelTunnelA",
|
RPCAnswerDetail::CancelTunnelA(_) => "CancelTunnelA",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
match self {
|
match self {
|
||||||
RPCAnswerDetail::StatusA(r) => r.validate(crypto),
|
RPCAnswerDetail::StatusA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::FindNodeA(r) => r.validate(crypto),
|
RPCAnswerDetail::FindNodeA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::AppCallA(r) => r.validate(crypto),
|
RPCAnswerDetail::AppCallA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::GetValueA(r) => r.validate(crypto),
|
RPCAnswerDetail::GetValueA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::SetValueA(r) => r.validate(crypto),
|
RPCAnswerDetail::SetValueA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::WatchValueA(r) => r.validate(crypto),
|
RPCAnswerDetail::WatchValueA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::SupplyBlockA(r) => r.validate(crypto),
|
RPCAnswerDetail::SupplyBlockA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::FindBlockA(r) => r.validate(crypto),
|
RPCAnswerDetail::FindBlockA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::StartTunnelA(r) => r.validate(crypto),
|
RPCAnswerDetail::StartTunnelA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::CompleteTunnelA(r) => r.validate(crypto),
|
RPCAnswerDetail::CompleteTunnelA(r) => r.validate(validate_context),
|
||||||
RPCAnswerDetail::CancelTunnelA(r) => r.validate(crypto),
|
RPCAnswerDetail::CancelTunnelA(r) => r.validate(validate_context),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
|
@ -16,11 +16,11 @@ impl RPCOperationKind {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
match self {
|
match self {
|
||||||
RPCOperationKind::Question(r) => r.validate(crypto),
|
RPCOperationKind::Question(r) => r.validate(validate_context),
|
||||||
RPCOperationKind::Statement(r) => r.validate(crypto),
|
RPCOperationKind::Statement(r) => r.validate(validate_context),
|
||||||
RPCOperationKind::Answer(r) => r.validate(crypto),
|
RPCOperationKind::Answer(r) => r.validate(validate_context),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,13 +98,15 @@ impl RPCOperation {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
// Validate sender peer info
|
// Validate sender peer info
|
||||||
if let Some(sender_peer_info) = &self.opt_sender_peer_info {
|
if let Some(sender_peer_info) = &self.opt_sender_peer_info {
|
||||||
sender_peer_info.validate(crypto.clone())?;
|
sender_peer_info
|
||||||
|
.validate(validate_context.crypto.clone())
|
||||||
|
.map_err(RPCError::protocol)?;
|
||||||
}
|
}
|
||||||
// Validate operation kind
|
// Validate operation kind
|
||||||
self.kind.validate(crypto)
|
self.kind.validate(validate_context)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn op_id(&self) -> OperationId {
|
pub fn op_id(&self) -> OperationId {
|
||||||
@ -122,11 +124,19 @@ impl RPCOperation {
|
|||||||
&self.kind
|
&self.kind
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn into_kind(self) -> RPCOperationKind {
|
pub fn destructure(self) -> (OperationId, Option<PeerInfo>, Timestamp, RPCOperationKind) {
|
||||||
self.kind
|
(
|
||||||
|
self.op_id,
|
||||||
|
self.opt_sender_peer_info,
|
||||||
|
self.target_node_info_ts,
|
||||||
|
self.kind,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn decode(operation_reader: &veilid_capnp::operation::Reader) -> Result<Self, RPCError> {
|
pub fn decode(
|
||||||
|
context: &DecodeContext,
|
||||||
|
operation_reader: &veilid_capnp::operation::Reader,
|
||||||
|
) -> Result<Self, RPCError> {
|
||||||
let op_id = OperationId::new(operation_reader.get_op_id());
|
let op_id = OperationId::new(operation_reader.get_op_id());
|
||||||
|
|
||||||
let sender_peer_info = if operation_reader.has_sender_peer_info() {
|
let sender_peer_info = if operation_reader.has_sender_peer_info() {
|
||||||
|
@ -1,20 +1,30 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
const MAX_APP_CALL_Q_MESSAGE_LEN: usize = 32768;
|
||||||
|
const MAX_APP_CALL_A_MESSAGE_LEN: usize = 32768;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationAppCallQ {
|
pub struct RPCOperationAppCallQ {
|
||||||
pub message: Vec<u8>,
|
message: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationAppCallQ {
|
impl RPCOperationAppCallQ {
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn new(message: &[u8]) -> Result<Self, RPCError> {
|
||||||
xxx length should be checked in decode verify this
|
if message.len() > MAX_APP_CALL_Q_MESSAGE_LEN {
|
||||||
|
return Err(RPCError::protocol("AppCallQ message too long to set"));
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
|
message: message.to_vec(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_app_call_q::Reader,
|
reader: &veilid_capnp::operation_app_call_q::Reader,
|
||||||
) -> Result<RPCOperationAppCallQ, RPCError> {
|
) -> Result<RPCOperationAppCallQ, RPCError> {
|
||||||
let message = reader.get_message().map_err(RPCError::protocol)?.to_vec();
|
let mr = reader.get_message().map_err(RPCError::protocol)?;
|
||||||
Ok(RPCOperationAppCallQ { message })
|
RPCOperationAppCallQ::new(mr)
|
||||||
}
|
}
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
&self,
|
&self,
|
||||||
@ -23,19 +33,39 @@ impl RPCOperationAppCallQ {
|
|||||||
builder.set_message(&self.message);
|
builder.set_message(&self.message);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn message(&self) -> &[u8] {
|
||||||
|
&self.message
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> Vec<u8> {
|
||||||
|
self.message
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationAppCallA {
|
pub struct RPCOperationAppCallA {
|
||||||
pub message: Vec<u8>,
|
message: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationAppCallA {
|
impl RPCOperationAppCallA {
|
||||||
|
pub fn new(message: &[u8]) -> Result<Self, RPCError> {
|
||||||
|
if message.len() > MAX_APP_CALL_A_MESSAGE_LEN {
|
||||||
|
return Err(RPCError::protocol("AppCallA message too long to set"));
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
|
message: message.to_vec(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_app_call_a::Reader,
|
reader: &veilid_capnp::operation_app_call_a::Reader,
|
||||||
) -> Result<RPCOperationAppCallA, RPCError> {
|
) -> Result<RPCOperationAppCallA, RPCError> {
|
||||||
let message = reader.get_message().map_err(RPCError::protocol)?.to_vec();
|
let mr = reader.get_message().map_err(RPCError::protocol)?;
|
||||||
Ok(RPCOperationAppCallA { message })
|
RPCOperationAppCallA::new(mr)
|
||||||
}
|
}
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
&self,
|
&self,
|
||||||
@ -44,4 +74,12 @@ impl RPCOperationAppCallA {
|
|||||||
builder.set_message(&self.message);
|
builder.set_message(&self.message);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn message(&self) -> &[u8] {
|
||||||
|
&self.message
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> Vec<u8> {
|
||||||
|
self.message
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,30 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
const MAX_APP_MESSAGE_MESSAGE_LEN: usize = 32768;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationAppMessage {
|
pub struct RPCOperationAppMessage {
|
||||||
pub message: Vec<u8>,
|
message: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationAppMessage {
|
impl RPCOperationAppMessage {
|
||||||
|
pub fn new(message: &[u8]) -> Result<Self, RPCError> {
|
||||||
|
if message.len() > MAX_APP_MESSAGE_MESSAGE_LEN {
|
||||||
|
return Err(RPCError::protocol("AppMessage message too long to set"));
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
|
message: message.to_vec(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_app_message::Reader,
|
reader: &veilid_capnp::operation_app_message::Reader,
|
||||||
) -> Result<RPCOperationAppMessage, RPCError> {
|
) -> Result<RPCOperationAppMessage, RPCError> {
|
||||||
let message = reader.get_message().map_err(RPCError::protocol)?.to_vec();
|
let mr = reader.get_message().map_err(RPCError::protocol)?;
|
||||||
Ok(RPCOperationAppMessage { message })
|
RPCOperationAppMessage::new(mr)
|
||||||
}
|
}
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
&self,
|
&self,
|
||||||
@ -19,4 +33,12 @@ impl RPCOperationAppMessage {
|
|||||||
builder.set_message(&self.message);
|
builder.set_message(&self.message);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn message(&self) -> &[u8] {
|
||||||
|
&self.message
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> Vec<u8> {
|
||||||
|
self.message
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,10 +2,16 @@ use super::*;
|
|||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationCancelTunnelQ {
|
pub struct RPCOperationCancelTunnelQ {
|
||||||
pub id: TunnelId,
|
id: TunnelId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationCancelTunnelQ {
|
impl RPCOperationCancelTunnelQ {
|
||||||
|
pub fn new(id: TunnelId) -> Self {
|
||||||
|
Self { id }
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_cancel_tunnel_q::Reader,
|
reader: &veilid_capnp::operation_cancel_tunnel_q::Reader,
|
||||||
) -> Result<RPCOperationCancelTunnelQ, RPCError> {
|
) -> Result<RPCOperationCancelTunnelQ, RPCError> {
|
||||||
@ -21,6 +27,13 @@ impl RPCOperationCancelTunnelQ {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
pub fn id(&self) -> TunnelId {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> TunnelId {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@ -30,6 +43,15 @@ pub enum RPCOperationCancelTunnelA {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationCancelTunnelA {
|
impl RPCOperationCancelTunnelA {
|
||||||
|
pub fn new_tunnel(id: TunnelId) -> Self {
|
||||||
|
Self::Tunnel(id)
|
||||||
|
}
|
||||||
|
pub fn new_error(error: TunnelError) -> Self {
|
||||||
|
Self::Error(error)
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_cancel_tunnel_a::Reader,
|
reader: &veilid_capnp::operation_cancel_tunnel_a::Reader,
|
||||||
) -> Result<RPCOperationCancelTunnelA, RPCError> {
|
) -> Result<RPCOperationCancelTunnelA, RPCError> {
|
||||||
|
@ -2,13 +2,24 @@ use super::*;
|
|||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationCompleteTunnelQ {
|
pub struct RPCOperationCompleteTunnelQ {
|
||||||
pub id: TunnelId,
|
id: TunnelId,
|
||||||
pub local_mode: TunnelMode,
|
local_mode: TunnelMode,
|
||||||
pub depth: u8,
|
depth: u8,
|
||||||
pub endpoint: TunnelEndpoint,
|
endpoint: TunnelEndpoint,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationCompleteTunnelQ {
|
impl RPCOperationCompleteTunnelQ {
|
||||||
|
pub fn new(id: TunnelId, local_mode: TunnelMode, depth: u8, endpoint: TunnelEndpoint) -> Self {
|
||||||
|
Self {
|
||||||
|
id,
|
||||||
|
local_mode,
|
||||||
|
depth,
|
||||||
|
endpoint,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_complete_tunnel_q::Reader,
|
reader: &veilid_capnp::operation_complete_tunnel_q::Reader,
|
||||||
) -> Result<RPCOperationCompleteTunnelQ, RPCError> {
|
) -> Result<RPCOperationCompleteTunnelQ, RPCError> {
|
||||||
@ -43,6 +54,23 @@ impl RPCOperationCompleteTunnelQ {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
pub fn id(&self) -> TunnelId {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn local_mode(&self) -> TunnelMode {
|
||||||
|
self.local_mode
|
||||||
|
}
|
||||||
|
pub fn depth(&self) -> u8 {
|
||||||
|
self.depth
|
||||||
|
}
|
||||||
|
pub fn endpoint(&self) -> &TunnelEndpoint {
|
||||||
|
&self.endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> (TunnelId, TunnelMode, u8, TunnelEndpoint) {
|
||||||
|
(self.id, self.local_mode, self.depth, self.endpoint)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@ -52,6 +80,16 @@ pub enum RPCOperationCompleteTunnelA {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationCompleteTunnelA {
|
impl RPCOperationCompleteTunnelA {
|
||||||
|
pub fn new_tunnel(tunnel: FullTunnel) -> Self {
|
||||||
|
Self::Tunnel(tunnel)
|
||||||
|
}
|
||||||
|
pub fn new_error(error: TunnelError) -> Self {
|
||||||
|
Self::Error(error)
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_complete_tunnel_a::Reader,
|
reader: &veilid_capnp::operation_complete_tunnel_a::Reader,
|
||||||
) -> Result<RPCOperationCompleteTunnelA, RPCError> {
|
) -> Result<RPCOperationCompleteTunnelA, RPCError> {
|
||||||
|
@ -1,18 +1,28 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
const MAX_FIND_BLOCK_A_DATA_LEN: usize = 32768;
|
||||||
|
const MAX_FIND_BLOCK_A_SUPPLIERS_LEN: usize = 10;
|
||||||
|
const MAX_FIND_BLOCK_A_PEERS_LEN: usize = 10;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationFindBlockQ {
|
pub struct RPCOperationFindBlockQ {
|
||||||
pub block_id: TypedKey,
|
block_id: TypedKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationFindBlockQ {
|
impl RPCOperationFindBlockQ {
|
||||||
|
pub fn new(block_id: TypedKey) -> Self {
|
||||||
|
Self { block_id }
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_find_block_q::Reader,
|
reader: &veilid_capnp::operation_find_block_q::Reader,
|
||||||
) -> Result<RPCOperationFindBlockQ, RPCError> {
|
) -> Result<RPCOperationFindBlockQ, RPCError> {
|
||||||
let bi_reader = reader.get_block_id().map_err(RPCError::protocol)?;
|
let bi_reader = reader.get_block_id().map_err(RPCError::protocol)?;
|
||||||
let block_id = decode_typed_key(&bi_reader)?;
|
let block_id = decode_typed_key(&bi_reader)?;
|
||||||
|
|
||||||
Ok(RPCOperationFindBlockQ { block_id })
|
Ok(RPCOperationFindBlockQ::new(block_id))
|
||||||
}
|
}
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
&self,
|
&self,
|
||||||
@ -23,22 +33,68 @@ impl RPCOperationFindBlockQ {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn block_id(&self) -> TypedKey {
|
||||||
|
self.block_id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> TypedKey {
|
||||||
|
self.block_id
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationFindBlockA {
|
pub struct RPCOperationFindBlockA {
|
||||||
pub data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
pub suppliers: Vec<PeerInfo>,
|
suppliers: Vec<PeerInfo>,
|
||||||
pub peers: Vec<PeerInfo>,
|
peers: Vec<PeerInfo>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationFindBlockA {
|
impl RPCOperationFindBlockA {
|
||||||
|
pub fn new(
|
||||||
|
data: &[u8],
|
||||||
|
suppliers: Vec<PeerInfo>,
|
||||||
|
peers: Vec<PeerInfo>,
|
||||||
|
) -> Result<Self, RPCError> {
|
||||||
|
if data.len() > MAX_FIND_BLOCK_A_DATA_LEN {
|
||||||
|
return Err(RPCError::protocol("find block data length too long"));
|
||||||
|
}
|
||||||
|
if suppliers.len() > MAX_FIND_BLOCK_A_SUPPLIERS_LEN {
|
||||||
|
return Err(RPCError::protocol("find block suppliers length too long"));
|
||||||
|
}
|
||||||
|
if peers.len() > MAX_FIND_BLOCK_A_PEERS_LEN {
|
||||||
|
return Err(RPCError::protocol("find block peers length too long"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
data: data.to_vec(),
|
||||||
|
suppliers,
|
||||||
|
peers,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
PeerInfo::validate_vec(&mut self.suppliers, validate_context.crypto.clone());
|
||||||
|
PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_find_block_a::Reader,
|
reader: &veilid_capnp::operation_find_block_a::Reader,
|
||||||
) -> Result<RPCOperationFindBlockA, RPCError> {
|
) -> Result<RPCOperationFindBlockA, RPCError> {
|
||||||
let data = reader.get_data().map_err(RPCError::protocol)?.to_vec();
|
let data = reader.get_data().map_err(RPCError::protocol)?;
|
||||||
|
|
||||||
let suppliers_reader = reader.get_suppliers().map_err(RPCError::protocol)?;
|
let suppliers_reader = reader.get_suppliers().map_err(RPCError::protocol)?;
|
||||||
|
let peers_reader = reader.get_peers().map_err(RPCError::protocol)?;
|
||||||
|
|
||||||
|
if data.len() > MAX_FIND_BLOCK_A_DATA_LEN {
|
||||||
|
return Err(RPCError::protocol("find block data length too long"));
|
||||||
|
}
|
||||||
|
if suppliers_reader.len() as usize > MAX_FIND_BLOCK_A_SUPPLIERS_LEN {
|
||||||
|
return Err(RPCError::protocol("find block suppliers length too long"));
|
||||||
|
}
|
||||||
|
if peers_reader.len() as usize > MAX_FIND_BLOCK_A_PEERS_LEN {
|
||||||
|
return Err(RPCError::protocol("find block peers length too long"));
|
||||||
|
}
|
||||||
|
|
||||||
let mut suppliers = Vec::<PeerInfo>::with_capacity(
|
let mut suppliers = Vec::<PeerInfo>::with_capacity(
|
||||||
suppliers_reader
|
suppliers_reader
|
||||||
.len()
|
.len()
|
||||||
@ -50,7 +106,6 @@ impl RPCOperationFindBlockA {
|
|||||||
suppliers.push(peer_info);
|
suppliers.push(peer_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
let peers_reader = reader.get_peers().map_err(RPCError::protocol)?;
|
|
||||||
let mut peers = Vec::<PeerInfo>::with_capacity(
|
let mut peers = Vec::<PeerInfo>::with_capacity(
|
||||||
peers_reader
|
peers_reader
|
||||||
.len()
|
.len()
|
||||||
@ -62,11 +117,7 @@ impl RPCOperationFindBlockA {
|
|||||||
peers.push(peer_info);
|
peers.push(peer_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(RPCOperationFindBlockA {
|
RPCOperationFindBlockA::new(data, suppliers, peers)
|
||||||
data,
|
|
||||||
suppliers,
|
|
||||||
peers,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
@ -99,4 +150,18 @@ impl RPCOperationFindBlockA {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn data(&self) -> &[u8] {
|
||||||
|
&self.data
|
||||||
|
}
|
||||||
|
pub fn suppliers(&self) -> &[PeerInfo] {
|
||||||
|
&self.suppliers
|
||||||
|
}
|
||||||
|
pub fn peers(&self) -> &[PeerInfo] {
|
||||||
|
&self.peers
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> (Vec<u8>, Vec<PeerInfo>, Vec<PeerInfo>) {
|
||||||
|
(self.data, self.suppliers, self.peers)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,17 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
const MAX_FIND_NODE_A_PEERS_LEN: usize = 20;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationFindNodeQ {
|
pub struct RPCOperationFindNodeQ {
|
||||||
pub node_id: TypedKey,
|
node_id: TypedKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationFindNodeQ {
|
impl RPCOperationFindNodeQ {
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn new(node_id: TypedKey) -> Self {
|
||||||
|
Self { node_id }
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
@ -24,24 +29,43 @@ impl RPCOperationFindNodeQ {
|
|||||||
encode_typed_key(&self.node_id, &mut ni_builder);
|
encode_typed_key(&self.node_id, &mut ni_builder);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn node_id(&self) -> &TypedKey {
|
||||||
|
&self.node_id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> TypedKey {
|
||||||
|
self.node_id
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationFindNodeA {
|
pub struct RPCOperationFindNodeA {
|
||||||
pub peers: Vec<PeerInfo>,
|
peers: Vec<PeerInfo>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationFindNodeA {
|
impl RPCOperationFindNodeA {
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn new(peers: Vec<PeerInfo>) -> Result<Self, RPCError> {
|
||||||
for pi in &self.peers {
|
if peers.len() > MAX_FIND_NODE_A_PEERS_LEN {
|
||||||
pi.validate(crypto.clone()).map_err(RPCError::protocol)?;
|
return Err(RPCError::protocol("find node peers length too long"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(Self { peers })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone());
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_find_node_a::Reader,
|
reader: &veilid_capnp::operation_find_node_a::Reader,
|
||||||
) -> Result<RPCOperationFindNodeA, RPCError> {
|
) -> Result<RPCOperationFindNodeA, RPCError> {
|
||||||
let peers_reader = reader.get_peers().map_err(RPCError::protocol)?;
|
let peers_reader = reader.get_peers().map_err(RPCError::protocol)?;
|
||||||
|
|
||||||
|
if peers_reader.len() as usize > MAX_FIND_NODE_A_PEERS_LEN {
|
||||||
|
return Err(RPCError::protocol("find node peers length too long"));
|
||||||
|
}
|
||||||
|
|
||||||
let mut peers = Vec::<PeerInfo>::with_capacity(
|
let mut peers = Vec::<PeerInfo>::with_capacity(
|
||||||
peers_reader
|
peers_reader
|
||||||
.len()
|
.len()
|
||||||
@ -53,7 +77,7 @@ impl RPCOperationFindNodeA {
|
|||||||
peers.push(peer_info);
|
peers.push(peer_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(RPCOperationFindNodeA { peers })
|
RPCOperationFindNodeA::new(peers)
|
||||||
}
|
}
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
&self,
|
&self,
|
||||||
@ -71,4 +95,12 @@ impl RPCOperationFindNodeA {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn peers(&self) -> &[PeerInfo] {
|
||||||
|
&self.peers
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> Vec<PeerInfo> {
|
||||||
|
self.peers
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,13 +1,44 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::storage_manager::{SignedValueDescriptor, ValueDetail};
|
||||||
|
|
||||||
|
const MAX_GET_VALUE_A_PEERS_LEN: usize = 20;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ValidateGetValueContext {
|
||||||
|
last_descriptor: Option<SignedValueDescriptor>,
|
||||||
|
subkey: ValueSubkey,
|
||||||
|
vcrypto: CryptoSystemVersion,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for ValidateGetValueContext {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("ValidateGetValueContext")
|
||||||
|
.field("last_descriptor", &self.last_descriptor)
|
||||||
|
.field("subkey", &self.subkey)
|
||||||
|
.field("vcrypto", &self.vcrypto.kind().to_string())
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationGetValueQ {
|
pub struct RPCOperationGetValueQ {
|
||||||
pub key: TypedKey,
|
key: TypedKey,
|
||||||
pub subkey: ValueSubkey,
|
subkey: ValueSubkey,
|
||||||
pub want_descriptor: bool,
|
want_descriptor: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationGetValueQ {
|
impl RPCOperationGetValueQ {
|
||||||
|
pub fn new(key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> Self {
|
||||||
|
Self {
|
||||||
|
key,
|
||||||
|
subkey,
|
||||||
|
want_descriptor,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_get_value_q::Reader,
|
reader: &veilid_capnp::operation_get_value_q::Reader,
|
||||||
) -> Result<RPCOperationGetValueQ, RPCError> {
|
) -> Result<RPCOperationGetValueQ, RPCError> {
|
||||||
@ -31,6 +62,18 @@ impl RPCOperationGetValueQ {
|
|||||||
builder.set_want_descriptor(self.want_descriptor);
|
builder.set_want_descriptor(self.want_descriptor);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
pub fn key(&self) -> &TypedKey {
|
||||||
|
&self.key
|
||||||
|
}
|
||||||
|
pub fn subkey(&self) -> ValueSubkey {
|
||||||
|
self.subkey
|
||||||
|
}
|
||||||
|
pub fn want_descriptor(&self) -> bool {
|
||||||
|
self.want_descriptor
|
||||||
|
}
|
||||||
|
pub fn destructure(self) -> (TypedKey, ValueSubkey, bool) {
|
||||||
|
(self.key, self.subkey, self.want_descriptor)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@ -40,16 +83,53 @@ pub enum RPCOperationGetValueA {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationGetValueA {
|
impl RPCOperationGetValueA {
|
||||||
|
pub fn new_value(value: ValueDetail) -> Self {
|
||||||
|
Self::Value(value)
|
||||||
|
}
|
||||||
|
pub fn new_peers(peers: Vec<PeerInfo>) -> Result<Self, RPCError> {
|
||||||
|
if peers.len() > MAX_GET_VALUE_A_PEERS_LEN {
|
||||||
|
return Err(RPCError::protocol("GetValueA peers length too long"));
|
||||||
|
}
|
||||||
|
Ok(Self::Peers(peers))
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
match self {
|
||||||
|
RPCOperationGetValueA::Value(value_detail) => {
|
||||||
|
let question_context = validate_context
|
||||||
|
.question_context
|
||||||
|
.as_ref()
|
||||||
|
.expect("GetValueA requires question context");
|
||||||
|
let QuestionContext::GetValue(get_value_context) = question_context else {
|
||||||
|
panic!("Wrong context type for GetValueA");
|
||||||
|
};
|
||||||
|
value_detail
|
||||||
|
.validate(
|
||||||
|
get_value_context.last_descriptor.as_ref(),
|
||||||
|
get_value_context.subkey,
|
||||||
|
get_value_context.vcrypto.clone(),
|
||||||
|
)
|
||||||
|
.map_err(RPCError::protocol)
|
||||||
|
}
|
||||||
|
RPCOperationGetValueA::Peers(peers) => {
|
||||||
|
PeerInfo::validate_vec(peers, validate_context.crypto.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_get_value_a::Reader,
|
reader: &veilid_capnp::operation_get_value_a::Reader,
|
||||||
) -> Result<RPCOperationGetValueA, RPCError> {
|
) -> Result<RPCOperationGetValueA, RPCError> {
|
||||||
match reader.which().map_err(RPCError::protocol)? {
|
match reader.which().map_err(RPCError::protocol)? {
|
||||||
veilid_capnp::operation_get_value_a::Which::Value(r) => {
|
veilid_capnp::operation_get_value_a::Which::Value(r) => {
|
||||||
let value_detail = decode_value_detail(&r.map_err(RPCError::protocol)?)?;
|
let value_detail = decode_value_detail(&r.map_err(RPCError::protocol)?)?;
|
||||||
Ok(RPCOperationGetValueA::Data(data))
|
Ok(RPCOperationGetValueA::Value(value_detail))
|
||||||
}
|
}
|
||||||
veilid_capnp::operation_get_value_a::Which::Peers(r) => {
|
veilid_capnp::operation_get_value_a::Which::Peers(r) => {
|
||||||
let peers_reader = r.map_err(RPCError::protocol)?;
|
let peers_reader = r.map_err(RPCError::protocol)?;
|
||||||
|
if peers_reader.len() as usize > MAX_GET_VALUE_A_PEERS_LEN {
|
||||||
|
return Err(RPCError::protocol("GetValueA peers length too long"));
|
||||||
|
}
|
||||||
let mut peers = Vec::<PeerInfo>::with_capacity(
|
let mut peers = Vec::<PeerInfo>::with_capacity(
|
||||||
peers_reader
|
peers_reader
|
||||||
.len()
|
.len()
|
||||||
@ -70,9 +150,9 @@ impl RPCOperationGetValueA {
|
|||||||
builder: &mut veilid_capnp::operation_get_value_a::Builder,
|
builder: &mut veilid_capnp::operation_get_value_a::Builder,
|
||||||
) -> Result<(), RPCError> {
|
) -> Result<(), RPCError> {
|
||||||
match self {
|
match self {
|
||||||
RPCOperationGetValueA::Data(data) => {
|
RPCOperationGetValueA::Value(value_detail) => {
|
||||||
let mut d_builder = builder.reborrow().init_data();
|
let mut d_builder = builder.reborrow().init_value();
|
||||||
encode_value_data(&data, &mut d_builder)?;
|
encode_value_detail(&value_detail, &mut d_builder)?;
|
||||||
}
|
}
|
||||||
RPCOperationGetValueA::Peers(peers) => {
|
RPCOperationGetValueA::Peers(peers) => {
|
||||||
let mut peers_builder = builder.reborrow().init_peers(
|
let mut peers_builder = builder.reborrow().init_peers(
|
||||||
|
@ -2,17 +2,31 @@ use super::*;
|
|||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationReturnReceipt {
|
pub struct RPCOperationReturnReceipt {
|
||||||
pub receipt: Vec<u8>,
|
receipt: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationReturnReceipt {
|
impl RPCOperationReturnReceipt {
|
||||||
|
pub fn new(receipt: &[u8]) -> Result<Self, RPCError> {
|
||||||
|
if receipt.len() < MIN_RECEIPT_SIZE {
|
||||||
|
return Err(RPCError::protocol("ReturnReceipt receipt too short to set"));
|
||||||
|
}
|
||||||
|
if receipt.len() > MAX_RECEIPT_SIZE {
|
||||||
|
return Err(RPCError::protocol("ReturnReceipt receipt too long to set"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
receipt: receipt.to_vec(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_return_receipt::Reader,
|
reader: &veilid_capnp::operation_return_receipt::Reader,
|
||||||
) -> Result<RPCOperationReturnReceipt, RPCError> {
|
) -> Result<RPCOperationReturnReceipt, RPCError> {
|
||||||
let rcpt_reader = reader.get_receipt().map_err(RPCError::protocol)?;
|
let rr = reader.get_receipt().map_err(RPCError::protocol)?;
|
||||||
let receipt = rcpt_reader.to_vec();
|
RPCOperationReturnReceipt::new(rr)
|
||||||
|
|
||||||
Ok(RPCOperationReturnReceipt { receipt })
|
|
||||||
}
|
}
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
&self,
|
&self,
|
||||||
@ -21,4 +35,12 @@ impl RPCOperationReturnReceipt {
|
|||||||
builder.set_receipt(&self.receipt);
|
builder.set_receipt(&self.receipt);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn receipt(&self) -> &[u8] {
|
||||||
|
&self.receipt
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destructure(self) -> Vec<u8> {
|
||||||
|
self.receipt
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,10 +2,16 @@ use super::*;
|
|||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RPCOperationSignal {
|
pub struct RPCOperationSignal {
|
||||||
pub signal_info: SignalInfo,
|
signal_info: SignalInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationSignal {
|
impl RPCOperationSignal {
|
||||||
|
pub fn new(signal_info: SignalInfo) -> Self {
|
||||||
|
Self { signal_info }
|
||||||
|
}
|
||||||
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
|
self.signal_info.validate(validate_context.crypto.clone())
|
||||||
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_signal::Reader,
|
reader: &veilid_capnp::operation_signal::Reader,
|
||||||
) -> Result<RPCOperationSignal, RPCError> {
|
) -> Result<RPCOperationSignal, RPCError> {
|
||||||
@ -19,4 +25,10 @@ impl RPCOperationSignal {
|
|||||||
encode_signal_info(&self.signal_info, builder)?;
|
encode_signal_info(&self.signal_info, builder)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
pub fn signal_info(&self) -> &SignalInfo {
|
||||||
|
&self.signal_info
|
||||||
|
}
|
||||||
|
pub fn destructure(self) -> SignalInfo {
|
||||||
|
self.signal_info
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ pub struct RPCOperationStatusQ {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationStatusQ {
|
impl RPCOperationStatusQ {
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -41,7 +41,7 @@ pub struct RPCOperationStatusA {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RPCOperationStatusA {
|
impl RPCOperationStatusA {
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
|
@ -34,7 +34,7 @@ pub enum RPCOperationSupplyBlockA {
|
|||||||
impl RPCOperationSupplyBlockA {
|
impl RPCOperationSupplyBlockA {
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
reader: &veilid_capnp::operation_supply_block_a::Reader,
|
reader: &veilid_capnp::operation_supply_block_a::Reader,
|
||||||
= ) -> Result<RPCOperationSupplyBlockA, RPCError> {
|
) -> Result<RPCOperationSupplyBlockA, RPCError> {
|
||||||
match reader.which().map_err(RPCError::protocol)? {
|
match reader.which().map_err(RPCError::protocol)? {
|
||||||
veilid_capnp::operation_supply_block_a::Which::Expiration(r) => {
|
veilid_capnp::operation_supply_block_a::Which::Expiration(r) => {
|
||||||
Ok(RPCOperationSupplyBlockA::Expiration(r))
|
Ok(RPCOperationSupplyBlockA::Expiration(r))
|
||||||
|
@ -10,9 +10,9 @@ impl RPCQuestion {
|
|||||||
pub fn new(respond_to: RespondTo, detail: RPCQuestionDetail) -> Self {
|
pub fn new(respond_to: RespondTo, detail: RPCQuestionDetail) -> Self {
|
||||||
Self { respond_to, detail }
|
Self { respond_to, detail }
|
||||||
}
|
}
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
self.respond_to.validate(crypto.clone())?;
|
self.respond_to.validate(validate_context)?;
|
||||||
self.detail.validate(crypto)
|
self.detail.validate(validate_context)
|
||||||
}
|
}
|
||||||
pub fn respond_to(&self) -> &RespondTo {
|
pub fn respond_to(&self) -> &RespondTo {
|
||||||
&self.respond_to
|
&self.respond_to
|
||||||
@ -23,6 +23,9 @@ impl RPCQuestion {
|
|||||||
pub fn desc(&self) -> &'static str {
|
pub fn desc(&self) -> &'static str {
|
||||||
self.detail.desc()
|
self.detail.desc()
|
||||||
}
|
}
|
||||||
|
pub fn destructure(self) -> (RespondTo, RPCQuestionDetail) {
|
||||||
|
(self.respond_to, self.detail)
|
||||||
|
}
|
||||||
pub fn decode(reader: &veilid_capnp::question::Reader) -> Result<RPCQuestion, RPCError> {
|
pub fn decode(reader: &veilid_capnp::question::Reader) -> Result<RPCQuestion, RPCError> {
|
||||||
let rt_reader = reader.get_respond_to();
|
let rt_reader = reader.get_respond_to();
|
||||||
let respond_to = RespondTo::decode(&rt_reader)?;
|
let respond_to = RespondTo::decode(&rt_reader)?;
|
||||||
@ -69,19 +72,19 @@ impl RPCQuestionDetail {
|
|||||||
RPCQuestionDetail::CancelTunnelQ(_) => "CancelTunnelQ",
|
RPCQuestionDetail::CancelTunnelQ(_) => "CancelTunnelQ",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
match self {
|
match self {
|
||||||
RPCQuestionDetail::StatusQ(r) => r.validate(crypto),
|
RPCQuestionDetail::StatusQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::FindNodeQ(r) => r.validate(crypto),
|
RPCQuestionDetail::FindNodeQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::AppCallQ(r) => r.validate(crypto),
|
RPCQuestionDetail::AppCallQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::GetValueQ(r) => r.validate(crypto),
|
RPCQuestionDetail::GetValueQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::SetValueQ(r) => r.validate(crypto),
|
RPCQuestionDetail::SetValueQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::WatchValueQ(r) => r.validate(crypto),
|
RPCQuestionDetail::WatchValueQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::SupplyBlockQ(r) => r.validate(crypto),
|
RPCQuestionDetail::SupplyBlockQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::FindBlockQ(r) => r.validate(crypto),
|
RPCQuestionDetail::FindBlockQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::StartTunnelQ(r) => r.validate(crypto),
|
RPCQuestionDetail::StartTunnelQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::CompleteTunnelQ(r) => r.validate(crypto),
|
RPCQuestionDetail::CompleteTunnelQ(r) => r.validate(validate_context),
|
||||||
RPCQuestionDetail::CancelTunnelQ(r) => r.validate(crypto),
|
RPCQuestionDetail::CancelTunnelQ(r) => r.validate(validate_context),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,10 +7,12 @@ pub enum RespondTo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RespondTo {
|
impl RespondTo {
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
match self {
|
match self {
|
||||||
RespondTo::Sender => Ok(()),
|
RespondTo::Sender => Ok(()),
|
||||||
RespondTo::PrivateRoute(pr) => pr.validate(crypto).map_err(RPCError::protocol),
|
RespondTo::PrivateRoute(pr) => pr
|
||||||
|
.validate(validate_context.crypto.clone())
|
||||||
|
.map_err(RPCError::protocol),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,18 +9,18 @@ impl RPCStatement {
|
|||||||
pub fn new(detail: RPCStatementDetail) -> Self {
|
pub fn new(detail: RPCStatementDetail) -> Self {
|
||||||
Self { detail }
|
Self { detail }
|
||||||
}
|
}
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
self.detail.validate(crypto)
|
self.detail.validate(validate_context)
|
||||||
}
|
}
|
||||||
pub fn detail(&self) -> &RPCStatementDetail {
|
pub fn detail(&self) -> &RPCStatementDetail {
|
||||||
&self.detail
|
&self.detail
|
||||||
}
|
}
|
||||||
pub fn into_detail(self) -> RPCStatementDetail {
|
|
||||||
self.detail
|
|
||||||
}
|
|
||||||
pub fn desc(&self) -> &'static str {
|
pub fn desc(&self) -> &'static str {
|
||||||
self.detail.desc()
|
self.detail.desc()
|
||||||
}
|
}
|
||||||
|
pub fn destructure(self) -> RPCStatementDetail {
|
||||||
|
self.detail
|
||||||
|
}
|
||||||
pub fn decode(reader: &veilid_capnp::statement::Reader) -> Result<RPCStatement, RPCError> {
|
pub fn decode(reader: &veilid_capnp::statement::Reader) -> Result<RPCStatement, RPCError> {
|
||||||
let d_reader = reader.get_detail();
|
let d_reader = reader.get_detail();
|
||||||
let detail = RPCStatementDetail::decode(&d_reader)?;
|
let detail = RPCStatementDetail::decode(&d_reader)?;
|
||||||
@ -53,14 +53,14 @@ impl RPCStatementDetail {
|
|||||||
RPCStatementDetail::AppMessage(_) => "AppMessage",
|
RPCStatementDetail::AppMessage(_) => "AppMessage",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
|
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
|
||||||
match self {
|
match self {
|
||||||
RPCStatementDetail::ValidateDialInfo(r) => r.validate(crypto),
|
RPCStatementDetail::ValidateDialInfo(r) => r.validate(validate_context),
|
||||||
RPCStatementDetail::Route(r) => r.validate(crypto),
|
RPCStatementDetail::Route(r) => r.validate(validate_context),
|
||||||
RPCStatementDetail::ValueChanged(r) => r.validate(crypto),
|
RPCStatementDetail::ValueChanged(r) => r.validate(validate_context),
|
||||||
RPCStatementDetail::Signal(r) => r.validate(crypto),
|
RPCStatementDetail::Signal(r) => r.validate(validate_context),
|
||||||
RPCStatementDetail::ReturnReceipt(r) => r.validate(crypto),
|
RPCStatementDetail::ReturnReceipt(r) => r.validate(validate_context),
|
||||||
RPCStatementDetail::AppMessage(r) => r.validate(crypto),
|
RPCStatementDetail::AppMessage(r) => r.validate(validate_context),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn decode(
|
pub fn decode(
|
||||||
|
@ -149,7 +149,7 @@ where
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct WaitableReply {
|
struct WaitableReply {
|
||||||
handle: OperationWaitHandle<RPCMessage>,
|
handle: OperationWaitHandle<RPCMessage, Option<QuestionContext>>,
|
||||||
timeout_us: TimestampDuration,
|
timeout_us: TimestampDuration,
|
||||||
node_ref: NodeRef,
|
node_ref: NodeRef,
|
||||||
send_ts: Timestamp,
|
send_ts: Timestamp,
|
||||||
@ -235,8 +235,8 @@ pub struct RPCProcessorUnlockedInner {
|
|||||||
max_route_hop_count: usize,
|
max_route_hop_count: usize,
|
||||||
validate_dial_info_receipt_time_ms: u32,
|
validate_dial_info_receipt_time_ms: u32,
|
||||||
update_callback: UpdateCallback,
|
update_callback: UpdateCallback,
|
||||||
waiting_rpc_table: OperationWaiter<RPCMessage>,
|
waiting_rpc_table: OperationWaiter<RPCMessage, Option<QuestionContext>>,
|
||||||
waiting_app_call_table: OperationWaiter<Vec<u8>>,
|
waiting_app_call_table: OperationWaiter<Vec<u8>, ()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@ -998,11 +998,13 @@ impl RPCProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Issue a question over the network, possibly using an anonymized route
|
/// Issue a question over the network, possibly using an anonymized route
|
||||||
|
/// Optionally keeps a context to be passed to the answer processor when an answer is received
|
||||||
#[instrument(level = "debug", skip(self, question), err)]
|
#[instrument(level = "debug", skip(self, question), err)]
|
||||||
async fn question(
|
async fn question(
|
||||||
&self,
|
&self,
|
||||||
dest: Destination,
|
dest: Destination,
|
||||||
question: RPCQuestion,
|
question: RPCQuestion,
|
||||||
|
context: Option<QuestionContext>,
|
||||||
) -> Result<NetworkResult<WaitableReply>, RPCError> {
|
) -> Result<NetworkResult<WaitableReply>, RPCError> {
|
||||||
// Get sender peer info if we should send that
|
// Get sender peer info if we should send that
|
||||||
let spi = self.get_sender_peer_info(&dest);
|
let spi = self.get_sender_peer_info(&dest);
|
||||||
@ -1030,7 +1032,10 @@ impl RPCProcessor {
|
|||||||
let timeout_us = self.unlocked_inner.timeout_us * (hop_count as u64);
|
let timeout_us = self.unlocked_inner.timeout_us * (hop_count as u64);
|
||||||
|
|
||||||
// Set up op id eventual
|
// Set up op id eventual
|
||||||
let handle = self.unlocked_inner.waiting_rpc_table.add_op_waiter(op_id);
|
let handle = self
|
||||||
|
.unlocked_inner
|
||||||
|
.waiting_rpc_table
|
||||||
|
.add_op_waiter(op_id, context);
|
||||||
|
|
||||||
// Send question
|
// Send question
|
||||||
let bytes: ByteCount = (message.len() as u64).into();
|
let bytes: ByteCount = (message.len() as u64).into();
|
||||||
@ -1189,6 +1194,48 @@ impl RPCProcessor {
|
|||||||
Ok(NetworkResult::value(()))
|
Ok(NetworkResult::value(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn decode_rpc_operation(
|
||||||
|
&self,
|
||||||
|
encoded_msg: &RPCMessageEncoded,
|
||||||
|
) -> Result<RPCOperation, RPCError> {
|
||||||
|
let reader = encoded_msg.data.get_reader()?;
|
||||||
|
let op_reader = reader
|
||||||
|
.get_root::<veilid_capnp::operation::Reader>()
|
||||||
|
.map_err(RPCError::protocol)
|
||||||
|
.map_err(logthru_rpc!())?;
|
||||||
|
let decode_context = DecodeContext {
|
||||||
|
config: self.config.clone(),
|
||||||
|
};
|
||||||
|
let operation = RPCOperation::decode(&decode_context, &op_reader)?;
|
||||||
|
|
||||||
|
// Validate the RPC message
|
||||||
|
self.validate_rpc_operation(&operation)?;
|
||||||
|
|
||||||
|
Ok(operation)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_rpc_operation(&self, operation: &RPCOperation) -> Result<(), RPCError> {
|
||||||
|
// If this is an answer, get the question context for this answer
|
||||||
|
// If we received an answer for a question we did not ask, this will return an error
|
||||||
|
let question_context = if let RPCOperationKind::Answer(_) = operation.kind() {
|
||||||
|
let op_id = operation.op_id();
|
||||||
|
self.unlocked_inner
|
||||||
|
.waiting_rpc_table
|
||||||
|
.get_op_context(op_id)?
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// Validate the RPC operation
|
||||||
|
let validate_context = RPCValidateContext {
|
||||||
|
crypto: self.crypto.clone(),
|
||||||
|
question_context,
|
||||||
|
};
|
||||||
|
operation.validate(&validate_context)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////
|
||||||
#[instrument(level = "trace", skip(self, encoded_msg), err)]
|
#[instrument(level = "trace", skip(self, encoded_msg), err)]
|
||||||
async fn process_rpc_message(
|
async fn process_rpc_message(
|
||||||
@ -1198,29 +1245,17 @@ impl RPCProcessor {
|
|||||||
// Decode operation appropriately based on header detail
|
// Decode operation appropriately based on header detail
|
||||||
let msg = match &encoded_msg.header.detail {
|
let msg = match &encoded_msg.header.detail {
|
||||||
RPCMessageHeaderDetail::Direct(detail) => {
|
RPCMessageHeaderDetail::Direct(detail) => {
|
||||||
|
// Decode and validate the RPC operation
|
||||||
|
let operation = self.decode_rpc_operation(&encoded_msg)?;
|
||||||
|
|
||||||
// Get the routing domain this message came over
|
// Get the routing domain this message came over
|
||||||
let routing_domain = detail.routing_domain;
|
let routing_domain = detail.routing_domain;
|
||||||
|
|
||||||
// Decode the operation
|
// Get the sender noderef, incorporating sender's peer info
|
||||||
let sender_node_id = TypedKey::new(
|
let sender_node_id = TypedKey::new(
|
||||||
detail.envelope.get_crypto_kind(),
|
detail.envelope.get_crypto_kind(),
|
||||||
detail.envelope.get_sender_id(),
|
detail.envelope.get_sender_id(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Decode the RPC message
|
|
||||||
let operation = {
|
|
||||||
let reader = encoded_msg.data.get_reader()?;
|
|
||||||
let op_reader = reader
|
|
||||||
.get_root::<veilid_capnp::operation::Reader>()
|
|
||||||
.map_err(RPCError::protocol)
|
|
||||||
.map_err(logthru_rpc!())?;
|
|
||||||
RPCOperation::decode(&op_reader)?
|
|
||||||
};
|
|
||||||
|
|
||||||
// Validate the RPC operation
|
|
||||||
operation.validate(self.crypto.clone())?;
|
|
||||||
|
|
||||||
// Get the sender noderef, incorporating sender's peer info
|
|
||||||
let mut opt_sender_nr: Option<NodeRef> = None;
|
let mut opt_sender_nr: Option<NodeRef> = None;
|
||||||
if let Some(sender_peer_info) = operation.sender_peer_info() {
|
if let Some(sender_peer_info) = operation.sender_peer_info() {
|
||||||
// Ensure the sender peer info is for the actual sender specified in the envelope
|
// Ensure the sender peer info is for the actual sender specified in the envelope
|
||||||
@ -1257,18 +1292,8 @@ impl RPCProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
|
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
|
||||||
// Decode the RPC message
|
// Decode and validate the RPC operation
|
||||||
let operation = {
|
let operation = self.decode_rpc_operation(&encoded_msg)?;
|
||||||
let reader = encoded_msg.data.get_reader()?;
|
|
||||||
let op_reader = reader
|
|
||||||
.get_root::<veilid_capnp::operation::Reader>()
|
|
||||||
.map_err(RPCError::protocol)
|
|
||||||
.map_err(logthru_rpc!())?;
|
|
||||||
RPCOperation::decode(&op_reader)?
|
|
||||||
};
|
|
||||||
|
|
||||||
// Validate the RPC operation
|
|
||||||
operation.validate(self.crypto.clone())?;
|
|
||||||
|
|
||||||
// Make the RPC message
|
// Make the RPC message
|
||||||
RPCMessage {
|
RPCMessage {
|
||||||
|
@ -1,18 +1,20 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct OperationWaitHandle<T>
|
pub struct OperationWaitHandle<T, C>
|
||||||
where
|
where
|
||||||
T: Unpin,
|
T: Unpin,
|
||||||
|
C: Unpin + Clone,
|
||||||
{
|
{
|
||||||
waiter: OperationWaiter<T>,
|
waiter: OperationWaiter<T, C>,
|
||||||
op_id: OperationId,
|
op_id: OperationId,
|
||||||
eventual_instance: Option<EventualValueFuture<(Option<Id>, T)>>,
|
eventual_instance: Option<EventualValueFuture<(Option<Id>, T)>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Drop for OperationWaitHandle<T>
|
impl<T, C> Drop for OperationWaitHandle<T, C>
|
||||||
where
|
where
|
||||||
T: Unpin,
|
T: Unpin,
|
||||||
|
C: Unpin + Clone,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.eventual_instance.is_some() {
|
if self.eventual_instance.is_some() {
|
||||||
@ -22,24 +24,37 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct OperationWaiterInner<T>
|
pub struct OperationWaitingOp<T, C>
|
||||||
where
|
where
|
||||||
T: Unpin,
|
T: Unpin,
|
||||||
|
C: Unpin + Clone,
|
||||||
{
|
{
|
||||||
waiting_op_table: HashMap<OperationId, EventualValue<(Option<Id>, T)>>,
|
context: C,
|
||||||
|
eventual: EventualValue<(Option<Id>, T)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct OperationWaiter<T>
|
pub struct OperationWaiterInner<T, C>
|
||||||
where
|
where
|
||||||
T: Unpin,
|
T: Unpin,
|
||||||
|
C: Unpin + Clone,
|
||||||
{
|
{
|
||||||
inner: Arc<Mutex<OperationWaiterInner<T>>>,
|
waiting_op_table: HashMap<OperationId, OperationWaitingOp<T, C>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Clone for OperationWaiter<T>
|
#[derive(Debug)]
|
||||||
|
pub struct OperationWaiter<T, C>
|
||||||
where
|
where
|
||||||
T: Unpin,
|
T: Unpin,
|
||||||
|
C: Unpin + Clone,
|
||||||
|
{
|
||||||
|
inner: Arc<Mutex<OperationWaiterInner<T, C>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, C> Clone for OperationWaiter<T, C>
|
||||||
|
where
|
||||||
|
T: Unpin,
|
||||||
|
C: Unpin + Clone,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -48,9 +63,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> OperationWaiter<T>
|
impl<T, C> OperationWaiter<T, C>
|
||||||
where
|
where
|
||||||
T: Unpin,
|
T: Unpin,
|
||||||
|
C: Unpin + Clone,
|
||||||
{
|
{
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -60,11 +76,15 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set up wait for op
|
/// Set up wait for operation to complete
|
||||||
pub fn add_op_waiter(&self, op_id: OperationId) -> OperationWaitHandle<T> {
|
pub fn add_op_waiter(&self, op_id: OperationId, context: C) -> OperationWaitHandle<T, C> {
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
let e = EventualValue::new();
|
let e = EventualValue::new();
|
||||||
if inner.waiting_op_table.insert(op_id, e.clone()).is_some() {
|
let waiting_op = OperationWaitingOp {
|
||||||
|
context,
|
||||||
|
eventual: e.clone(),
|
||||||
|
};
|
||||||
|
if inner.waiting_op_table.insert(op_id, waiting_op).is_some() {
|
||||||
error!(
|
error!(
|
||||||
"add_op_waiter collision should not happen for op_id {}",
|
"add_op_waiter collision should not happen for op_id {}",
|
||||||
op_id
|
op_id
|
||||||
@ -78,16 +98,25 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove wait for op
|
/// Get operation context
|
||||||
|
pub fn get_op_context(&self, op_id: OperationId) -> Result<C, RPCError> {
|
||||||
|
let mut inner = self.inner.lock();
|
||||||
|
let Some(waiting_op) = inner.waiting_op_table.get(&op_id) else {
|
||||||
|
return Err(RPCError::internal("Missing operation id getting op context"));
|
||||||
|
};
|
||||||
|
Ok(waiting_op.context.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove wait for op
|
||||||
fn cancel_op_waiter(&self, op_id: OperationId) {
|
fn cancel_op_waiter(&self, op_id: OperationId) {
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
inner.waiting_op_table.remove(&op_id);
|
inner.waiting_op_table.remove(&op_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// complete the app call
|
/// Complete the app call
|
||||||
#[instrument(level = "trace", skip(self, message), err)]
|
#[instrument(level = "trace", skip(self, message), err)]
|
||||||
pub async fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
|
pub async fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> {
|
||||||
let eventual = {
|
let waiting_op = {
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
inner
|
inner
|
||||||
.waiting_op_table
|
.waiting_op_table
|
||||||
@ -97,13 +126,17 @@ where
|
|||||||
op_id
|
op_id
|
||||||
)))?
|
)))?
|
||||||
};
|
};
|
||||||
eventual.resolve((Span::current().id(), message)).await;
|
waiting_op
|
||||||
|
.eventual
|
||||||
|
.resolve((Span::current().id(), message))
|
||||||
|
.await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wait for opeation to complete
|
||||||
pub async fn wait_for_op(
|
pub async fn wait_for_op(
|
||||||
&self,
|
&self,
|
||||||
mut handle: OperationWaitHandle<T>,
|
mut handle: OperationWaitHandle<T, C>,
|
||||||
timeout_us: TimestampDuration,
|
timeout_us: TimestampDuration,
|
||||||
) -> Result<TimeoutOr<(T, TimestampDuration)>, RPCError> {
|
) -> Result<TimeoutOr<(T, TimestampDuration)>, RPCError> {
|
||||||
let timeout_ms = u32::try_from(timeout_us.as_u64() / 1000u64)
|
let timeout_ms = u32::try_from(timeout_us.as_u64() / 1000u64)
|
||||||
|
@ -9,14 +9,14 @@ impl RPCProcessor {
|
|||||||
dest: Destination,
|
dest: Destination,
|
||||||
message: Vec<u8>,
|
message: Vec<u8>,
|
||||||
) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> {
|
) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> {
|
||||||
let app_call_q = RPCOperationAppCallQ { message };
|
let app_call_q = RPCOperationAppCallQ::new(&message)?;
|
||||||
let question = RPCQuestion::new(
|
let question = RPCQuestion::new(
|
||||||
network_result_try!(self.get_destination_respond_to(&dest)?),
|
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||||
RPCQuestionDetail::AppCallQ(app_call_q),
|
RPCQuestionDetail::AppCallQ(app_call_q),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Send the app call question
|
// Send the app call question
|
||||||
let waitable_reply = network_result_try!(self.question(dest, question).await?);
|
let waitable_reply = network_result_try!(self.question(dest, question, None).await?);
|
||||||
|
|
||||||
// Wait for reply
|
// Wait for reply
|
||||||
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? {
|
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? {
|
||||||
@ -25,18 +25,18 @@ impl RPCProcessor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Get the right answer type
|
// Get the right answer type
|
||||||
let app_call_a = match msg.operation.into_kind() {
|
let (_, _, _, kind) = msg.operation.destructure();
|
||||||
RPCOperationKind::Answer(a) => match a.into_detail() {
|
let app_call_a = match kind {
|
||||||
|
RPCOperationKind::Answer(a) => match a.destructure() {
|
||||||
RPCAnswerDetail::AppCallA(a) => a,
|
RPCAnswerDetail::AppCallA(a) => a,
|
||||||
_ => return Err(RPCError::invalid_format("not an appcall answer")),
|
_ => return Err(RPCError::invalid_format("not an appcall answer")),
|
||||||
},
|
},
|
||||||
_ => return Err(RPCError::invalid_format("not an answer")),
|
_ => return Err(RPCError::invalid_format("not an answer")),
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(NetworkResult::value(Answer::new(
|
let a_message = app_call_a.destructure();
|
||||||
latency,
|
|
||||||
app_call_a.message,
|
Ok(NetworkResult::value(Answer::new(latency, a_message)))
|
||||||
)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
|
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
|
||||||
@ -45,9 +45,10 @@ impl RPCProcessor {
|
|||||||
msg: RPCMessage,
|
msg: RPCMessage,
|
||||||
) -> Result<NetworkResult<()>, RPCError> {
|
) -> Result<NetworkResult<()>, RPCError> {
|
||||||
// Get the question
|
// Get the question
|
||||||
let app_call_q = match msg.operation.kind() {
|
let (op_id, _, _, kind) = msg.operation.clone().destructure();
|
||||||
RPCOperationKind::Question(q) => match q.detail() {
|
let app_call_q = match kind {
|
||||||
RPCQuestionDetail::AppCallQ(q) => q,
|
RPCOperationKind::Question(q) => match q.destructure() {
|
||||||
|
(_, RPCQuestionDetail::AppCallQ(q)) => q,
|
||||||
_ => panic!("not an appcall question"),
|
_ => panic!("not an appcall question"),
|
||||||
},
|
},
|
||||||
_ => panic!("not a question"),
|
_ => panic!("not a question"),
|
||||||
@ -63,16 +64,16 @@ impl RPCProcessor {
|
|||||||
.map(|nr| nr.node_ids().get(crypto_kind).unwrap().value);
|
.map(|nr| nr.node_ids().get(crypto_kind).unwrap().value);
|
||||||
|
|
||||||
// Register a waiter for this app call
|
// Register a waiter for this app call
|
||||||
let id = msg.operation.op_id();
|
let handle = self
|
||||||
let handle = self.unlocked_inner.waiting_app_call_table.add_op_waiter(id);
|
.unlocked_inner
|
||||||
|
.waiting_app_call_table
|
||||||
|
.add_op_waiter(op_id, ());
|
||||||
|
|
||||||
// Pass the call up through the update callback
|
// Pass the call up through the update callback
|
||||||
let message = app_call_q.message.clone();
|
let message_q = app_call_q.destructure();
|
||||||
(self.unlocked_inner.update_callback)(VeilidUpdate::AppCall(VeilidAppCall {
|
(self.unlocked_inner.update_callback)(VeilidUpdate::AppCall(VeilidAppCall::new(
|
||||||
sender,
|
sender, message_q, op_id,
|
||||||
message,
|
)));
|
||||||
id,
|
|
||||||
}));
|
|
||||||
|
|
||||||
// Wait for an app call answer to come back from the app
|
// Wait for an app call answer to come back from the app
|
||||||
let res = self
|
let res = self
|
||||||
@ -80,17 +81,17 @@ impl RPCProcessor {
|
|||||||
.waiting_app_call_table
|
.waiting_app_call_table
|
||||||
.wait_for_op(handle, self.unlocked_inner.timeout_us)
|
.wait_for_op(handle, self.unlocked_inner.timeout_us)
|
||||||
.await?;
|
.await?;
|
||||||
let (message, _latency) = match res {
|
let (message_a, _latency) = match res {
|
||||||
TimeoutOr::Timeout => {
|
TimeoutOr::Timeout => {
|
||||||
// No message sent on timeout, but this isn't an error
|
// No message sent on timeout, but this isn't an error
|
||||||
log_rpc!(debug "App call timed out for id {}", id);
|
log_rpc!(debug "App call timed out for id {}", op_id);
|
||||||
return Ok(NetworkResult::timeout());
|
return Ok(NetworkResult::timeout());
|
||||||
}
|
}
|
||||||
TimeoutOr::Value(v) => v,
|
TimeoutOr::Value(v) => v,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Return the appcall answer
|
// Return the appcall answer
|
||||||
let app_call_a = RPCOperationAppCallA { message };
|
let app_call_a = RPCOperationAppCallA::new(&message_a)?;
|
||||||
|
|
||||||
// Send status answer
|
// Send status answer
|
||||||
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::AppCallA(app_call_a)))
|
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::AppCallA(app_call_a)))
|
||||||
|
@ -22,8 +22,9 @@ impl RPCProcessor {
|
|||||||
msg: RPCMessage,
|
msg: RPCMessage,
|
||||||
) -> Result<NetworkResult<()>, RPCError> {
|
) -> Result<NetworkResult<()>, RPCError> {
|
||||||
// Get the statement
|
// Get the statement
|
||||||
let app_message = match msg.operation.into_kind() {
|
let (op_id, _, _, kind) = msg.operation.destructure();
|
||||||
RPCOperationKind::Statement(s) => match s.into_detail() {
|
let app_message = match kind {
|
||||||
|
RPCOperationKind::Statement(s) => match s.destructure() {
|
||||||
RPCStatementDetail::AppMessage(s) => s,
|
RPCStatementDetail::AppMessage(s) => s,
|
||||||
_ => panic!("not an app message"),
|
_ => panic!("not an app message"),
|
||||||
},
|
},
|
||||||
@ -40,11 +41,10 @@ impl RPCProcessor {
|
|||||||
.map(|nr| nr.node_ids().get(crypto_kind).unwrap().value);
|
.map(|nr| nr.node_ids().get(crypto_kind).unwrap().value);
|
||||||
|
|
||||||
// Pass the message up through the update callback
|
// Pass the message up through the update callback
|
||||||
let message = app_message.message;
|
let message = app_message.destructure();
|
||||||
(self.unlocked_inner.update_callback)(VeilidUpdate::AppMessage(VeilidAppMessage {
|
(self.unlocked_inner.update_callback)(VeilidUpdate::AppMessage(VeilidAppMessage::new(
|
||||||
sender,
|
sender, message,
|
||||||
message,
|
)));
|
||||||
}));
|
|
||||||
|
|
||||||
Ok(NetworkResult::value(()))
|
Ok(NetworkResult::value(()))
|
||||||
}
|
}
|
||||||
|
@ -26,14 +26,14 @@ impl RPCProcessor {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let find_node_q_detail = RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ { node_id });
|
let find_node_q_detail = RPCQuestionDetail::FindNodeQ(RPCOperationFindNodeQ::new(node_id));
|
||||||
let find_node_q = RPCQuestion::new(
|
let find_node_q = RPCQuestion::new(
|
||||||
network_result_try!(self.get_destination_respond_to(&dest)?),
|
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||||
find_node_q_detail,
|
find_node_q_detail,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Send the find_node request
|
// Send the find_node request
|
||||||
let waitable_reply = network_result_try!(self.question(dest, find_node_q).await?);
|
let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?);
|
||||||
|
|
||||||
// Wait for reply
|
// Wait for reply
|
||||||
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? {
|
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? {
|
||||||
@ -42,8 +42,9 @@ impl RPCProcessor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Get the right answer type
|
// Get the right answer type
|
||||||
let find_node_a = match msg.operation.into_kind() {
|
let (_, _, _, kind) = msg.operation.destructure();
|
||||||
RPCOperationKind::Answer(a) => match a.into_detail() {
|
let find_node_a = match kind {
|
||||||
|
RPCOperationKind::Answer(a) => match a.destructure() {
|
||||||
RPCAnswerDetail::FindNodeA(a) => a,
|
RPCAnswerDetail::FindNodeA(a) => a,
|
||||||
_ => return Err(RPCError::invalid_format("not a find_node answer")),
|
_ => return Err(RPCError::invalid_format("not a find_node answer")),
|
||||||
},
|
},
|
||||||
@ -51,7 +52,7 @@ impl RPCProcessor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Verify peers are in the correct peer scope
|
// Verify peers are in the correct peer scope
|
||||||
for peer_info in &find_node_a.peers {
|
for peer_info in find_node_a.peers() {
|
||||||
if !self.filter_node_info(RoutingDomain::PublicInternet, peer_info.signed_node_info()) {
|
if !self.filter_node_info(RoutingDomain::PublicInternet, peer_info.signed_node_info()) {
|
||||||
return Err(RPCError::invalid_format(
|
return Err(RPCError::invalid_format(
|
||||||
"find_node response has invalid peer scope",
|
"find_node response has invalid peer scope",
|
||||||
@ -59,10 +60,8 @@ impl RPCProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(NetworkResult::value(Answer::new(
|
let peers = find_node_a.destructure();
|
||||||
latency,
|
Ok(NetworkResult::value(Answer::new(latency, peers)))
|
||||||
find_node_a.peers,
|
|
||||||
)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
|
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
|
||||||
@ -81,9 +80,10 @@ impl RPCProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the question
|
// Get the question
|
||||||
let find_node_q = match msg.operation.kind() {
|
let kind = msg.operation.kind().clone();
|
||||||
RPCOperationKind::Question(q) => match q.detail() {
|
let find_node_q = match kind {
|
||||||
RPCQuestionDetail::FindNodeQ(q) => q,
|
RPCOperationKind::Question(q) => match q.destructure() {
|
||||||
|
(_, RPCQuestionDetail::FindNodeQ(q)) => q,
|
||||||
_ => panic!("not a status question"),
|
_ => panic!("not a status question"),
|
||||||
},
|
},
|
||||||
_ => panic!("not a question"),
|
_ => panic!("not a question"),
|
||||||
@ -114,9 +114,10 @@ impl RPCProcessor {
|
|||||||
c.network.dht.max_find_node_count as usize
|
c.network.dht.max_find_node_count as usize
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let node_id = find_node_q.destructure();
|
||||||
let closest_nodes = routing_table.find_closest_nodes(
|
let closest_nodes = routing_table.find_closest_nodes(
|
||||||
node_count,
|
node_count,
|
||||||
find_node_q.node_id,
|
node_id,
|
||||||
filters,
|
filters,
|
||||||
// transform
|
// transform
|
||||||
|rti, entry| {
|
|rti, entry| {
|
||||||
@ -125,9 +126,7 @@ impl RPCProcessor {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Make status answer
|
// Make status answer
|
||||||
let find_node_a = RPCOperationFindNodeA {
|
let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?;
|
||||||
peers: closest_nodes,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Send status answer
|
// Send status answer
|
||||||
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)))
|
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a)))
|
||||||
|
@ -1,6 +1,62 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::storage_manager::SignedValueDescriptor;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash)]
|
||||||
|
pub enum GetValueAnswer {}
|
||||||
|
|
||||||
impl RPCProcessor {
|
impl RPCProcessor {
|
||||||
|
// Sends a get value request and wait for response
|
||||||
|
// Can be sent via all methods including relays and routes
|
||||||
|
#[instrument(level = "trace", skip(self), ret, err)]
|
||||||
|
pub async fn rpc_call_get_value(
|
||||||
|
self,
|
||||||
|
dest: Destination,
|
||||||
|
key: TypedKey,
|
||||||
|
subkey: ValueSubkey,
|
||||||
|
last_descriptor: Option<SignedValueDescriptor>,
|
||||||
|
) -> Result<NetworkResult<Answer<Vec<u8>>>, RPCError> {
|
||||||
|
let get_value_q = RPCOperationGetValueQ::new(key, subkey, last_descriptor.is_none());
|
||||||
|
let question = RPCQuestion::new(
|
||||||
|
network_result_try!(self.get_destination_respond_to(&dest)?),
|
||||||
|
RPCQuestionDetail::GetValueQ(get_value_q),
|
||||||
|
);
|
||||||
|
let Some(vcrypto) = self.crypto.get(key.kind) else {
|
||||||
|
return Err(RPCError::internal("unsupported cryptosystem"));
|
||||||
|
};
|
||||||
|
|
||||||
|
// Send the app call question
|
||||||
|
let question_context = QuestionContext::GetValue(ValidateGetValueContext {
|
||||||
|
last_descriptor,
|
||||||
|
subkey,
|
||||||
|
vcrypto,
|
||||||
|
});
|
||||||
|
|
||||||
|
let waitable_reply = network_result_try!(
|
||||||
|
self.question(dest, question, Some(question_context))
|
||||||
|
.await?
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for reply
|
||||||
|
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? {
|
||||||
|
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
|
||||||
|
TimeoutOr::Value(v) => v,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get the right answer type
|
||||||
|
let (_, _, _, kind) = msg.operation.destructure();
|
||||||
|
let app_call_a = match kind {
|
||||||
|
RPCOperationKind::Answer(a) => match a.destructure() {
|
||||||
|
RPCAnswerDetail::AppCallA(a) => a,
|
||||||
|
_ => return Err(RPCError::invalid_format("not an appcall answer")),
|
||||||
|
},
|
||||||
|
_ => return Err(RPCError::invalid_format("not an answer")),
|
||||||
|
};
|
||||||
|
|
||||||
|
let a_message = app_call_a.destructure();
|
||||||
|
|
||||||
|
Ok(NetworkResult::value(Answer::new(latency, a_message)))
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
|
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
|
||||||
pub(crate) async fn process_get_value_q(
|
pub(crate) async fn process_get_value_q(
|
||||||
&self,
|
&self,
|
||||||
|
@ -26,8 +26,9 @@ impl RPCProcessor {
|
|||||||
msg: RPCMessage,
|
msg: RPCMessage,
|
||||||
) -> Result<NetworkResult<()>, RPCError> {
|
) -> Result<NetworkResult<()>, RPCError> {
|
||||||
// Get the statement
|
// Get the statement
|
||||||
let RPCOperationReturnReceipt { receipt } = match msg.operation.into_kind() {
|
let (_, _, _, kind) = msg.operation.destructure();
|
||||||
RPCOperationKind::Statement(s) => match s.into_detail() {
|
let RPCOperationReturnReceipt { receipt } = match kind {
|
||||||
|
RPCOperationKind::Statement(s) => match s.destructure() {
|
||||||
RPCStatementDetail::ReturnReceipt(s) => s,
|
RPCStatementDetail::ReturnReceipt(s) => s,
|
||||||
_ => panic!("not a return receipt"),
|
_ => panic!("not a return receipt"),
|
||||||
},
|
},
|
||||||
|
@ -377,8 +377,9 @@ impl RPCProcessor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Get the statement
|
// Get the statement
|
||||||
let mut route = match msg.operation.into_kind() {
|
let (_,_,_,kind) = msg.operation.destructure();
|
||||||
RPCOperationKind::Statement(s) => match s.into_detail() {
|
let mut route = match kind {
|
||||||
|
RPCOperationKind::Statement(s) => match s.destructure() {
|
||||||
RPCStatementDetail::Route(s) => s,
|
RPCStatementDetail::Route(s) => s,
|
||||||
_ => panic!("not a route statement"),
|
_ => panic!("not a route statement"),
|
||||||
},
|
},
|
||||||
|
@ -44,8 +44,9 @@ impl RPCProcessor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Get the statement
|
// Get the statement
|
||||||
let signal = match msg.operation.into_kind() {
|
let (_, _, _, kind) = msg.operation.destructure();
|
||||||
RPCOperationKind::Statement(s) => match s.into_detail() {
|
let signal = match kind {
|
||||||
|
RPCOperationKind::Statement(s) => match s.destructure() {
|
||||||
RPCStatementDetail::Signal(s) => s,
|
RPCStatementDetail::Signal(s) => s,
|
||||||
_ => panic!("not a signal"),
|
_ => panic!("not a signal"),
|
||||||
},
|
},
|
||||||
|
@ -75,7 +75,8 @@ impl RPCProcessor {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Send the info request
|
// Send the info request
|
||||||
let waitable_reply = network_result_try!(self.question(dest.clone(), question).await?);
|
let waitable_reply =
|
||||||
|
network_result_try!(self.question(dest.clone(), question, None).await?);
|
||||||
|
|
||||||
// Note what kind of ping this was and to what peer scope
|
// Note what kind of ping this was and to what peer scope
|
||||||
let send_data_kind = waitable_reply.send_data_kind;
|
let send_data_kind = waitable_reply.send_data_kind;
|
||||||
|
@ -27,11 +27,8 @@ pub use routing_table::{NodeRef, NodeRefBase};
|
|||||||
use crate::*;
|
use crate::*;
|
||||||
use core::fmt;
|
use core::fmt;
|
||||||
use core_context::{api_shutdown, VeilidCoreContext};
|
use core_context::{api_shutdown, VeilidCoreContext};
|
||||||
use enumset::*;
|
|
||||||
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
|
|
||||||
use routing_table::{Direction, RouteSpecStore, RoutingTable};
|
use routing_table::{Direction, RouteSpecStore, RoutingTable};
|
||||||
use rpc_processor::*;
|
use rpc_processor::*;
|
||||||
use serde::*;
|
|
||||||
use storage_manager::StorageManager;
|
use storage_manager::StorageManager;
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -1,9 +1,6 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
pub use bytecheck::CheckBytes;
|
|
||||||
use core::fmt::Debug;
|
use core::fmt::Debug;
|
||||||
use rkyv::Archive as RkyvArchive;
|
|
||||||
use rkyv::Deserialize as RkyvDeserialize;
|
|
||||||
use rkyv::Serialize as RkyvSerialize;
|
|
||||||
|
|
||||||
// Don't trace these functions as they are used in the transfer of API logs, which will recurse!
|
// Don't trace these functions as they are used in the transfer of API logs, which will recurse!
|
||||||
|
|
||||||
@ -146,7 +143,7 @@ pub fn from_rkyv<T>(v: Vec<u8>) -> EyreResult<T>
|
|||||||
where
|
where
|
||||||
T: RkyvArchive,
|
T: RkyvArchive,
|
||||||
<T as RkyvArchive>::Archived:
|
<T as RkyvArchive>::Archived:
|
||||||
for<'t> bytecheck::CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
|
||||||
<T as RkyvArchive>::Archived:
|
<T as RkyvArchive>::Archived:
|
||||||
rkyv::Deserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
rkyv::Deserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
|
||||||
{
|
{
|
||||||
|
@ -14,6 +14,19 @@ pub struct VeilidAppMessage {
|
|||||||
pub message: Vec<u8>,
|
pub message: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl VeilidAppMessage {
|
||||||
|
pub fn new(sender: Option<PublicKey>, message: Vec<u8>) -> Self {
|
||||||
|
Self { sender, message }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sender(&self) -> Option<&PublicKey> {
|
||||||
|
self.sender.as_ref()
|
||||||
|
}
|
||||||
|
pub fn message(&self) -> &[u8] {
|
||||||
|
&self.message
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Direct question blob passed to hosting application for processing to send an eventual AppReply
|
/// Direct question blob passed to hosting application for processing to send an eventual AppReply
|
||||||
#[derive(
|
#[derive(
|
||||||
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
|
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
|
||||||
@ -22,11 +35,31 @@ pub struct VeilidAppMessage {
|
|||||||
pub struct VeilidAppCall {
|
pub struct VeilidAppCall {
|
||||||
/// Some(sender) if the request was sent directly, None if received via a private/safety route
|
/// Some(sender) if the request was sent directly, None if received via a private/safety route
|
||||||
#[serde(with = "opt_json_as_string")]
|
#[serde(with = "opt_json_as_string")]
|
||||||
pub sender: Option<PublicKey>,
|
sender: Option<PublicKey>,
|
||||||
/// The content of the request to deliver to the application
|
/// The content of the request to deliver to the application
|
||||||
#[serde(with = "json_as_base64")]
|
#[serde(with = "json_as_base64")]
|
||||||
pub message: Vec<u8>,
|
message: Vec<u8>,
|
||||||
/// The id to reply to
|
/// The id to reply to
|
||||||
#[serde(with = "json_as_string")]
|
#[serde(with = "json_as_string")]
|
||||||
pub id: OperationId,
|
id: OperationId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VeilidAppCall {
|
||||||
|
pub fn new(sender: Option<PublicKey>, message: Vec<u8>, id: OperationId) -> Self {
|
||||||
|
Self {
|
||||||
|
sender,
|
||||||
|
message,
|
||||||
|
id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sender(&self) -> Option<&PublicKey> {
|
||||||
|
self.sender.as_ref()
|
||||||
|
}
|
||||||
|
pub fn message(&self) -> &[u8] {
|
||||||
|
&self.message
|
||||||
|
}
|
||||||
|
pub fn id(&self) -> OperationId {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user