From 5fb551e53daa6ef5779e5b1a10869201577bdea8 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 3 Jul 2022 17:47:46 -0400 Subject: [PATCH] checkpoint --- veilid-core/proto/veilid.capnp | 2 +- .../src/rpc_processor/coders/block_id.rs | 49 ++++++++ veilid-core/src/rpc_processor/coders/mod.rs | 2 + .../rpc_processor/coders/operations/mod.rs | 17 ++- .../coders/operations/operation_find_block.rs | 106 ++++++++++++++++++ .../coders/operations/operation_get_value.rs | 69 +++++++----- .../operations/operation_return_receipt.rs | 25 +++++ .../coders/operations/operation_set_value.rs | 89 +++++++++++++++ .../coders/operations/operation_signal.rs | 23 ++++ .../operations/operation_supply_block.rs | 84 ++++++++++++++ .../operations/operation_value_changed.rs | 30 +++++ .../operations/operation_watch_value.rs | 71 ++++++++++++ 12 files changed, 538 insertions(+), 29 deletions(-) create mode 100644 veilid-core/src/rpc_processor/coders/block_id.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_signal.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index b7421992..00de7bb5 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -325,7 +325,7 @@ struct OperationSupplyBlockA { } struct OperationFindBlockQ { - blockId @0 :BlockID; # hash of the block we can supply + blockId @0 :BlockID; # hash of the block to locate } struct OperationFindBlockA { diff --git a/veilid-core/src/rpc_processor/coders/block_id.rs b/veilid-core/src/rpc_processor/coders/block_id.rs new file mode 100644 index 00000000..5da590db --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/block_id.rs @@ -0,0 +1,49 @@ +use crate::dht::*; +use crate::*; +use core::convert::TryInto; +use rpc_processor::*; + +pub fn decode_block_id(public_key: &veilid_capnp::b_l_a_k_e3_hash::Reader) -> DHTKey { + let u0 = public_key.get_u0().to_be_bytes(); + let u1 = public_key.get_u1().to_be_bytes(); + let u2 = public_key.get_u2().to_be_bytes(); + let u3 = public_key.get_u3().to_be_bytes(); + + let mut x: [u8; 32] = Default::default(); + x[0..8].copy_from_slice(&u0); + x[8..16].copy_from_slice(&u1); + x[16..24].copy_from_slice(&u2); + x[24..32].copy_from_slice(&u3); + + DHTKey::new(x) +} + +pub fn encode_block_id( + key: &DHTKey, + builder: &mut veilid_capnp::b_l_a_k_e3_hash::Builder, +) -> Result<(), RPCError> { + if !key.valid { + return Err(rpc_error_protocol("invalid key")); + } + builder.set_u0(u64::from_be_bytes( + key.bytes[0..8] + .try_into() + .map_err(map_error_protocol!("slice with incorrect length"))?, + )); + builder.set_u1(u64::from_be_bytes( + key.bytes[8..16] + .try_into() + .map_err(map_error_protocol!("slice with incorrect length"))?, + )); + builder.set_u2(u64::from_be_bytes( + key.bytes[16..24] + .try_into() + .map_err(map_error_protocol!("slice with incorrect length"))?, + )); + builder.set_u3(u64::from_be_bytes( + key.bytes[24..32] + .try_into() + .map_err(map_error_protocol!("slice with incorrect length"))?, + )); + Ok(()) +} diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index 8f6f4f7c..09d7dbf2 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -1,4 +1,5 @@ mod address; +mod block_id; mod dial_info; mod dial_info_class; mod dial_info_detail; @@ -21,6 +22,7 @@ mod value_data; mod value_key; pub use address::*; +pub use block_id::*; pub use dial_info::*; pub use dial_info_class::*; pub use dial_info_detail::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/mod.rs b/veilid-core/src/rpc_processor/coders/operations/mod.rs index a595b9bf..148bc82d 100644 --- a/veilid-core/src/rpc_processor/coders/operations/mod.rs +++ b/veilid-core/src/rpc_processor/coders/operations/mod.rs @@ -1,22 +1,35 @@ mod operation; mod operation_detail; +mod operation_find_block; mod operation_find_node; mod operation_get_value; mod operation_node_info_update; +mod operation_return_receipt; mod operation_route; +mod operation_set_value; +mod operation_signal; mod operation_status; +mod operation_supply_block; mod operation_validate_dial_info; -mod respond_to; +mod operation_value_changed; +mod operation_watch_value; -use super::*; +mod respond_to; pub use operation::*; pub use operation_detail::*; +pub use operation_find_block::*; pub use operation_find_node::*; pub use operation_get_value::*; pub use operation_node_info_update::*; +pub use operation_return_receipt::*; pub use operation_route::*; +pub use operation_set_value::*; +pub use operation_signal::*; pub use operation_status::*; +pub use operation_supply_block::*; pub use operation_validate_dial_info::*; +pub use operation_value_changed::*; +pub use operation_watch_value::*; pub use respond_to::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs new file mode 100644 index 00000000..b6fab81c --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs @@ -0,0 +1,106 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationFindBlockQ { + block_id: DHTKey, +} + +impl RPCOperationFindBlockQ { + pub fn decode( + reader: &veilid_capnp::operation_find_block_q::Reader, + ) -> Result { + let bi_reader = reader.get_block_id().map_err(map_error_capnp_error!())?; + let block_id = decode_block_id(&bi_reader); + + Ok(RPCOperationFindBlockQ { block_id }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_find_block_q::Builder, + ) -> Result<(), RPCError> { + let bi_builder = builder.init_block_id(); + encode_block_id(&self.block_id, &mut bi_builder)?; + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RPCOperationFindBlockA { + data: Vec, + suppliers: Vec, + peers: Vec, +} + +impl RPCOperationFindBlockA { + pub fn decode( + reader: &veilid_capnp::operation_find_block_a::Reader, + ) -> Result { + let data = reader + .get_data() + .map_err(map_error_capnp_error!())? + .to_vec(); + + let suppliers_reader = reader.get_suppliers().map_err(map_error_capnp_error!())?; + let mut suppliers = Vec::::with_capacity( + suppliers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many suppliers"))?, + ); + for s in suppliers_reader.iter() { + let peer_info = decode_peer_info(&s, true)?; + suppliers.push(peer_info); + } + + let peers_reader = reader.get_peers().map_err(map_error_capnp_error!())?; + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p, true)?; + peers.push(peer_info); + } + + Ok(RPCOperationFindBlockA { + data, + suppliers, + peers, + }) + } + + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_find_block_a::Builder, + ) -> Result<(), RPCError> { + builder.set_data(&self.data); + + let mut suppliers_builder = builder.init_suppliers( + self.suppliers + .len() + .try_into() + .map_err(map_error_internal!("invalid suppliers list length"))?, + ); + for (i, peer) in self.suppliers.iter().enumerate() { + let mut pi_builder = suppliers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + + let mut peers_builder = builder.init_peers( + self.peers + .len() + .try_into() + .map_err(map_error_internal!("invalid peers list length"))?, + ); + for (i, peer) in self.peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index e9a62e2b..92070adb 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -10,16 +10,16 @@ impl RPCOperationGetValueQ { pub fn decode( reader: &veilid_capnp::operation_get_value_q::Reader, ) -> Result { - let ni_reader = reader.get_node_id().map_err(map_error_capnp_error!())?; - let node_id = decode_public_key(&ni_reader); - Ok(RPCOperationGetValueQ { node_id }) + let k_reader = reader.get_key().map_err(map_error_capnp_error!())?; + let key = decode_value_key(&k_reader)?; + Ok(RPCOperationGetValueQ { key }) } pub fn encode( &self, builder: &mut veilid_capnp::operation_get_value_q::Builder, ) -> Result<(), RPCError> { - let ni_builder = builder.init_node_id(); - encode_public_key(&self.node_id, &mut ni_builder)?; + let k_builder = builder.init_key(); + encode_value_key(&self.key, &mut k_builder)?; Ok(()) } } @@ -34,34 +34,51 @@ impl RPCOperationGetValueA { pub fn decode( reader: &veilid_capnp::operation_get_value_a::Reader, ) -> Result { - let peers_reader = reader.get_peers().map_err(map_error_capnp_error!())?; - let mut peers = Vec::::with_capacity( - peers_reader - .len() - .try_into() - .map_err(map_error_internal!("too many peers"))?, - ); - for p in peers_reader.iter() { - let peer_info = decode_peer_info(&p, true)?; - peers.push(peer_info); - } + match reader.which().map_err(map_error_capnp_notinschema!())? { + veilid_capnp::operation_get_value_a::Which::Data(r) => { + let data = decode_value_data(&r.map_err(map_error_capnp_error!())?)?; + Ok(RPCOperationGetValueA::Data(data)) + } + veilid_capnp::operation_get_value_a::Which::Peers(r) => { + let peers_reader = r.map_err(map_error_capnp_error!())?; + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p, true)?; + peers.push(peer_info); + } - Ok(RPCOperationGetValueA { peers }) + Ok(RPCOperationGetValueA::Peers(peers)) + } + } } pub fn encode( &self, builder: &mut veilid_capnp::operation_get_value_a::Builder, ) -> Result<(), RPCError> { - let mut peers_builder = builder.init_peers( - self.peers - .len() - .try_into() - .map_err(map_error_internal!("invalid closest nodes list length"))?, - ); - for (i, peer) in self.peers.iter().enumerate() { - let mut pi_builder = peers_builder.reborrow().get(i as u32); - encode_peer_info(peer, &mut pi_builder)?; + match self { + RPCOperationGetValueA::Data(data) => { + let d_builder = builder.init_data(); + encode_value_data(&data, &mut d_builder)?; + } + RPCOperationGetValueA::Peers(peers) => { + let mut peers_builder = builder.init_peers( + peers + .len() + .try_into() + .map_err(map_error_internal!("invalid peers list length"))?, + ); + for (i, peer) in peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + } } + Ok(()) } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs b/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs new file mode 100644 index 00000000..9e93cf29 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs @@ -0,0 +1,25 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationReturnReceipt { + receipt: Vec, +} + +impl RPCOperationReturnReceipt { + pub fn decode( + reader: &veilid_capnp::operation_return_receipt::Reader, + ) -> Result { + let rcpt_reader = reader.get_receipt().map_err(map_error_capnp_error!())?; + let receipt = rcpt_reader.to_vec(); + + Ok(RPCOperationReturnReceipt { receipt }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_return_receipt::Builder, + ) -> Result<(), RPCError> { + builder.set_receipt(&self.receipt); + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs new file mode 100644 index 00000000..9b2494f3 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs @@ -0,0 +1,89 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationSetValueQ { + key: ValueKey, + value: ValueData, +} + +impl RPCOperationSetValueQ { + pub fn decode( + reader: &veilid_capnp::operation_set_value_q::Reader, + ) -> Result { + let k_reader = reader.get_key().map_err(map_error_capnp_error!())?; + let key = decode_value_key(&k_reader)?; + let v_reader = reader.get_value().map_err(map_error_capnp_error!())?; + let value = decode_value_data(&v_reader)?; + Ok(RPCOperationSetValueQ { key, value }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_set_value_q::Builder, + ) -> Result<(), RPCError> { + let k_builder = builder.init_key(); + encode_value_key(&self.key, &mut k_builder)?; + let v_builder = builder.init_value(); + encode_value_data(&self.value, &mut v_builder)?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCOperationSetValueA { + Data(ValueData), + Peers(Vec), +} + +impl RPCOperationSetValueA { + pub fn decode( + reader: &veilid_capnp::operation_set_value_a::Reader, + ) -> Result { + match reader.which().map_err(map_error_capnp_notinschema!())? { + veilid_capnp::operation_set_value_a::Which::Data(r) => { + let data = decode_value_data(&r.map_err(map_error_capnp_error!())?)?; + Ok(RPCOperationSetValueA::Data(data)) + } + veilid_capnp::operation_set_value_a::Which::Peers(r) => { + let peers_reader = r.map_err(map_error_capnp_error!())?; + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p, true)?; + peers.push(peer_info); + } + + Ok(RPCOperationSetValueA::Peers(peers)) + } + } + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_set_value_a::Builder, + ) -> Result<(), RPCError> { + match self { + RPCOperationSetValueA::Data(data) => { + let d_builder = builder.init_data(); + encode_value_data(&data, &mut d_builder)?; + } + RPCOperationSetValueA::Peers(peers) => { + let mut peers_builder = builder.init_peers( + peers + .len() + .try_into() + .map_err(map_error_internal!("invalid peers list length"))?, + ); + for (i, peer) in peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + } + } + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs new file mode 100644 index 00000000..43c47aff --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs @@ -0,0 +1,23 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationSignal { + signal_info: SignalInfo, +} + +impl RPCOperationSignal { + pub fn decode( + reader: &veilid_capnp::operation_signal::Reader, + ) -> Result { + let signal_info = decode_signal_info(reader)?; + Ok(RPCOperationSignal { signal_info }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_signal::Builder, + ) -> Result<(), RPCError> { + encode_signal_info(&self.signal_info, &mut builder)?; + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs b/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs new file mode 100644 index 00000000..8b3b8191 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs @@ -0,0 +1,84 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationSupplyBlockQ { + block_id: DHTKey, +} + +impl RPCOperationSupplyBlockQ { + pub fn decode( + reader: &veilid_capnp::operation_supply_block_q::Reader, + ) -> Result { + let bi_reader = reader.get_block_id().map_err(map_error_capnp_error!())?; + let block_id = decode_block_id(&bi_reader); + + Ok(RPCOperationSupplyBlockQ { block_id }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_supply_block_q::Builder, + ) -> Result<(), RPCError> { + let bi_builder = builder.init_block_id(); + encode_block_id(&self.block_id, &mut bi_builder)?; + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCOperationSupplyBlockA { + Expiration(u64), + Peers(Vec), +} + +impl RPCOperationSupplyBlockA { + pub fn decode( + reader: &veilid_capnp::operation_supply_block_a::Reader, + ) -> Result { + match reader.which().map_err(map_error_capnp_notinschema!())? { + veilid_capnp::operation_supply_block_a::Which::Expiration(r) => { + Ok(RPCOperationSupplyBlockA::Expiration(r)) + } + veilid_capnp::operation_supply_block_a::Which::Peers(r) => { + let peers_reader = r.map_err(map_error_capnp_error!())?; + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p, true)?; + peers.push(peer_info); + } + + Ok(RPCOperationSupplyBlockA::Peers(peers)) + } + } + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_supply_block_a::Builder, + ) -> Result<(), RPCError> { + match self { + RPCOperationSupplyBlockA::Expiration(e) => { + builder.set_expiration(*e); + } + RPCOperationSupplyBlockA::Peers(peers) => { + let mut peers_builder = builder.init_peers( + peers + .len() + .try_into() + .map_err(map_error_internal!("invalid peers list length"))?, + ); + for (i, peer) in peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + } + } + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs new file mode 100644 index 00000000..b41e31d2 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs @@ -0,0 +1,30 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationValueChanged { + key: ValueKey, + value: ValueData, +} + +impl RPCOperationValueChanged { + pub fn decode( + reader: &veilid_capnp::operation_value_changed::Reader, + ) -> Result { + let k_reader = reader.get_key().map_err(map_error_capnp_error!())?; + let key = decode_value_key(&k_reader)?; + let v_reader = reader.get_value().map_err(map_error_capnp_error!())?; + let value = decode_value_data(&v_reader)?; + Ok(RPCOperationValueChanged { key, value }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_value_changed::Builder, + ) -> Result<(), RPCError> { + let k_builder = builder.init_key(); + encode_value_key(&self.key, &mut k_builder)?; + let v_builder = builder.init_value(); + encode_value_data(&self.value, &mut v_builder)?; + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs new file mode 100644 index 00000000..372e7dac --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs @@ -0,0 +1,71 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationWatchValueQ { + key: ValueKey, +} + +impl RPCOperationWatchValueQ { + pub fn decode( + reader: &veilid_capnp::operation_watch_value_q::Reader, + ) -> Result { + let k_reader = reader.get_key().map_err(map_error_capnp_error!())?; + let key = decode_value_key(&k_reader)?; + Ok(RPCOperationWatchValueQ { key }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_watch_value_q::Builder, + ) -> Result<(), RPCError> { + let k_builder = builder.init_key(); + encode_value_key(&self.key, &mut k_builder)?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RPCOperationWatchValueA { + expiration: u64, + peers: Vec, +} + +impl RPCOperationWatchValueA { + pub fn decode( + reader: &veilid_capnp::operation_watch_value_a::Reader, + ) -> Result { + let expiration = reader.get_expiration(); + let peers_reader = reader.get_peers().map_err(map_error_capnp_error!())?; + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(map_error_internal!("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p, true)?; + peers.push(peer_info); + } + + Ok(RPCOperationWatchValueA { expiration, peers }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_watch_value_a::Builder, + ) -> Result<(), RPCError> { + builder.set_expiration(self.expiration); + + let mut peers_builder = builder.init_peers( + self.peers + .len() + .try_into() + .map_err(map_error_internal!("invalid peers list length"))?, + ); + for (i, peer) in self.peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + + Ok(()) + } +}