checkpoint

This commit is contained in:
Christien Rioux 2023-11-19 21:14:58 -05:00
parent 3f86801ecd
commit 70e256a25a
21 changed files with 765 additions and 148 deletions

View File

@ -88,6 +88,9 @@ core:
remote_max_records: 65536
remote_max_subkey_cache_memory_mb: %REMOTE_MAX_SUBKEY_CACHE_MEMORY_MB%
remote_max_storage_space_mb: 0
public_watch_limit: 32
member_watch_limit: 8
upnp: true
detect_address_changes: true
restricted_nat_retries: 0

View File

@ -255,6 +255,9 @@ dht:
remote_max_records: 65536
remote_max_subkey_cache_memory_mb: %REMOTE_MAX_SUBKEY_CACHE_MEMORY_MB%
remote_max_storage_space_mb: 0
public_watch_limit: 32
member_watch_limit: 8
```
#### core:network:tls

View File

@ -356,8 +356,8 @@ struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), if empty, watch everything
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (the returned expiration may be sooner; 0 requests the maximum)
count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous)
watcher @4 :PublicKey; # the watcher performing the watch, can be the owner or a schema member
signature @5 :Signature; # signature of the watcher, must be one of the schema members or the key owner. signature covers: key, subkeys, expiration, count
watcher @4 :PublicKey; # optional: the watcher performing the watch, can be the owner or a schema member
signature @5 :Signature; # optional: signature of the watcher, must be one of the schema members or the key owner. signature covers: key, subkeys, expiration, count
}
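The watcher and signature fields are now optional, and the signature covers only the watch parameters rather than the whole message. As a reference for what gets signed, here is a hedged, self-contained sketch of that payload layout, mirroring `make_signature_data` in the Rust codec later in this commit (the fixed-size array parameters are illustrative stand-ins for the real `TypedKey`/`PublicKey` types):

```rust
// Illustrative sketch of the byte string covered by OperationWatchValueQ.signature.
// Field order mirrors make_signature_data: crypto kind fourcc, record public key,
// each (start, end) subkey subrange, expiration, count -- integers little-endian.
fn watch_signature_payload(
    kind: [u8; 4],                // crypto kind fourcc, e.g. *b"VLD0"
    record_key: [u8; 32],         // 256-bit record public key
    subkey_ranges: &[(u32, u32)], // inclusive (start, end) subkey subranges
    expiration_us: u64,           // requested expiration in usec since epoch (0 = max)
    count: u32,                   // 0 = cancel, 1 = single shot, u32::MAX = continuous
) -> Vec<u8> {
    let mut sig_data = Vec::with_capacity(4 + 32 + subkey_ranges.len() * 8 + 8 + 4);
    sig_data.extend_from_slice(&kind);
    sig_data.extend_from_slice(&record_key);
    for (start, end) in subkey_ranges {
        sig_data.extend_from_slice(&start.to_le_bytes());
        sig_data.extend_from_slice(&end.to_le_bytes());
    }
    sig_data.extend_from_slice(&expiration_us.to_le_bytes());
    sig_data.extend_from_slice(&count.to_le_bytes());
    sig_data
}
```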
struct OperationWatchValueA @0xa726cab7064ba893 {

View File

@ -9,8 +9,7 @@ pub(in crate::rpc_processor) struct RPCOperationWatchValueQ {
subkeys: ValueSubkeyRangeSet,
expiration: u64,
count: u32,
watcher: PublicKey,
signature: Signature,
opt_watch_signature: Option<(PublicKey, Signature)>,
}
impl RPCOperationWatchValueQ {
@ -20,8 +19,8 @@ impl RPCOperationWatchValueQ {
subkeys: ValueSubkeyRangeSet,
expiration: u64,
count: u32,
watcher: PublicKey,
signature: Signature,
opt_watcher: Option<KeyPair>,
vcrypto: CryptoSystemVersion,
) -> Result<Self, RPCError> {
// Needed because RangeSetBlaze uses different types here all the time
#[allow(clippy::unnecessary_cast)]
@ -30,31 +29,46 @@ impl RPCOperationWatchValueQ {
if subkeys_len > MAX_WATCH_VALUE_Q_SUBKEYS_LEN {
return Err(RPCError::protocol("WatchValueQ subkeys length too long"));
}
let opt_watch_signature = if let Some(watcher) = opt_watcher {
let signature_data = Self::make_signature_data(&key, &subkeys, expiration, count);
let signature = vcrypto
.sign(&watcher.key, &watcher.secret, &signature_data)
.map_err(RPCError::protocol)?;
Some((watcher.key, signature))
} else {
None
};
Ok(Self {
key,
subkeys,
expiration,
count,
watcher,
signature,
opt_watch_signature,
})
}
// signature covers: key, subkeys, expiration, count, using watcher key
fn make_signature_data(&self) -> Vec<u8> {
fn make_signature_data(
key: &TypedKey,
subkeys: &ValueSubkeyRangeSet,
expiration: u64,
count: u32,
) -> Vec<u8> {
// Needed because RangeSetBlaze uses different types here all the time
#[allow(clippy::unnecessary_cast)]
let subkeys_len = self.subkeys.len() as usize;
let subkeys_len = subkeys.len() as usize;
let mut sig_data = Vec::with_capacity(PUBLIC_KEY_LENGTH + 4 + (subkeys_len * 8) + 8 + 4);
sig_data.extend_from_slice(&self.key.kind.0);
sig_data.extend_from_slice(&self.key.value.bytes);
for sk in self.subkeys.ranges() {
sig_data.extend_from_slice(&key.kind.0);
sig_data.extend_from_slice(&key.value.bytes);
for sk in subkeys.ranges() {
sig_data.extend_from_slice(&sk.start().to_le_bytes());
sig_data.extend_from_slice(&sk.end().to_le_bytes());
}
sig_data.extend_from_slice(&self.expiration.to_le_bytes());
sig_data.extend_from_slice(&self.count.to_le_bytes());
sig_data.extend_from_slice(&expiration.to_le_bytes());
sig_data.extend_from_slice(&count.to_le_bytes());
sig_data
}
@ -63,11 +77,13 @@ impl RPCOperationWatchValueQ {
return Err(RPCError::protocol("unsupported cryptosystem"));
};
let sig_data = self.make_signature_data();
vcrypto
.verify(&self.watcher, &sig_data, &self.signature)
.map_err(RPCError::protocol)?;
if let Some(watch_signature) = self.opt_watch_signature {
let sig_data =
Self::make_signature_data(&self.key, &self.subkeys, self.expiration, self.count);
vcrypto
.verify(&watch_signature.0, &sig_data, &watch_signature.1)
.map_err(RPCError::protocol)?;
}
Ok(())
}
@ -92,13 +108,8 @@ impl RPCOperationWatchValueQ {
}
#[allow(dead_code)]
pub fn watcher(&self) -> &PublicKey {
&self.watcher
}
#[allow(dead_code)]
pub fn signature(&self) -> &Signature {
&self.signature
pub fn opt_watch_signature(&self) -> Option<&(PublicKey, Signature)> {
self.opt_watch_signature.as_ref()
}
#[allow(dead_code)]
@ -109,16 +120,14 @@ impl RPCOperationWatchValueQ {
ValueSubkeyRangeSet,
u64,
u32,
PublicKey,
Signature,
Option<(PublicKey, Signature)>,
) {
(
self.key,
self.subkeys,
self.expiration,
self.count,
self.watcher,
self.signature,
self.opt_watch_signature,
)
}
@ -151,19 +160,24 @@ impl RPCOperationWatchValueQ {
let expiration = reader.get_expiration();
let count = reader.get_count();
let w_reader = reader.get_watcher().map_err(RPCError::protocol)?;
let watcher = decode_key256(&w_reader);
let opt_watch_signature = if reader.has_watcher() {
let w_reader = reader.get_watcher().map_err(RPCError::protocol)?;
let watcher = decode_key256(&w_reader);
let s_reader = reader.get_signature().map_err(RPCError::protocol)?;
let signature = decode_signature512(&s_reader);
let s_reader = reader.get_signature().map_err(RPCError::protocol)?;
let signature = decode_signature512(&s_reader);
Some((watcher, signature))
} else {
None
};
Ok(Self {
key,
subkeys,
expiration,
count,
watcher,
signature,
opt_watch_signature,
})
}
@ -188,11 +202,13 @@ impl RPCOperationWatchValueQ {
builder.set_expiration(self.expiration);
builder.set_count(self.count);
let mut w_builder = builder.reborrow().init_watcher();
encode_key256(&self.watcher, &mut w_builder);
if let Some(watch_signature) = self.opt_watch_signature {
let mut w_builder = builder.reborrow().init_watcher();
encode_key256(&watch_signature.0, &mut w_builder);
let mut s_builder = builder.reborrow().init_signature();
encode_signature512(&self.signature, &mut s_builder);
let mut s_builder = builder.reborrow().init_signature();
encode_signature512(&watch_signature.1, &mut s_builder);
}
Ok(())
}
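Putting the optional-watcher plumbing together, a hedged construction sketch (not part of this commit): `key`, `subkeys`, and `vcrypto` are assumed to be in scope, and `member_keypair` is a `KeyPair` for the record owner or a SMPL schema member.

```rust
// Hedged sketch: building the watch question with and without a watcher.
fn build_watch_questions(
    key: TypedKey,
    subkeys: ValueSubkeyRangeSet,
    member_keypair: KeyPair,
    vcrypto: CryptoSystemVersion,
) -> Result<(), RPCError> {
    // Anonymous watch: no watcher key, so nothing is signed or encoded for it.
    let anon_q = RPCOperationWatchValueQ::new(
        key,
        subkeys.clone(),
        0,        // expiration: 0 requests the maximum the node will grant
        u32::MAX, // count: continuous watch
        None,     // opt_watcher
        vcrypto.clone(),
    )?;
    assert!(anon_q.opt_watch_signature().is_none());

    // Member watch: new() signs (key, subkeys, expiration, count) with the watcher's
    // secret and carries Some((watcher_key, signature)) on the wire.
    let member_q =
        RPCOperationWatchValueQ::new(key, subkeys, 0, u32::MAX, Some(member_keypair), vcrypto)?;
    assert!(member_q.opt_watch_signature().is_some());
    Ok(())
}
```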

View File

@ -37,7 +37,7 @@ impl RPCProcessor {
// and get the target noderef so we can validate the response
let Some(target) = dest.target() else {
return Err(RPCError::internal(
"Never send set value requests over private routes",
"Never send get value requests over private routes",
));
};

View File

@ -1,6 +1,168 @@
use super::*;
#[derive(Clone, Debug)]
pub struct WatchValueAnswer {
pub expiration_ts: Option<Timestamp>,
}
impl RPCProcessor {
/// Sends a watch value request and waits for a response
/// Can be sent via all methods including relays
/// Safety routes may be used, but never private routes.
/// Because this leaks information about the identity of the node itself,
/// replying to this request received over a private route will leak
/// the identity of the node and defeat the private route.
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self),
fields(ret.expiration_ts,
ret.latency
),err)
)]
pub async fn rpc_call_watch_value(
self,
dest: Destination,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
opt_watcher: Option<KeyPair>,
) -> RPCNetworkResult<Answer<WatchValueAnswer>> {
// Ensure destination never has a private route
// and get the target noderef so we can validate the response
let Some(target) = dest.target() else {
return Err(RPCError::internal(
"Never send watch value requests over private routes",
));
};
// Get the target node id
let Some(vcrypto) = self.crypto.get(key.kind) else {
return Err(RPCError::internal("unsupported cryptosystem"));
};
let Some(target_node_id) = target.node_ids().get(key.kind) else {
return Err(RPCError::internal("No node id for crypto kind"));
};
let debug_string = format!(
"OUT ==> WatchValueQ({} {}#{:?}@{}+{}) => {}",
key,
if opt_watcher.is_some() { "+W " } else { "" },
subkeys,
expiration,
count,
dest
);
// Send the watchvalue question
let watch_value_q = RPCOperationWatchValueQ::new(
key,
subkeys,
expiration.as_u64(),
count,
opt_watcher,
vcrypto,
)?;
let question = RPCQuestion::new(
network_result_try!(self.get_destination_respond_to(&dest)?),
RPCQuestionDetail::WatchValueQ(Box::new(watch_value_q)),
);
#[cfg(feature = "debug-dht")]
log_rpc!(debug "{}", debug_string);
let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None).await?);
// TODO: continue here - the code below is still copied from rpc_call_get_value and needs to be adapted to parse WatchValueA
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
TimeoutOr::Timeout => return Ok(NetworkResult::Timeout),
TimeoutOr::Value(v) => v,
};
// Get the right answer type
let (_, _, _, kind) = msg.operation.destructure();
let get_value_a = match kind {
RPCOperationKind::Answer(a) => match a.destructure() {
RPCAnswerDetail::GetValueA(a) => a,
_ => return Ok(NetworkResult::invalid_message("not a getvalue answer")),
},
_ => return Ok(NetworkResult::invalid_message("not an answer")),
};
let (value, peers, descriptor) = get_value_a.destructure();
#[cfg(feature = "debug-dht")]
{
let debug_string_value = value
.as_ref()
.map(|v| {
format!(
" len={} seq={} writer={}",
v.value_data().data().len(),
v.value_data().seq(),
v.value_data().writer(),
)
})
.unwrap_or_default();
let debug_string_answer = format!(
"OUT <== GetValueA({} #{}{}{} peers={}) <= {}",
key,
subkey,
debug_string_value,
if descriptor.is_some() { " +desc" } else { "" },
peers.len(),
dest
);
log_rpc!(debug "{}", debug_string_answer);
let peer_ids: Vec<String> = peers
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
log_rpc!(debug "Peers: {:#?}", peer_ids);
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(format!(
"missing cryptosystem in peers node ids: {}",
e
)));
}
};
if !valid {
return Ok(NetworkResult::invalid_message("non-closer peers returned"));
}
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.latency", latency.as_u64());
#[cfg(feature = "verbose-tracing")]
if let Some(value) = &value {
tracing::Span::current().record("ret.value.data.len", value.value_data().data().len());
tracing::Span::current().record("ret.value.data.seq", value.value_data().seq());
tracing::Span::current().record(
"ret.value.data.writer",
value.value_data().writer().to_string(),
);
}
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.peers.len", peers.len());
Ok(NetworkResult::value(Answer::new(
latency,
GetValueAnswer {
value,
peers,
descriptor,
},
)))
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))]
pub(crate) async fn process_watch_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {
// Ignore if disabled

View File

@ -4,14 +4,22 @@ use super::*;
struct OutboundGetValueContext {
/// The latest value of the subkey, may be the value passed in
pub value: Option<SignedValueData>,
/// The consensus count for the value we have received
pub value_count: usize,
/// The nodes that have returned the value so far (up to the consensus count)
pub value_nodes: Vec<NodeRef>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed
pub descriptor: Option<SignedValueDescriptor>,
/// The parsed schema from the descriptor if we have one
pub schema: Option<DHTSchema>,
}
/// The result of the outbound_get_value operation
struct OutboundGetValueResult {
/// The subkey that was retrieved
pub subkey_result: SubkeyResult,
/// And where it was retrieved from
pub value_nodes: Vec<NodeRef>,
}
impl StorageManager {
/// Perform a 'get value' query on the network
pub async fn outbound_get_value(
@ -21,7 +29,7 @@ impl StorageManager {
subkey: ValueSubkey,
safety_selection: SafetySelection,
last_subkey_result: SubkeyResult,
) -> VeilidAPIResult<SubkeyResult> {
) -> VeilidAPIResult<OutboundGetValueResult> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'GetValue'
@ -43,7 +51,7 @@ impl StorageManager {
};
let context = Arc::new(Mutex::new(OutboundGetValueContext {
value: last_subkey_result.value,
value_count: 0,
value_nodes: vec![],
descriptor: last_subkey_result.descriptor.clone(),
schema,
}));
@ -116,12 +124,12 @@ impl StorageManager {
return Ok(NetworkResult::invalid_message("value data mismatch"));
}
// Increase the consensus count for the existing value
ctx.value_count += 1;
ctx.value_nodes.push(next_node);
} else if new_seq > prior_seq {
// If the sequence number is greater, start over with the new value
ctx.value = Some(value);
// One node has shown us this value so far
ctx.value_count = 1;
ctx.value_nodes = vec![next_node];
} else {
// If the sequence number is older, ignore it
}
@ -129,7 +137,7 @@ impl StorageManager {
// If we have no prior value, keep it
ctx.value = Some(value);
// One node has shown us this value so far
ctx.value_count = 1;
ctx.value_nodes = vec![next_node];
}
}
@ -145,7 +153,9 @@ impl StorageManager {
let check_done = |_closest_nodes: &[NodeRef]| {
// If we have reached sufficient consensus, return done
let ctx = context.lock();
if ctx.value.is_some() && ctx.descriptor.is_some() && ctx.value_count >= consensus_count
if ctx.value.is_some()
&& ctx.descriptor.is_some()
&& ctx.value_nodes.len() >= consensus_count
{
return Some(());
}
@ -169,42 +179,51 @@ impl StorageManager {
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_count);
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
Ok(OutboundGetValueResult {
subkey_result: SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
// If we finished with consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Consensus");
} else {
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_count);
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
Ok(OutboundGetValueResult {
subkey_result: SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
// If we finished without consensus (ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Exhausted Consensus");
} else {
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_count);
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
Ok(OutboundGetValueResult {
subkey_result: SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
// Failed

View File

@ -8,6 +8,7 @@ mod set_value;
mod storage_manager_inner;
mod tasks;
mod types;
mod watch_value;
use keys::*;
use limited_size::*;
@ -186,9 +187,7 @@ impl StorageManager {
.map(|r| r.unwrap())
}
/// Open an existing local record if it exists,
/// and if it doesnt exist locally, try to pull it from the network and
/// open it and return the opened descriptor
/// Open an existing local record if it exists; if it doesn't exist locally, try to pull it from the network, open it, and return the opened descriptor
pub async fn open_record(
&self,
key: TypedKey,
@ -218,7 +217,7 @@ impl StorageManager {
// No last descriptor, no last value
// Use the safety selection we opened the record with
let subkey: ValueSubkey = 0;
let subkey_result = self
let result = self
.outbound_get_value(
rpc_processor,
key,
@ -229,7 +228,7 @@ impl StorageManager {
.await?;
// If we got nothing back, the key wasn't found
if subkey_result.value.is_none() && subkey_result.descriptor.is_none() {
if result.subkey_result.value.is_none() && result.subkey_result.descriptor.is_none() {
// No result
apibail_key_not_found!(key);
};
@ -250,7 +249,7 @@ impl StorageManager {
// Open the new record
inner
.open_new_record(key, writer, subkey, subkey_result, safety_selection)
.open_new_record(key, writer, subkey, result.subkey_result, safety_selection)
.await
}
@ -278,9 +277,6 @@ impl StorageManager {
}
/// Get the value of a subkey from an opened local record
/// may refresh the record, and will if it is forced to or the subkey is not available locally yet
/// Returns Ok(None) if no value was found
/// Returns Ok(Some(value)) is a value was found online or locally
pub async fn get_value(
&self,
key: TypedKey,
@ -325,7 +321,7 @@ impl StorageManager {
.value
.as_ref()
.map(|v| v.value_data().seq());
let subkey_result = self
let result = self
.outbound_get_value(
rpc_processor,
key,
@ -336,14 +332,17 @@ impl StorageManager {
.await?;
// See if we got a value back
let Some(subkey_result_value) = subkey_result.value else {
let Some(subkey_result_value) = result.subkey_result.value else {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None);
};
// Keep the list of nodes that returned a value for later reference
let mut inner = self.lock().await?;
inner.set_value_nodes(key, result.value_nodes)?;
// If we got a new value back then write it to the opened record
if Some(subkey_result_value.value_data().seq()) != opt_last_seq {
let mut inner = self.lock().await?;
inner
.handle_set_local_value(key, subkey, subkey_result_value.clone())
.await?;
@ -352,9 +351,6 @@ impl StorageManager {
}
/// Set the value of a subkey on an opened local record
/// Puts changes to the network immediately and may refresh the record if the there is a newer subkey available online
/// Returns Ok(None) if the value was set
/// Returns Ok(Some(newer value)) if a newer value was found online
pub async fn set_value(
&self,
key: TypedKey,
@ -450,7 +446,7 @@ impl StorageManager {
drop(inner);
// Use the safety selection we opened the record with
let final_signed_value_data = self
let result = self
.outbound_set_value(
rpc_processor,
key,
@ -464,35 +460,102 @@ impl StorageManager {
// Whatever record we got back, store it locally, might be newer than the one we asked to save
let mut inner = self.lock().await?;
inner
.handle_set_local_value(key, subkey, final_signed_value_data.clone())
.handle_set_local_value(key, subkey, result.signed_value_data.clone())
.await?;
// Return the new value if it differs from what was asked to set
if final_signed_value_data.value_data() != signed_value_data.value_data() {
return Ok(Some(final_signed_value_data.into_value_data()));
if result.signed_value_data.value_data() != signed_value_data.value_data() {
return Ok(Some(result.signed_value_data.into_value_data()));
}
// If the original value was set, return None
Ok(None)
}
/// Add a watch to a DHT value
pub async fn watch_values(
&self,
_key: TypedKey,
_subkeys: ValueSubkeyRangeSet,
_expiration: Timestamp,
_count: u32,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
) -> VeilidAPIResult<Timestamp> {
let _inner = self.lock().await?;
unimplemented!();
let inner = self.lock().await?;
// Get the safety selection and the writer we opened this record with
let (safety_selection, opt_writer) = {
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
(
opened_record.safety_selection(),
opened_record.writer().cloned(),
)
};
// Get rpc processor and drop mutex so we don't block while requesting the watch from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else {
apibail_try_again!("offline, try again later");
};
// Drop the lock for network access
drop(inner);
// Use the safety selection we opened the record with
let expiration_ts = self
.outbound_watch_value(
rpc_processor,
key,
subkeys,
expiration,
count,
safety_selection,
opt_writer,
)
.await?;
Ok(expiration_ts)
}
pub async fn cancel_watch_values(
&self,
_key: TypedKey,
_subkeys: ValueSubkeyRangeSet,
) -> VeilidAPIResult<bool> {
let _inner = self.lock().await?;
unimplemented!();
}
// pub async fn cancel_watch_values(
// &self,
// key: TypedKey,
// subkeys: ValueSubkeyRangeSet,
// ) -> VeilidAPIResult<bool> {
// let inner = self.lock().await?;
// // // Get the safety selection and the writer we opened this record with
// // let (safety_selection, opt_writer) = {
// // let Some(opened_record) = inner.opened_records.get(&key) else {
// // apibail_generic!("record not open");
// // };
// // (
// // opened_record.safety_selection(),
// // opened_record.writer().cloned(),
// // )
// // };
// // // Get rpc processor and drop mutex so we don't block while requesting the watch from the network
// // let Some(rpc_processor) = inner.rpc_processor.clone() else {
// // apibail_try_again!("offline, try again later");
// // };
// // // Drop the lock for network access
// // drop(inner);
// // // Use the safety selection we opened the record with
// // let expiration_ts = self
// // .outbound_watch_value(
// // rpc_processor,
// // key,
// // subkeys,
// // expiration,
// // count,
// // safety_selection,
// // opt_writer,
// // )
// // .await?;
// // Ok(expiration_ts)
// }
}

View File

@ -4,14 +4,22 @@ use super::*;
struct OutboundSetValueContext {
/// The latest value of the subkey, may be the value passed in
pub value: SignedValueData,
/// The consensus count for the value we have received
pub set_count: usize,
/// The nodes that have set the value so far (up to the consensus count)
pub value_nodes: Vec<NodeRef>,
/// The number of non-sets since the last set we have received
pub missed_since_last_set: usize,
/// The parsed schema from the descriptor if we have one
pub schema: DHTSchema,
}
/// The result of the outbound_set_value operation
struct OutboundSetValueResult {
/// The value that was set
pub signed_value_data: SignedValueData,
/// And where it was set to
pub value_nodes: Vec<NodeRef>,
}
impl StorageManager {
/// Perform a 'set value' query on the network
pub async fn outbound_set_value(
@ -22,7 +30,7 @@ impl StorageManager {
safety_selection: SafetySelection,
value: SignedValueData,
descriptor: SignedValueDescriptor,
) -> VeilidAPIResult<SignedValueData> {
) -> VeilidAPIResult<OutboundSetValueResult> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'SetValue'
@ -40,7 +48,7 @@ impl StorageManager {
let schema = descriptor.schema()?;
let context = Arc::new(Mutex::new(OutboundSetValueContext {
value,
set_count: 0,
value_nodes: vec![],
missed_since_last_set: 0,
schema,
}));
@ -99,7 +107,7 @@ impl StorageManager {
// If the sequence number is greater, keep it
ctx.value = value;
// One node has shown us this value so far
ctx.set_count = 1;
ctx.value_nodes = vec![next_node];
ctx.missed_since_last_set = 0;
} else {
// If the sequence number is older, or an equal sequence number,
@ -110,7 +118,7 @@ impl StorageManager {
} else {
// It was set on this node and no newer value was found and returned,
// so increase our consensus count
ctx.set_count += 1;
ctx.value_nodes.push(next_node);
ctx.missed_since_last_set = 0;
}
} else {
@ -131,13 +139,13 @@ impl StorageManager {
let ctx = context.lock();
// If we have reached sufficient consensus, return done
if ctx.set_count >= consensus_count {
if ctx.value_nodes.len() >= consensus_count {
return Some(());
}
// If we have missed more than our consensus count since our last set, return done
// This keeps the traversal from searching too many nodes when we aren't converging
// Only do this if we have gotten at least half our desired sets.
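// For example, with consensus_count = 4 this early exit fires once at least
// (4 + 1) / 2 = 2 nodes have accepted our set and 4 consecutive non-sets have
// been seen since the last accepted set (illustrative numbers, not from the source).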
if ctx.set_count >= ((consensus_count + 1) / 2)
if ctx.value_nodes.len() >= ((consensus_count + 1) / 2)
&& ctx.missed_since_last_set >= consensus_count
{
return Some(());
@ -162,35 +170,44 @@ impl StorageManager {
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.set_count >= consensus_count {
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "SetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "SetValue Fanout Timeout Non-Consensus: {}", ctx.set_count);
log_stor!(debug "SetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(ctx.value.clone())
Ok(OutboundSetValueResult {
signed_value_data: ctx.value.clone(),
value_nodes: ctx.value_nodes.clone(),
})
}
// If we finished with or without consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.set_count >= consensus_count {
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "SetValue Fanout Consensus");
} else {
log_stor!(debug "SetValue Fanout Non-Consensus: {}", ctx.set_count);
log_stor!(debug "SetValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(ctx.value.clone())
Ok(OutboundSetValueResult {
signed_value_data: ctx.value.clone(),
value_nodes: ctx.value_nodes.clone(),
})
}
// If we ran out of nodes before getting consensus
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.set_count >= consensus_count {
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "SetValue Fanout Exhausted Consensus");
} else {
log_stor!(debug "SetValue Fanout Exhausted Non-Consensus: {}", ctx.set_count);
log_stor!(debug "SetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(ctx.value.clone())
Ok(OutboundSetValueResult {
signed_value_data: ctx.value.clone(),
value_nodes: ctx.value_nodes.clone(),
})
}
// Failed
TimeoutOr::Value(Err(e)) => {

View File

@ -204,7 +204,10 @@ impl StorageManagerInner {
// Add new local value record
let cur_ts = get_aligned_timestamp();
let local_record_detail = LocalRecordDetail { safety_selection };
let local_record_detail = LocalRecordDetail {
safety_selection,
value_nodes: vec![],
};
let record =
Record::<LocalRecordDetail>::new(cur_ts, signed_value_descriptor, local_record_detail)?;
@ -243,7 +246,10 @@ impl StorageManagerInner {
let local_record = Record::new(
cur_ts,
remote_record.descriptor().clone(),
LocalRecordDetail { safety_selection },
LocalRecordDetail {
safety_selection,
value_nodes: vec![],
},
)?;
local_record_store.new_record(key, local_record).await?;
@ -379,7 +385,10 @@ impl StorageManagerInner {
let record = Record::<LocalRecordDetail>::new(
get_aligned_timestamp(),
signed_value_descriptor,
LocalRecordDetail { safety_selection },
LocalRecordDetail {
safety_selection,
value_nodes: vec![],
},
)?;
local_record_store.new_record(key, record).await?;
@ -400,6 +409,53 @@ impl StorageManagerInner {
Ok(descriptor)
}
pub fn get_value_nodes(&mut self, key: TypedKey) -> VeilidAPIResult<Option<Vec<NodeRef>>> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Get routing table to see if we still know about these nodes
let Some(routing_table) = self.rpc_processor.map(|r| r.routing_table()) else {
apibail_try_again!("offline, try again later");
};
let opt_value_nodes = local_record_store.with_record(key, |r| {
let d = r.detail();
d.value_nodes
.iter()
.copied()
.filter_map(|x| {
routing_table
.lookup_node_ref(TypedKey::new(key.kind, x))
.ok()
.flatten()
})
.collect()
});
Ok(opt_value_nodes)
}
pub fn set_value_nodes(
&mut self,
key: TypedKey,
value_nodes: Vec<NodeRef>,
) -> VeilidAPIResult<()> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
local_record_store.with_record_mut(key, |r| {
let d = r.detail_mut();
d.value_nodes = value_nodes
.into_iter()
.filter_map(|x| x.node_ids().get(key.kind).map(|k| k.value))
.collect();
});
Ok(())
}
pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult<()> {
let Some(_opened_record) = self.opened_records.remove(&key) else {
apibail_generic!("record not open");

View File

@ -6,4 +6,7 @@ pub struct LocalRecordDetail {
/// The last 'safety selection' used when creating/opening this record.
/// Even when closed, this safety selection applies to re-publication attempts by the system.
pub safety_selection: SafetySelection,
/// The nodes that we have seen this record cached on recently
#[serde(default)]
pub value_nodes: Vec<PublicKey>,
}
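Because `value_nodes` is a new field on a struct that is already persisted by the record store, `#[serde(default)]` is what lets previously stored records load without a migration: the missing field simply deserializes to an empty list. A minimal stand-alone sketch of that behavior (the struct and JSON here are illustrative stand-ins, not the real record encoding):

```rust
use serde::{Deserialize, Serialize};

// Stand-in for LocalRecordDetail: older persisted encodings have no `value_nodes`.
#[derive(Debug, Serialize, Deserialize)]
struct DetailExample {
    safety_selection: u32, // placeholder for the real SafetySelection type
    #[serde(default)]
    value_nodes: Vec<String>, // placeholder for Vec<PublicKey>
}

fn main() {
    // A record serialized before this field existed, with no `value_nodes` key.
    let old_json = r#"{ "safety_selection": 0 }"#;
    let detail: DetailExample =
        serde_json::from_str(old_json).expect("old records still deserialize");
    assert!(detail.value_nodes.is_empty()); // defaults to an empty list
}
```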

View File

@ -79,9 +79,9 @@ where
+ self.record_data_size
}
// pub fn detail(&self) -> &D {
// &self.detail
// }
pub fn detail(&self) -> &D {
&self.detail
}
pub fn detail_mut(&mut self) -> &mut D {
&mut self.detail
}

View File

@ -0,0 +1,236 @@
use super::*;
/// The context of the outbound_watch_value operation
struct OutboundWatchValueContext {
/// The timestamp for the expiration of the watch we successfully got
pub opt_expiration_ts: Option<Timestamp>,
}
impl StorageManager {
/// Perform a 'watch value' query on the network
pub async fn outbound_watch_value(
&self,
rpc_processor: RPCProcessor,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
safety_selection: SafetySelection,
opt_writer: Option<KeyPair>,
) -> VeilidAPIResult<Timestamp> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'GetValue', some of which are the same for 'WatchValue' operations
let (key_count, fanout, timeout_us) = {
let c = self.unlocked_inner.config.get();
(
c.network.dht.max_find_node_count as usize,
c.network.dht.get_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
)
};
// Make do-watch-value answer context
let schema = if let Some(d) = &last_subkey_result.descriptor {
Some(d.schema()?)
} else {
None
};
let context = Arc::new(Mutex::new(OutboundWatchValueContext {
opt_expiration_ts: None,
}));
// Routine to call to generate fanout
let call_routine = |next_node: NodeRef| {
let rpc_processor = rpc_processor.clone();
let context = context.clone();
let last_descriptor = last_subkey_result.descriptor.clone();
async move {
let gva = network_result_try!(
rpc_processor
.clone()
.rpc_call_watch_value(
Destination::direct(next_node.clone()).with_safety(safety_selection),
key,
subkey,
last_descriptor,
)
.await?
);
// Keep the descriptor if we got one. If we had a last_descriptor it will
// already be validated by rpc_call_get_value
if let Some(descriptor) = gva.answer.descriptor {
let mut ctx = context.lock();
if ctx.descriptor.is_none() && ctx.schema.is_none() {
ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
ctx.descriptor = Some(descriptor);
}
}
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = gva.answer.value {
log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Got a value but no descriptor for it
// Move to the next node
return Ok(NetworkResult::invalid_message(
"Got value with no descriptor",
));
};
// Validate with schema
if !schema.check_subkey_value_data(
descriptor.owner(),
subkey,
value.value_data(),
) {
// Validation failed, ignore this value
// Move to the next node
return Ok(NetworkResult::invalid_message(format!(
"Schema validation failed on subkey {}",
subkey
)));
}
// If we have a prior value, see if this is a newer sequence number
if let Some(prior_value) = &ctx.value {
let prior_seq = prior_value.value_data().seq();
let new_seq = value.value_data().seq();
if new_seq == prior_seq {
// If sequence number is the same, the data should be the same
if prior_value.value_data() != value.value_data() {
// Move to the next node
return Ok(NetworkResult::invalid_message("value data mismatch"));
}
// Increase the consensus count for the existing value
ctx.value_count += 1;
} else if new_seq > prior_seq {
// If the sequence number is greater, start over with the new value
ctx.value = Some(value);
// One node has shown us this value so far
ctx.value_count = 1;
} else {
// If the sequence number is older, ignore it
}
} else {
// If we have no prior value, keep it
ctx.value = Some(value);
// One node has shown us this value so far
ctx.value_count = 1;
}
}
// Return peers if we have some
#[cfg(feature = "network-result-extra")]
log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
Ok(NetworkResult::value(gva.answer.peers))
}
};
// Routine to call to check if we're done at each step
let check_done = |_closest_nodes: &[NodeRef]| {
// If we have reached sufficient consensus, return done
let ctx = context.lock();
if ctx.value.is_some() && ctx.descriptor.is_some() && ctx.value_count >= consensus_count
{
return Some(());
}
None
};
// Call the fanout
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,
key_count,
fanout,
timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT]),
call_routine,
check_done,
);
match fanout_call.run().await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_count);
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
})
}
// If we finished with consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Consensus");
} else {
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_count);
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
})
}
// If we finished without consensus (ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Exhausted Consensus");
} else {
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_count);
}
Ok(SubkeyResult {
value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(),
})
}
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "GetValue Fanout Error: {}", e);
Err(e.into())
}
}
}
/// Handle a received 'Get Value' query
// pub async fn inbound_get_value(
// &self,
// key: TypedKey,
// subkey: ValueSubkey,
// want_descriptor: bool,
// ) -> VeilidAPIResult<NetworkResult<SubkeyResult>> {
// let mut inner = self.lock().await?;
// let res = match inner
// .handle_get_remote_value(key, subkey, want_descriptor)
// .await
// {
// Ok(res) => res,
// Err(VeilidAPIError::Internal { message }) => {
// apibail_internal!(message);
// }
// Err(e) => {
// return Ok(NetworkResult::invalid_message(e));
// }
// };
// Ok(NetworkResult::value(res))
// }
}

View File

@ -314,13 +314,25 @@ impl RoutingContext {
storage_manager.set_value(key, subkey, data).await
}
/// Watches changes to an opened or created value
/// Add a watch to a DHT value that informs the user via a VeilidUpdate::ValueChange callback when subkeys in the record change.
/// One remote node will be selected to perform the watch. It will offer an expiration time based on the suggested expiration and will attempt to
/// continue reporting changes via the callback. Nodes that agree to perform watches will be put on our 'ping' list to ensure they are still around;
/// otherwise the watch will be cancelled and will have to be re-watched.
///
/// Changes to subkeys within the subkey range are returned via a ValueChanged callback
/// If the subkey range is empty, all subkey changes are considered
/// Expiration can be infinite to keep the watch for the maximum amount of time
/// There is only one watch permitted per record. If a change to a watch is desired, the first one will be overwritten.
/// * `key` is the record key to watch. It must first be opened for reading or writing.
/// * `subkeys` is the range of subkeys to watch. The range must not exceed 512 discrete non-overlapping or adjacent subranges. If no range is specified, this is equivalent to watching the entire range of subkeys.
/// * `expiration` is the desired timestamp of when to automatically terminate the watch, in microseconds. If this value is less than `network.rpc.timeout_ms` milliseconds in the future, this function will return an error immediately.
/// * `count` is the maximum number of times the watch will be sent. A zero value here is equivalent to a cancellation.
///
/// Return value upon success is the amount of time allowed for the watch
/// Returns a timestamp of when the watch will expire. All watches are guaranteed to expire at some point in the future, and the returned timestamp will
/// be no later than the requested expiration, but -may- be before the requested expiration.
///
/// DHT watches are accepted with the following conditions:
/// * First-come first-served basis for arbitrary unauthenticated readers, up to network.dht.public_watch_limit per record
/// * If a member (either the owner or a SMPL schema member) has opened the key for writing (even if no writing is performed) then the watch will be signed and guaranteed up to network.dht.member_watch_limit watches per writer
///
/// Members can be specified via the SMPL schema and do not need to allocate writable subkeys in order to offer a member watch capability.
pub async fn watch_dht_values(
&self,
key: TypedKey,
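To ground the documentation above, a hedged usage sketch (not part of this commit). It assumes `watch_dht_values(key, subkeys, expiration, count)` mirrors `StorageManager::watch_values` from this changeset, that `ValueSubkeyRangeSet::new()` yields an empty (watch-everything) range, and that the record has already been opened on this `RoutingContext`.

```rust
// Hedged sketch: watch every subkey of an already-opened record continuously.
async fn watch_whole_record(rc: &RoutingContext, key: TypedKey) -> VeilidAPIResult<Timestamp> {
    let subkeys = ValueSubkeyRangeSet::new(); // empty range = watch all subkeys (assumed constructor)
    let expiration = Timestamp::new(0);       // 0 = keep the watch as long as the network allows
    let count = u32::MAX;                     // continuous watch
    let granted = rc.watch_dht_values(key, subkeys, expiration, count).await?;
    // The node may grant an earlier expiration than requested; re-watch before
    // `granted` elapses if change notifications are still needed.
    Ok(granted)
}
```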

View File

@ -144,6 +144,8 @@ pub fn fix_veilidconfiginner() -> VeilidConfigInner {
remote_max_records: 17,
remote_max_subkey_cache_memory_mb: 18,
remote_max_storage_space_mb: 19,
public_watch_limit: 20,
member_watch_limit: 21,
},
upnp: true,
detect_address_changes: false,

View File

@ -301,6 +301,8 @@ pub struct VeilidConfigDHT {
pub remote_max_records: u32,
pub remote_max_subkey_cache_memory_mb: u32,
pub remote_max_storage_space_mb: u32,
pub public_watch_limit: u32,
pub member_watch_limit: u32,
}
impl Default for VeilidConfigDHT {
@ -758,6 +760,8 @@ impl VeilidConfig {
get_config!(inner.network.dht.remote_max_records);
get_config!(inner.network.dht.remote_max_subkey_cache_memory_mb);
get_config!(inner.network.dht.remote_max_storage_space_mb);
get_config!(inner.network.dht.public_watch_limit);
get_config!(inner.network.dht.member_watch_limit);
get_config!(inner.network.rpc.concurrency);
get_config!(inner.network.rpc.queue_size);
get_config!(inner.network.rpc.max_timestamp_behind_ms);

View File

@ -138,7 +138,9 @@ Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
remoteSubkeyCacheSize: getRemoteSubkeyCacheSize(),
remoteMaxRecords: getRemoteMaxRecords(),
remoteMaxSubkeyCacheMemoryMb: await getRemoteMaxSubkeyCacheMemoryMb(),
remoteMaxStorageSpaceMb: getRemoteMaxStorageSpaceMb()),
remoteMaxStorageSpaceMb: getRemoteMaxStorageSpaceMb(),
publicWatchLimit: 32,
memberWatchLimit: 8),
upnp: true,
detectAddressChanges: true,
restrictedNatRetries: 0,

View File

@ -263,26 +263,29 @@ class VeilidConfigTLS with _$VeilidConfigTLS {
////////////
@freezed
class VeilidConfigDHT with _$VeilidConfigDHT {
const factory VeilidConfigDHT(
{required int resolveNodeTimeoutMs,
required int resolveNodeCount,
required int resolveNodeFanout,
required int maxFindNodeCount,
required int getValueTimeoutMs,
required int getValueCount,
required int getValueFanout,
required int setValueTimeoutMs,
required int setValueCount,
required int setValueFanout,
required int minPeerCount,
required int minPeerRefreshTimeMs,
required int validateDialInfoReceiptTimeMs,
required int localSubkeyCacheSize,
required int localMaxSubkeyCacheMemoryMb,
required int remoteSubkeyCacheSize,
required int remoteMaxRecords,
required int remoteMaxSubkeyCacheMemoryMb,
required int remoteMaxStorageSpaceMb}) = _VeilidConfigDHT;
const factory VeilidConfigDHT({
required int resolveNodeTimeoutMs,
required int resolveNodeCount,
required int resolveNodeFanout,
required int maxFindNodeCount,
required int getValueTimeoutMs,
required int getValueCount,
required int getValueFanout,
required int setValueTimeoutMs,
required int setValueCount,
required int setValueFanout,
required int minPeerCount,
required int minPeerRefreshTimeMs,
required int validateDialInfoReceiptTimeMs,
required int localSubkeyCacheSize,
required int localMaxSubkeyCacheMemoryMb,
required int remoteSubkeyCacheSize,
required int remoteMaxRecords,
required int remoteMaxSubkeyCacheMemoryMb,
required int remoteMaxStorageSpaceMb,
required int publicWatchLimit,
required int memberWatchLimit,
}) = _VeilidConfigDHT;
factory VeilidConfigDHT.fromJson(dynamic json) =>
_$VeilidConfigDHTFromJson(json as Map<String, dynamic>);

View File

@ -110,6 +110,9 @@ class VeilidConfigDHT(ConfigBase):
remote_max_records: int
remote_max_subkey_cache_memory_mb: int
remote_max_storage_space_mb: int
public_watch_limit: int
member_watch_limit: int
@dataclass

View File

@ -109,6 +109,8 @@ core:
remote_max_records: 65536
remote_max_subkey_cache_memory_mb: %REMOTE_MAX_SUBKEY_CACHE_MEMORY_MB%
remote_max_storage_space_mb: 0
public_watch_limit: 32
member_watch_limit: 8
upnp: true
detect_address_changes: true
restricted_nat_retries: 0
@ -562,6 +564,8 @@ pub struct Dht {
pub remote_max_records: u32,
pub remote_max_subkey_cache_memory_mb: u32,
pub remote_max_storage_space_mb: u32,
pub public_watch_limit: u32,
pub member_watch_limit: u32,
}
#[derive(Debug, Deserialize, Serialize)]
@ -948,6 +952,8 @@ impl Settings {
value
);
set_config_value!(inner.core.network.dht.remote_max_storage_space_mb, value);
set_config_value!(inner.core.network.dht.public_watch_limit, value);
set_config_value!(inner.core.network.dht.member_watch_limit, value);
set_config_value!(inner.core.network.upnp, value);
set_config_value!(inner.core.network.detect_address_changes, value);
set_config_value!(inner.core.network.restricted_nat_retries, value);
@ -1189,7 +1195,12 @@ impl Settings {
"network.dht.remote_max_storage_space_mb" => {
Ok(Box::new(inner.core.network.dht.remote_max_storage_space_mb))
}
"network.dht.public_watch_limit" => {
Ok(Box::new(inner.core.network.dht.public_watch_limit))
}
"network.dht.member_watch_limit" => {
Ok(Box::new(inner.core.network.dht.member_watch_limit))
}
"network.upnp" => Ok(Box::new(inner.core.network.upnp)),
"network.detect_address_changes" => {
Ok(Box::new(inner.core.network.detect_address_changes))

View File

@ -88,6 +88,8 @@ export const veilidCoreStartupConfig = {
remote_max_records: 65536,
remote_max_subkey_cache_memory_mb: 256,
remote_max_storage_space_mb: 0,
public_watch_limit: 32,
member_watch_limit: 8,
},
upnp: true,
detect_address_changes: true,