set value

This commit is contained in:
John Smith 2023-05-10 10:44:48 -04:00
parent a3e2dbc744
commit 734606b6ad
28 changed files with 716 additions and 262 deletions

View File

@ -359,7 +359,7 @@ struct OperationSetValueQ @0xbac06191ff8bdbc5 {
}
struct OperationSetValueA @0x9378d0732dc95be2 {
set @0 :Bool; # true if the value was accepted
set @0 :Bool; # true if the set was close enough to be set
value @1 :SignedValueData; # optional: the current value at the key if the set seq number was lower or equal to what was there before
peers @2 :List(PeerInfo); # returned 'closer peer' information on either success or failure
}

View File

@ -152,7 +152,7 @@ impl ProtectedStore {
pub async fn save_user_secret_rkyv<K, T>(&self, key: K, value: &T) -> EyreResult<bool>
where
K: AsRef<str> + fmt::Debug,
T: RkyvSerialize<rkyv::ser::serializers::AllocSerializer<1024>>,
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
{
let v = to_rkyv(value)?;
self.save_user_secret(key, &v).await
@ -175,8 +175,7 @@ impl ProtectedStore {
T: RkyvArchive,
<T as RkyvArchive>::Archived:
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
<T as RkyvArchive>::Archived:
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
<T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>,
{
let out = self.load_user_secret(key).await?;
let b = match out {

View File

@ -92,7 +92,7 @@ impl TableDB {
/// Store a key in rkyv format with a value in a column in the TableDB. Performs a single transaction immediately.
pub async fn store_rkyv<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
where
T: RkyvSerialize<rkyv::ser::serializers::AllocSerializer<1024>>,
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
{
let v = to_rkyv(value)?;
@ -127,8 +127,7 @@ impl TableDB {
T: RkyvArchive,
<T as RkyvArchive>::Archived:
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
<T as RkyvArchive>::Archived:
RkyvDeserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
<T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>,
{
let db = self.inner.lock().database.clone();
let out = db.get(col, key).wrap_err("failed to get key")?;
@ -240,7 +239,7 @@ impl TableDBTransaction {
/// Store a key in rkyv format with a value in a column in the TableDB
pub fn store_rkyv<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
where
T: RkyvSerialize<rkyv::ser::serializers::AllocSerializer<1024>>,
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
{
let v = to_rkyv(value)?;
let mut inner = self.inner.lock();

View File

@ -45,14 +45,6 @@ use rkyv::{
bytecheck, bytecheck::CheckBytes, de::deserializers::SharedDeserializeMap, with::Skip,
Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
type RkyvSerializer = rkyv::ser::serializers::CompositeSerializer<
rkyv::ser::serializers::AlignedSerializer<rkyv::AlignedVec>,
rkyv::ser::serializers::FallbackScratch<
rkyv::ser::serializers::HeapScratch<1024>,
rkyv::ser::serializers::AllocScratch,
>,
rkyv::ser::serializers::SharedSerializeMap,
>;
type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>;
use serde::*;

View File

@ -19,9 +19,9 @@ impl RPCOperationAppCallQ {
Ok(())
}
pub fn message(&self) -> &[u8] {
&self.message
}
// pub fn message(&self) -> &[u8] {
// &self.message
// }
pub fn destructure(self) -> Vec<u8> {
self.message
@ -62,9 +62,9 @@ impl RPCOperationAppCallA {
Ok(())
}
pub fn message(&self) -> &[u8] {
&self.message
}
// pub fn message(&self) -> &[u8] {
// &self.message
// }
pub fn destructure(self) -> Vec<u8> {
self.message
@ -86,5 +86,4 @@ impl RPCOperationAppCallA {
builder.set_message(&self.message);
Ok(())
}
}

View File

@ -19,9 +19,9 @@ impl RPCOperationAppMessage {
Ok(())
}
pub fn message(&self) -> &[u8] {
&self.message
}
// pub fn message(&self) -> &[u8] {
// &self.message
// }
pub fn destructure(self) -> Vec<u8> {
self.message
}

View File

@ -15,9 +15,9 @@ impl RPCOperationFindNodeQ {
Ok(())
}
pub fn node_id(&self) -> &TypedKey {
&self.node_id
}
// pub fn node_id(&self) -> &TypedKey {
// &self.node_id
// }
pub fn destructure(self) -> TypedKey {
self.node_id
@ -57,9 +57,9 @@ impl RPCOperationFindNodeA {
Ok(())
}
pub fn peers(&self) -> &[PeerInfo] {
&self.peers
}
// pub fn peers(&self) -> &[PeerInfo] {
// &self.peers
// }
pub fn destructure(self) -> Vec<PeerInfo> {
self.peers

View File

@ -39,15 +39,15 @@ impl RPCOperationGetValueQ {
Ok(())
}
pub fn key(&self) -> &TypedKey {
&self.key
}
pub fn subkey(&self) -> ValueSubkey {
self.subkey
}
pub fn want_descriptor(&self) -> bool {
self.want_descriptor
}
// pub fn key(&self) -> &TypedKey {
// &self.key
// }
// pub fn subkey(&self) -> ValueSubkey {
// self.subkey
// }
// pub fn want_descriptor(&self) -> bool {
// self.want_descriptor
// }
pub fn destructure(self) -> (TypedKey, ValueSubkey, bool) {
(self.key, self.subkey, self.want_descriptor)
}
@ -155,15 +155,15 @@ impl RPCOperationGetValueA {
Ok(())
}
pub fn value(&self) -> Option<&SignedValueData> {
self.value.as_ref()
}
pub fn peers(&self) -> &[PeerInfo] {
&self.peers
}
pub fn descriptor(&self) -> Option<&SignedValueDescriptor> {
self.descriptor.as_ref()
}
// pub fn value(&self) -> Option<&SignedValueData> {
// self.value.as_ref()
// }
// pub fn peers(&self) -> &[PeerInfo] {
// &self.peers
// }
// pub fn descriptor(&self) -> Option<&SignedValueDescriptor> {
// self.descriptor.as_ref()
// }
pub fn destructure(
self,
) -> (

View File

@ -20,9 +20,9 @@ impl RPCOperationReturnReceipt {
Ok(())
}
pub fn receipt(&self) -> &[u8] {
&self.receipt
}
// pub fn receipt(&self) -> &[u8] {
// &self.receipt
// }
pub fn destructure(self) -> Vec<u8> {
self.receipt

View File

@ -5,14 +5,15 @@ const MAX_SET_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)]
pub struct ValidateSetValueContext {
last_descriptor: Option<SignedValueDescriptor>,
subkey: ValueSubkey,
vcrypto: CryptoSystemVersion,
pub descriptor: SignedValueDescriptor,
pub subkey: ValueSubkey,
pub vcrypto: CryptoSystemVersion,
}
impl fmt::Debug for ValidateSetValueContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ValidateSetValueContext")
.field("last_descriptor", &self.last_descriptor)
.field("descriptor", &self.descriptor)
.field("subkey", &self.subkey)
.field("vcrypto", &self.vcrypto.kind().to_string())
.finish()
@ -45,21 +46,21 @@ impl RPCOperationSetValueQ {
Ok(())
}
pub fn key(&self) -> &TypedKey {
&self.key
}
// pub fn key(&self) -> &TypedKey {
// &self.key
// }
pub fn subkey(&self) -> ValueSubkey {
self.subkey
}
// pub fn subkey(&self) -> ValueSubkey {
// self.subkey
// }
pub fn value(&self) -> &SignedValueData {
&self.value
}
// pub fn value(&self) -> &SignedValueData {
// &self.value
// }
pub fn descriptor(&self) -> Option<&SignedValueDescriptor> {
self.descriptor.as_ref()
}
// pub fn descriptor(&self) -> Option<&SignedValueDescriptor> {
// self.descriptor.as_ref()
// }
pub fn destructure(
self,
) -> (
@ -137,22 +138,16 @@ impl RPCOperationSetValueA {
};
if let Some(value) = &self.value {
// Get descriptor to validate with
let Some(descriptor) = &set_value_context.last_descriptor else {
return Err(RPCError::protocol(
"no last descriptor, requires a descriptor",
));
};
// Ensure the descriptor itself validates
descriptor
set_value_context
.descriptor
.validate(set_value_context.vcrypto.clone())
.map_err(RPCError::protocol)?;
// And the signed value data
value
.validate(
descriptor.owner(),
set_value_context.descriptor.owner(),
set_value_context.subkey,
set_value_context.vcrypto.clone(),
)
@ -163,15 +158,15 @@ impl RPCOperationSetValueA {
Ok(())
}
pub fn set(&self) -> bool {
self.set
}
pub fn value(&self) -> Option<&SignedValueData> {
self.value.as_ref()
}
pub fn peers(&self) -> &[PeerInfo] {
&self.peers
}
// pub fn set(&self) -> bool {
// self.set
// }
// pub fn value(&self) -> Option<&SignedValueData> {
// self.value.as_ref()
// }
// pub fn peers(&self) -> &[PeerInfo] {
// &self.peers
// }
pub fn destructure(self) -> (bool, Option<SignedValueData>, Vec<PeerInfo>) {
(self.set, self.value, self.peers)
}

View File

@ -12,9 +12,9 @@ impl RPCOperationSignal {
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
self.signal_info.validate(validate_context.crypto.clone())
}
pub fn signal_info(&self) -> &SignalInfo {
&self.signal_info
}
// pub fn signal_info(&self) -> &SignalInfo {
// &self.signal_info
// }
pub fn destructure(self) -> SignalInfo {
self.signal_info
}

View File

@ -13,9 +13,9 @@ impl RPCOperationStatusQ {
Ok(())
}
pub fn node_status(&self) -> Option<&NodeStatus> {
self.node_status.as_ref()
}
// pub fn node_status(&self) -> Option<&NodeStatus> {
// self.node_status.as_ref()
// }
pub fn destructure(self) -> Option<NodeStatus> {
self.node_status
}
@ -60,12 +60,12 @@ impl RPCOperationStatusA {
Ok(())
}
pub fn node_status(&self) -> Option<&NodeStatus> {
self.node_status.as_ref()
}
pub fn sender_info(&self) -> Option<&SenderInfo> {
self.sender_info.as_ref()
}
// pub fn node_status(&self) -> Option<&NodeStatus> {
// self.node_status.as_ref()
// }
// pub fn sender_info(&self) -> Option<&SenderInfo> {
// self.sender_info.as_ref()
// }
pub fn destructure(self) -> (Option<NodeStatus>, Option<SenderInfo>) {
(self.node_status, self.sender_info)
}

View File

@ -30,15 +30,15 @@ impl RPCOperationValidateDialInfo {
pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> {
Ok(())
}
pub fn dial_info(&self) -> &DialInfo {
&self.dial_info
}
pub fn receipt(&self) -> &[u8] {
&self.receipt
}
pub fn redirect(&self) -> bool {
self.redirect
}
// pub fn dial_info(&self) -> &DialInfo {
// &self.dial_info
// }
// pub fn receipt(&self) -> &[u8] {
// &self.receipt
// }
// pub fn redirect(&self) -> bool {
// self.redirect
// }
pub fn destructure(self) -> (DialInfo, Vec<u8>, bool) {
(self.dial_info, self.receipt, self.redirect)
}

View File

@ -52,7 +52,9 @@ impl RPCProcessor {
};
// Verify peers are in the correct peer scope
for peer_info in find_node_a.peers() {
let peers = find_node_a.destructure();
for peer_info in &peers {
if !self.filter_node_info(RoutingDomain::PublicInternet, peer_info.signed_node_info()) {
return Err(RPCError::invalid_format(
"find_node response has invalid peer scope",
@ -60,7 +62,6 @@ impl RPCProcessor {
}
}
let peers = find_node_a.destructure();
Ok(NetworkResult::value(Answer::new(latency, peers)))
}

View File

@ -45,7 +45,7 @@ impl RPCProcessor {
return Err(RPCError::internal("unsupported cryptosystem"));
};
// Send the app call question
// Send the getvalue question
let question_context = QuestionContext::GetValue(ValidateGetValueContext {
last_descriptor,
subkey,
@ -119,10 +119,10 @@ impl RPCProcessor {
// See if we have this record ourselves
let storage_manager = self.storage_manager();
let subkey_result = storage_manager
.handle_get_value(key, subkey, want_descriptor)
let subkey_result = network_result_try!(storage_manager
.inbound_get_value(key, subkey, want_descriptor)
.await
.map_err(RPCError::internal)?;
.map_err(RPCError::internal)?);
// Make GetValue answer
let get_value_a = RPCOperationGetValueA::new(

View File

@ -1,12 +1,154 @@
use super::*;
#[derive(Clone, Debug)]
pub struct SetValueAnswer {
pub set: bool,
pub value: Option<SignedValueData>,
pub peers: Vec<PeerInfo>,
}
impl RPCProcessor {
/// Sends a set value request and wait for response
/// Can be sent via all methods including relays
/// Safety routes may be used, but never private routes.
/// Because this leaks information about the identity of the node itself,
/// replying to this request received over a private route will leak
/// the identity of the node and defeat the private route.
#[instrument(level = "trace", skip(self), ret, err)]
pub async fn rpc_call_set_value(
self,
dest: Destination,
key: TypedKey,
subkey: ValueSubkey,
value: SignedValueData,
descriptor: SignedValueDescriptor,
send_descriptor: bool,
) -> Result<NetworkResult<Answer<SetValueAnswer>>, RPCError> {
// Ensure destination never has a private route
if matches!(
dest,
Destination::PrivateRoute {
private_route: _,
safety_selection: _
}
) {
return Err(RPCError::internal(
"Never send set value requests over private routes",
));
}
let set_value_q = RPCOperationSetValueQ::new(
key,
subkey,
value,
if send_descriptor {
Some(descriptor.clone())
} else {
None
},
);
let question = RPCQuestion::new(
network_result_try!(self.get_destination_respond_to(&dest)?),
RPCQuestionDetail::SetValueQ(set_value_q),
);
let Some(vcrypto) = self.crypto.get(key.kind) else {
return Err(RPCError::internal("unsupported cryptosystem"));
};
// Send the setvalue question
let question_context = QuestionContext::SetValue(ValidateSetValueContext {
descriptor,
subkey,
vcrypto,
});
let waitable_reply = network_result_try!(
self.question(dest, question, Some(question_context))
.await?
);
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
TimeoutOr::Value(v) => v,
};
// Get the right answer type
let (_, _, _, kind) = msg.operation.destructure();
let set_value_a = match kind {
RPCOperationKind::Answer(a) => match a.destructure() {
RPCAnswerDetail::SetValueA(a) => a,
_ => return Err(RPCError::invalid_format("not a setvalue answer")),
},
_ => return Err(RPCError::invalid_format("not an answer")),
};
let (set, value, peers) = set_value_a.destructure();
Ok(NetworkResult::value(Answer::new(
latency,
SetValueAnswer { set, value, peers },
)))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_set_value_q(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// tracing::Span::current().record("res", &tracing::field::display(res));
Err(RPCError::unimplemented("process_set_value_q"))
// Ensure this never came over a private route, safety route is okay though
match &msg.header.detail {
RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {}
RPCMessageHeaderDetail::PrivateRouted(_) => {
return Ok(NetworkResult::invalid_message(
"not processing set value request over private route",
))
}
}
// Get the question
let kind = msg.operation.kind().clone();
let set_value_q = match kind {
RPCOperationKind::Question(q) => match q.destructure() {
(_, RPCQuestionDetail::SetValueQ(q)) => q,
_ => panic!("not a setvalue question"),
},
_ => panic!("not a question"),
};
// Destructure
let (key, subkey, value, descriptor) = set_value_q.destructure();
// Get the nodes that we know about that are closer to the the key than our own node
let routing_table = self.routing_table();
let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key));
// If there are less than 'set_value_count' peers that are closer, then store here too
let set_value_count = {
let c = self.config.get();
c.network.dht.set_value_fanout as usize
};
let (set, new_value) = if closer_to_key_peers.len() >= set_value_count {
// Not close enough
(false, None)
} else {
// Close enough, lets set it
// Save the subkey, creating a new record if necessary
let storage_manager = self.storage_manager();
let new_value = network_result_try!(storage_manager
.inbound_set_value(key, subkey, value, descriptor)
.await
.map_err(RPCError::internal)?);
(true, new_value)
};
// Make SetValue answer
let set_value_a = RPCOperationSetValueA::new(set, new_value, closer_to_key_peers)?;
// Send SetValue answer
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::SetValueA(set_value_a)))
.await
}
}

View File

@ -96,20 +96,21 @@ impl RPCProcessor {
},
_ => return Err(RPCError::invalid_format("not an answer")),
};
let (a_node_status, sender_info) = status_a.destructure();
// Ensure the returned node status is the kind for the routing domain we asked for
if let Some(target_nr) = opt_target_nr {
if let Some(node_status) = status_a.node_status() {
if let Some(a_node_status) = a_node_status {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(node_status, NodeStatus::PublicInternet(_)) {
if !matches!(a_node_status, NodeStatus::PublicInternet(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match PublicInternet routing domain",
));
}
}
RoutingDomain::LocalNetwork => {
if !matches!(node_status, NodeStatus::LocalNetwork(_)) {
if !matches!(a_node_status, NodeStatus::LocalNetwork(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match LocalNetwork routing domain",
));
@ -118,7 +119,7 @@ impl RPCProcessor {
}
// Update latest node status in routing table
target_nr.update_node_status(node_status.clone());
target_nr.update_node_status(a_node_status.clone());
}
}
@ -132,7 +133,7 @@ impl RPCProcessor {
safety_selection,
} => {
if matches!(safety_selection, SafetySelection::Unsafe(_)) {
if let Some(sender_info) = status_a.sender_info() {
if let Some(sender_info) = sender_info {
match send_data_kind {
SendDataKind::Direct(connection_descriptor) => {
// Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as
@ -186,13 +187,15 @@ impl RPCProcessor {
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
// Get the question
let status_q = match msg.operation.kind() {
RPCOperationKind::Question(q) => match q.detail() {
RPCQuestionDetail::StatusQ(q) => q,
let kind = msg.operation.kind().clone();
let status_q = match kind {
RPCOperationKind::Question(q) => match q.destructure() {
(_, RPCQuestionDetail::StatusQ(q)) => q,
_ => panic!("not a status question"),
},
_ => panic!("not a question"),
};
let q_node_status = status_q.destructure();
let (node_status, sender_info) = match &msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => {
@ -200,17 +203,17 @@ impl RPCProcessor {
let routing_domain = detail.routing_domain;
// Ensure the node status from the question is the kind for the routing domain we received the request in
if let Some(node_status) = status_q.node_status() {
if let Some(q_node_status) = q_node_status {
match routing_domain {
RoutingDomain::PublicInternet => {
if !matches!(node_status, NodeStatus::PublicInternet(_)) {
if !matches!(q_node_status, NodeStatus::PublicInternet(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match PublicInternet routing domain",
));
}
}
RoutingDomain::LocalNetwork => {
if !matches!(node_status, NodeStatus::LocalNetwork(_)) {
if !matches!(q_node_status, NodeStatus::LocalNetwork(_)) {
return Ok(NetworkResult::invalid_message(
"node status doesn't match LocalNetwork routing domain",
));
@ -221,7 +224,7 @@ impl RPCProcessor {
// update node status for the requesting node to our routing table
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
// Update latest node status in routing table for the statusq sender
sender_nr.update_node_status(node_status.clone());
sender_nr.update_node_status(q_node_status.clone());
}
}

View File

@ -15,7 +15,7 @@ struct DoGetValueContext {
impl StorageManager {
/// Perform a 'get value' query on the network
pub async fn do_get_value(
pub async fn outbound_get_value(
&self,
rpc_processor: RPCProcessor,
key: TypedKey,
@ -116,8 +116,9 @@ impl StorageManager {
// Increase the consensus count for the existing value
ctx.value_count += 1;
} else if new_seq > prior_seq {
// If the sequence number is greater, go with it
// If the sequence number is greater, start over with the new value
ctx.value = Some(value);
// One node has show us this value so far
ctx.value_count = 1;
} else {
// If the sequence number is older, ignore it
@ -174,8 +175,17 @@ impl StorageManager {
}
/// Handle a recieved 'Get Value' query
pub async fn handle_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> Result<SubkeyResult, VeilidAPIError> {
pub async fn inbound_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> Result<NetworkResult<SubkeyResult>, VeilidAPIError> {
let mut inner = self.lock().await?;
inner.handle_get_remote_value(key, subkey, want_descriptor)
let res = match inner.handle_get_remote_value(key, subkey, want_descriptor) {
Ok(res) => res,
Err(VeilidAPIError::Internal { message }) => {
apibail_internal!(message);
},
Err(e) => {
return Ok(NetworkResult::invalid_message(e));
},
};
Ok(NetworkResult::value(res))
}
}

View File

@ -1,7 +1,8 @@
mod do_get_value;
mod get_value;
mod keys;
mod record_store;
mod record_store_limits;
mod set_value;
mod storage_manager_inner;
mod tasks;
mod types;
@ -88,8 +89,8 @@ impl StorageManager {
#[instrument(level = "debug", skip_all, err)]
pub async fn init(&self) -> EyreResult<()> {
debug!("startup storage manager");
let mut inner = self.inner.lock().await;
let mut inner = self.inner.lock().await;
inner.init(self.clone()).await?;
Ok(())
@ -175,7 +176,7 @@ impl StorageManager {
// Use the safety selection we opened the record with
let subkey: ValueSubkey = 0;
let subkey_result = self
.do_get_value(
.outbound_get_value(
rpc_processor,
key,
subkey,
@ -224,6 +225,8 @@ impl StorageManager {
/// Get the value of a subkey from an opened local record
/// may refresh the record, and will if it is forced to or the subkey is not available locally yet
/// Returns Ok(None) if no value was found
/// Returns Ok(Some(value)) is a value was found online or locally
pub async fn get_value(
&self,
key: TypedKey,
@ -263,7 +266,7 @@ impl StorageManager {
.as_ref()
.map(|v| v.value_data().seq());
let subkey_result = self
.do_get_value(
.outbound_get_value(
rpc_processor,
key,
subkey,
@ -290,6 +293,8 @@ impl StorageManager {
/// Set the value of a subkey on an opened local record
/// Puts changes to the network immediately and may refresh the record if the there is a newer subkey available online
/// Returns Ok(None) if the value was set
/// Returns Ok(Some(newer value)) if a newer value was found online
pub async fn set_value(
&self,
key: TypedKey,
@ -328,6 +333,7 @@ impl StorageManager {
} else {
ValueData::new(data, writer.key)
};
let seq = value_data.seq();
// Validate with schema
if !schema.check_subkey_value_data(descriptor.owner(), subkey, &value_data) {
@ -343,110 +349,43 @@ impl StorageManager {
vcrypto,
writer.secret,
)?;
let subkey_result = SubkeyResult {
value: Some(signed_value_data),
descriptor: Some(descriptor)
};
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, just write it locally and return immediately
inner
.handle_set_local_value(key, subkey, signed_value_data)
.handle_set_local_value(key, subkey, signed_value_data.clone())
.await?;
// Add to offline writes to flush
inner.offline_subkey_writes.entry(key).and_modify(|x| { x.insert(subkey); } ).or_insert(ValueSubkeyRangeSet::single(subkey));
return Ok(Some(signed_value_data.into_value_data()))
};
// Drop the lock for network access
drop(inner);
// Use the safety selection we opened the record with
let final_subkey_result = self
.do_set_value(
let final_signed_value_data = self
.outbound_set_value(
rpc_processor,
key,
subkey,
opened_record.safety_selection(),
subkey_result,
signed_value_data,
descriptor,
)
.await?;
// See if we got a value back
let Some(subkey_result_value) = subkey_result.value else {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None);
};
// If we got a new value back then write it to the opened record
if Some(subkey_result_value.value_data().seq()) != opt_last_seq {
if final_signed_value_data.value_data().seq() != seq {
let mut inner = self.lock().await?;
inner
.handle_set_local_value(key, subkey, subkey_result_value.clone())
.handle_set_local_value(key, subkey, final_signed_value_data.clone())
.await?;
}
Ok(Some(subkey_result_value.into_value_data()))
// Store subkey locally
inner
.handle_set_local_value(key, subkey, signed_value_data)
.await?;
// Return the existing value if we have one unless we are forcing a refresh
if !force_refresh {
if let Some(last_subkey_result_value) = last_subkey_result.value {
return Ok(Some(last_subkey_result_value.into_value_data()));
}
}
// Refresh if we can
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Offline, try again later
apibail_try_again!();
};
// Drop the lock for network access
drop(inner);
// May have last descriptor / value
// Use the safety selection we opened the record with
let opt_last_seq = last_subkey_result
.value
.as_ref()
.map(|v| v.value_data().seq());
let subkey_result = self
.do_get_value(
rpc_processor,
key,
subkey,
opened_record.safety_selection(),
last_subkey_result,
)
.await?;
// See if we got a value back
let Some(subkey_result_value) = subkey_result.value else {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None);
};
// If we got a new value back then write it to the opened record
if Some(subkey_result_value.value_data().seq()) != opt_last_seq {
let mut inner = self.lock().await?;
inner
.handle_set_local_value(key, subkey, subkey_result_value.clone())
.await?;
}
Ok(Some(subkey_result_value.into_value_data()))
Ok(Some(final_signed_value_data.into_value_data()))
}
pub async fn watch_values(

View File

@ -9,9 +9,9 @@ use hashlink::LruCache;
pub struct RecordStore<D>
where
D: Clone + RkyvArchive + RkyvSerialize<RkyvSerializer>,
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{
table_store: TableStore,
name: String,
@ -41,9 +41,9 @@ pub struct SubkeyResult {
impl<D> RecordStore<D>
where
D: Clone + RkyvArchive + RkyvSerialize<RkyvSerializer>,
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{
pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self {
let subkey_cache_size = limits.subkey_cache_size as usize;
@ -421,7 +421,11 @@ where
) -> Result<(), VeilidAPIError> {
// Check size limit for data
if signed_value_data.value_data().data().len() > self.limits.max_subkey_size {
return Err(VeilidAPIError::generic("record subkey too large"));
apibail_invalid_argument!(
"record subkey too large",
"signed_value_data.value_data.data.len",
signed_value_data.value_data().data().len()
);
}
// Get record from index

View File

@ -0,0 +1,225 @@
use super::*;
/// The context of the do_get_value operation
struct DoSetValueContext {
/// The latest value of the subkey, may be the value passed in
pub value: SignedValueData,
/// The consensus count for the value we have received
pub value_count: usize,
/// The parsed schema from the descriptor if we have one
pub schema: DHTSchema,
}
impl StorageManager {
/// Perform a 'set value' query on the network
pub async fn outbound_set_value(
&self,
rpc_processor: RPCProcessor,
key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
value: SignedValueData,
descriptor: SignedValueDescriptor,
) -> Result<SignedValueData, VeilidAPIError> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'SetValue'
let (key_count, consensus_count, fanout, timeout_us) = {
let c = self.unlocked_inner.config.get();
(
c.network.dht.max_find_node_count as usize,
c.network.dht.set_value_count as usize,
c.network.dht.set_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
)
};
// Make do-set-value answer context
let schema = descriptor.schema()?;
let context = Arc::new(Mutex::new(DoSetValueContext {
value,
value_count: 0,
schema,
}));
// Routine to call to generate fanout
let call_routine = |next_node: NodeRef| {
let rpc_processor = rpc_processor.clone();
let context = context.clone();
let descriptor = descriptor.clone();
async move {
let send_descriptor = true; // xxx check if next_node needs the descriptor or not
// get most recent value to send
let value = {
let ctx = context.lock();
ctx.value.clone()
};
// send across the wire
let vres = rpc_processor
.clone()
.rpc_call_set_value(
Destination::direct(next_node).with_safety(safety_selection),
key,
subkey,
value,
descriptor.clone(),
send_descriptor,
)
.await?;
let sva = network_result_value_or_log!(vres => {
// Any other failures, just try the next node
return Ok(None);
});
// If the node was close enough to possibly set the value
if sva.answer.set {
let mut ctx = context.lock();
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = sva.answer.value {
// Validate with schema
if !ctx.schema.check_subkey_value_data(
descriptor.owner(),
subkey,
value.value_data(),
) {
// Validation failed, ignore this value
// Move to the next node
return Ok(None);
}
// We have a prior value, ensure this is a newer sequence number
let prior_seq = ctx.value.value_data().seq();
let new_seq = value.value_data().seq();
if new_seq > prior_seq {
// If the sequence number is greater, keep it
ctx.value = value;
// One node has show us this value so far
ctx.value_count = 1;
} else {
// If the sequence number is older, or an equal sequence number,
// node should have not returned a value here.
// Skip this node's closer list because it is misbehaving
return Ok(None);
}
}
else
{
// It was set on this node and no newer value was found and returned,
// so increase our consensus count
ctx.value_count += 1;
}
}
// Return peers if we have some
Ok(Some(sva.answer.peers))
}
};
// Routine to call to check if we're done at each step
let check_done = |_closest_nodes: &[NodeRef]| {
// If we have reached sufficient consensus, return done
let ctx = context.lock();
if ctx.value_count >= consensus_count {
return Some(());
}
None
};
// Call the fanout
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,
key_count,
fanout,
timeout_us,
call_routine,
check_done,
);
match fanout_call.run().await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout |
// If we finished with consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) |
// If we finished without consensus (ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
Ok(ctx.value.clone())
}
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
Err(e.into())
}
}
}
/// Handle a recieved 'Set Value' query
/// Returns a None if the value passed in was set
/// Returns a Some(current value) if the value was older and the current value was kept
pub async fn inbound_set_value(&self, key: TypedKey, subkey: ValueSubkey, value: SignedValueData, descriptor: Option<SignedValueDescriptor>) -> Result<NetworkResult<Option<SignedValueData>>, VeilidAPIError> {
let mut inner = self.lock().await?;
// See if the subkey we are modifying has a last known local value
let last_subkey_result = inner.handle_get_local_value(key, subkey, true)?;
// Make sure this value would actually be newer
if let Some(last_value) = &last_subkey_result.value {
if value.value_data().seq() < last_value.value_data().seq() {
// inbound value is older than the one we have, just return the one we have
return Ok(NetworkResult::value(Some(last_value.clone())));
}
}
// Get the descriptor and schema for the key
let actual_descriptor = match last_subkey_result.descriptor {
Some(last_descriptor) => {
if let Some(descriptor) = descriptor {
// Descriptor must match last one if it is provided
if descriptor.cmp_no_sig(&last_descriptor) != cmp::Ordering::Equal {
return Ok(NetworkResult::invalid_message("setvalue descriptor does not match last descriptor"));
}
} else {
// Descriptor was not provided always go with last descriptor
}
last_descriptor
}
None => {
if let Some(descriptor) = descriptor {
descriptor
} else {
// No descriptor
return Ok(NetworkResult::invalid_message("descriptor must be provided"));
}
}
};
let Ok(schema) = actual_descriptor.schema() else {
return Ok(NetworkResult::invalid_message("invalid schema"));
};
// Validate new value with schema
if !schema.check_subkey_value_data(actual_descriptor.owner(), subkey, value.value_data()) {
// Validation failed, ignore this value
return Ok(NetworkResult::invalid_message("failed schema validation"));
}
// Do the set and return no new value
match inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await {
Ok(()) => {},
Err(VeilidAPIError::Internal { message }) => {
apibail_internal!(message);
},
Err(e) => {
return Ok(NetworkResult::invalid_message(e));
},
}
Ok(NetworkResult::value(None))
}
}

View File

@ -120,10 +120,14 @@ impl StorageManagerInner {
// Final flush on record stores
if let Some(mut local_record_store) = self.local_record_store.take() {
local_record_store.tick().await;
if let Err(e) = local_record_store.tick().await {
log_stor!(error "termination local record store tick failed: {}", e);
}
}
if let Some(mut remote_record_store) = self.remote_record_store.take() {
remote_record_store.tick().await;
if let Err(e) = remote_record_store.tick().await {
log_stor!(error "termination remote record store tick failed: {}", e);
}
}
// Save metadata
@ -142,7 +146,7 @@ impl StorageManagerInner {
async fn save_metadata(&mut self) -> EyreResult<()>{
if let Some(metadata_db) = &self.metadata_db {
let tx = metadata_db.transact();
tx.store_rkyv(0, b"offline_subkey_writes", &self.offline_subkey_writes);
tx.store_rkyv(0, b"offline_subkey_writes", &self.offline_subkey_writes)?;
tx.commit().await.wrap_err("failed to commit")?
}
Ok(())
@ -155,8 +159,6 @@ impl StorageManagerInner {
Ok(())
}
write offline subkey write flush background task or make a ticket for it and get back to it after the rest of set value
pub async fn create_new_owned_local_record(
&mut self,
kind: CryptoKind,
@ -386,12 +388,23 @@ impl StorageManagerInner {
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: SignedValueData,
signed_value_descriptor: SignedValueDescriptor,
) -> Result<(), VeilidAPIError> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
// See if we have a remote record already or not
if remote_record_store.with_record(key, |_|{}).is_none() {
// record didn't exist, make it
let cur_ts = get_aligned_timestamp();
let remote_record_detail = RemoteRecordDetail { };
let record =
Record::<RemoteRecordDetail>::new(cur_ts, signed_value_descriptor, remote_record_detail)?;
remote_record_store.new_record(key, record).await?
};
// Write subkey to remote store
remote_record_store
.set_subkey(key, subkey, signed_value_data)
@ -403,9 +416,9 @@ impl StorageManagerInner {
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
where
D: Clone + RkyvArchive + RkyvSerialize<RkyvSerializer>,
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{
let compiled = record.descriptor().schema_data();
let mut hash_data = Vec::<u8>::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len());

View File

@ -7,6 +7,6 @@ use super::*;
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct LocalRecordDetail {
/// The last 'safety selection' used when creating/opening this record.
/// Even when closed, this safety selection applies to republication attempts by the system.
/// Even when closed, this safety selection applies to re-publication attempts by the system.
pub safety_selection: SafetySelection,
}

View File

@ -6,9 +6,9 @@ use super::*;
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct Record<D>
where
D: Clone + RkyvArchive + RkyvSerialize<RkyvSerializer>,
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{
descriptor: SignedValueDescriptor,
subkey_count: usize,
@ -19,9 +19,9 @@ where
impl<D> Record<D>
where
D: Clone + RkyvArchive + RkyvSerialize<RkyvSerializer>,
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{
pub fn new(
cur_ts: Timestamp,

View File

@ -1,8 +1,9 @@
mod rkyv_enum_set;
mod rkyv_range_set_blaze;
pub mod serialize_arc;
pub mod serialize_range_set_blaze;
mod serialize_json;
pub mod serialize_range_set_blaze;
mod veilid_rkyv;
use super::*;
use core::fmt::Debug;
@ -10,28 +11,4 @@ use core::fmt::Debug;
pub use rkyv_enum_set::*;
pub use rkyv_range_set_blaze::*;
pub use serialize_json::*;
pub fn to_rkyv<T>(v: &T) -> EyreResult<Vec<u8>>
where
T: RkyvSerialize<rkyv::ser::serializers::AllocSerializer<1024>>,
{
Ok(rkyv::to_bytes::<T, 1024>(v)
.wrap_err("failed to freeze object")?
.to_vec())
}
pub fn from_rkyv<T>(v: Vec<u8>) -> EyreResult<T>
where
T: RkyvArchive,
<T as RkyvArchive>::Archived:
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
<T as RkyvArchive>::Archived:
rkyv::Deserialize<T, rkyv::de::deserializers::SharedDeserializeMap>,
{
match rkyv::from_bytes::<T>(&v) {
Ok(v) => Ok(v),
Err(e) => {
bail!("failed to deserialize frozen object: {}", e);
}
}
}
pub use veilid_rkyv::*;

View File

@ -52,16 +52,16 @@ where
D: rkyv::Fallible + ?Sized,
T: rkyv::Archive + Integer,
rkyv::Archived<T>: rkyv::Deserialize<T, D>,
// D::Error: From<String>, // xxx this doesn't work
D::Error: From<String>,
{
fn deserialize_with(
field: &rkyv::Archived<Vec<T>>,
deserializer: &mut D,
) -> Result<RangeSetBlaze<T>, D::Error> {
let mut out = RangeSetBlaze::<T>::new();
// if field.len() % 2 == 1 {
// return Err("invalid range set length".to_owned().into());
// }
if field.len() % 2 == 1 {
return Err("invalid range set length".to_owned().into());
}
let f = field.as_slice();
for i in 0..field.len() / 2 {
let l: T = f[i * 2].deserialize(deserializer)?;

View File

@ -0,0 +1,151 @@
use super::*;
use rkyv::ser::Serializer;
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub struct VeilidRkyvSerializer<S> {
inner: S,
}
impl<S> VeilidRkyvSerializer<S> {
pub fn into_inner(self) -> S {
self.inner
}
}
impl<S: rkyv::Fallible> rkyv::Fallible for VeilidRkyvSerializer<S> {
type Error = VeilidRkyvError<S::Error>;
}
impl<S: rkyv::ser::ScratchSpace> rkyv::ser::ScratchSpace for VeilidRkyvSerializer<S> {
unsafe fn push_scratch(
&mut self,
layout: core::alloc::Layout,
) -> Result<core::ptr::NonNull<[u8]>, Self::Error> {
self.inner
.push_scratch(layout)
.map_err(VeilidRkyvError::Inner)
}
unsafe fn pop_scratch(
&mut self,
ptr: core::ptr::NonNull<u8>,
layout: core::alloc::Layout,
) -> Result<(), Self::Error> {
self.inner
.pop_scratch(ptr, layout)
.map_err(VeilidRkyvError::Inner)
}
}
impl<S: rkyv::ser::Serializer> rkyv::ser::Serializer for VeilidRkyvSerializer<S> {
#[inline]
fn pos(&self) -> usize {
self.inner.pos()
}
#[inline]
fn write(&mut self, bytes: &[u8]) -> Result<(), Self::Error> {
self.inner.write(bytes).map_err(VeilidRkyvError::Inner)
}
}
impl<S: Default> Default for VeilidRkyvSerializer<S> {
fn default() -> Self {
Self {
inner: S::default(),
}
}
}
pub type DefaultVeilidRkyvSerializer =
VeilidRkyvSerializer<rkyv::ser::serializers::AllocSerializer<1024>>;
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Default)]
pub struct VeilidSharedDeserializeMap {
inner: SharedDeserializeMap,
}
impl VeilidSharedDeserializeMap {
#[inline]
pub fn new() -> Self {
Self {
inner: SharedDeserializeMap::new(),
}
}
}
impl rkyv::Fallible for VeilidSharedDeserializeMap {
type Error = VeilidRkyvError<rkyv::de::deserializers::SharedDeserializeMapError>;
}
impl rkyv::de::SharedDeserializeRegistry for VeilidSharedDeserializeMap {
fn get_shared_ptr(&mut self, ptr: *const u8) -> Option<&dyn rkyv::de::SharedPointer> {
self.inner.get_shared_ptr(ptr)
}
fn add_shared_ptr(
&mut self,
ptr: *const u8,
shared: Box<dyn rkyv::de::SharedPointer>,
) -> Result<(), Self::Error> {
self.inner
.add_shared_ptr(ptr, shared)
.map_err(VeilidRkyvError::Inner)
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub enum VeilidRkyvError<E> {
Inner(E),
StringError(String),
}
impl<E: Debug> From<String> for VeilidRkyvError<E> {
fn from(s: String) -> Self {
Self::StringError(s)
}
}
impl<E: Debug + fmt::Display> fmt::Display for VeilidRkyvError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
VeilidRkyvError::Inner(e) => write!(f, "Inner: {}", e),
VeilidRkyvError::StringError(s) => write!(f, "StringError: {}", s),
}
}
}
impl<E: Debug + fmt::Display> std::error::Error for VeilidRkyvError<E> {}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn to_rkyv<T>(value: &T) -> EyreResult<Vec<u8>>
where
T: RkyvSerialize<DefaultVeilidRkyvSerializer>,
{
let mut serializer = DefaultVeilidRkyvSerializer::default();
serializer
.serialize_value(value)
.wrap_err("failed to serialize object")?;
Ok(serializer
.into_inner()
.into_serializer()
.into_inner()
.to_vec())
}
pub fn from_rkyv<T>(bytes: Vec<u8>) -> EyreResult<T>
where
T: RkyvArchive,
<T as RkyvArchive>::Archived:
for<'t> CheckBytes<rkyv::validation::validators::DefaultValidator<'t>>,
<T as RkyvArchive>::Archived: RkyvDeserialize<T, VeilidSharedDeserializeMap>,
{
rkyv::check_archived_root::<T>(&bytes)
.map_err(|e| eyre!("checkbytes failed: {}", e))?
.deserialize(&mut VeilidSharedDeserializeMap::default())
.map_err(|e| eyre!("failed to deserialize: {}", e))
}

View File

@ -29,6 +29,11 @@ impl ValueSubkeyRangeSet {
data: Default::default(),
}
}
pub fn single(value: ValueSubkey) -> Self {
let mut data = RangeSetBlaze::new();
data.insert(value);
Self { data }
}
}
impl Deref for ValueSubkeyRangeSet {