send valuechanged

This commit is contained in:
Christien Rioux 2023-12-04 22:01:50 -05:00
parent d1dad8de61
commit a67bfde1f7
30 changed files with 277 additions and 175 deletions

View File

@ -120,6 +120,8 @@ impl ServicesContext {
// Set up storage manager
trace!("init storage manager");
let update_callback = self.update_callback.clone();
let storage_manager = StorageManager::new(
self.config.clone(),
self.crypto.clone().unwrap(),
@ -127,7 +129,7 @@ impl ServicesContext {
#[cfg(feature = "unstable-blockstore")]
self.block_store.clone().unwrap(),
);
if let Err(e) = storage_manager.init().await {
if let Err(e) = storage_manager.init(update_callback).await {
error!("failed to init storage manager: {}", e);
self.shutdown().await;
return Err(e);

View File

@ -183,7 +183,6 @@ impl RPCProcessor {
&self,
target: Target,
safety_selection: SafetySelection,
sequencing: Sequencing,
) -> Result<rpc_processor::Destination, RPCError> {
match target {
Target::NodeId(node_id) => {
@ -195,7 +194,7 @@ impl RPCProcessor {
}
};
// Apply sequencing to match safety selection
nr.set_sequencing(sequencing);
nr.set_sequencing(safety_selection.get_sequencing());
Ok(rpc_processor::Destination::Direct {
node: nr,

View File

@ -24,7 +24,7 @@ impl RPCProcessor {
let waitable_reply = network_result_try!(self.question(dest, question, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
let reply_private_route = waitable_reply.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -44,7 +44,7 @@ impl RPCProcessor {
let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
let reply_private_route = waitable_reply.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -83,7 +83,7 @@ impl RPCProcessor {
);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
let reply_private_route = waitable_reply.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
@ -279,9 +279,9 @@ impl RPCProcessor {
// Make GetValue answer
let get_value_a = RPCOperationGetValueA::new(
subkey_result_value,
subkey_result_value.map(|x| (*x).clone()),
closer_to_key_peers,
subkey_result_descriptor,
subkey_result_descriptor.map(|x| (*x).clone()),
)?;
// Send GetValue answer

View File

@ -97,7 +97,7 @@ impl RPCProcessor {
);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
let reply_private_route = waitable_reply.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
@ -257,7 +257,7 @@ impl RPCProcessor {
// Save the subkey, creating a new record if necessary
let storage_manager = self.storage_manager();
let new_value = network_result_try!(storage_manager
.inbound_set_value(key, subkey, value, descriptor)
.inbound_set_value(key, subkey, Arc::new(value), descriptor.map(Arc::new))
.await
.map_err(RPCError::internal)?);
@ -292,7 +292,7 @@ impl RPCProcessor {
}
// Make SetValue answer
let set_value_a = RPCOperationSetValueA::new(set, new_value, closer_to_key_peers)?;
let set_value_a = RPCOperationSetValueA::new(set, new_value.map(|x| (*x).clone()), closer_to_key_peers)?;
// Send SetValue answer
self.answer(msg, RPCAnswer::new(RPCAnswerDetail::SetValueA(Box::new(set_value_a))))

View File

@ -114,7 +114,7 @@ impl RPCProcessor {
let send_data_method = waitable_reply.send_data_method.clone();
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
let reply_private_route = waitable_reply.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -59,7 +59,7 @@ impl RPCProcessor {
// Save the subkey, creating a new record if necessary
let storage_manager = self.storage_manager();
storage_manager
.inbound_value_changed(key, subkeys, count, value)
.inbound_value_changed(key, subkeys, count, Arc::new(value))
.await
.map_err(RPCError::internal)?;

View File

@ -73,7 +73,7 @@ impl RPCProcessor {
network_result_try!(self.question(dest.clone(), question, None).await?);
// Keep the reply private route that was used to return with the answer
let reply_private_route = waitable_reply.reply_private_route.clone();
let reply_private_route = waitable_reply.reply_private_route;
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -3,17 +3,17 @@ use super::*;
/// The context of the outbound_get_value operation
struct OutboundGetValueContext {
/// The latest value of the subkey, may be the value passed in
pub value: Option<SignedValueData>,
pub value: Option<Arc<SignedValueData>>,
/// The nodes that have returned the value so far (up to the consensus count)
pub value_nodes: Vec<NodeRef>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed
pub descriptor: Option<SignedValueDescriptor>,
pub descriptor: Option<Arc<SignedValueDescriptor>>,
/// The parsed schema from the descriptor if we have one
pub schema: Option<DHTSchema>,
}
/// The result of the outbound_get_value operation
struct OutboundGetValueResult {
pub(super) struct OutboundGetValueResult {
/// The subkey that was retrieved
pub subkey_result: SubkeyResult,
/// And where it was retrieved from
@ -22,7 +22,7 @@ struct OutboundGetValueResult {
impl StorageManager {
/// Perform a 'get value' query on the network
pub async fn outbound_get_value(
pub(super) async fn outbound_get_value(
&self,
rpc_processor: RPCProcessor,
key: TypedKey,
@ -69,7 +69,7 @@ impl StorageManager {
Destination::direct(next_node.clone()).with_safety(safety_selection),
key,
subkey,
last_descriptor,
last_descriptor.map(|x| (*x).clone()),
)
.await?
);
@ -80,7 +80,7 @@ impl StorageManager {
let mut ctx = context.lock();
if ctx.descriptor.is_none() && ctx.schema.is_none() {
ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
ctx.descriptor = Some(descriptor);
ctx.descriptor = Some(Arc::new(descriptor));
}
}
@ -127,7 +127,7 @@ impl StorageManager {
ctx.value_nodes.push(next_node);
} else if new_seq > prior_seq {
// If the sequence number is greater, start over with the new value
ctx.value = Some(value);
ctx.value = Some(Arc::new(value));
// One node has shown us this value so far
ctx.value_nodes = vec![next_node];
} else {
@ -135,7 +135,7 @@ impl StorageManager {
}
} else {
// If we have no prior value, keep it
ctx.value = Some(value);
ctx.value = Some(Arc::new(value));
// One node has shown us this value so far
ctx.value_nodes = vec![next_node];
}

View File

@ -34,6 +34,16 @@ const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
/// Frequency to send ValueChanged notifications to the network
const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
#[derive(Debug, Clone)]
/// A single 'value changed' message to send
struct ValueChangedInfo {
target: Target,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
value: Arc<SignedValueData>,
}
struct StorageManagerUnlockedInner {
config: VeilidConfig,
crypto: Crypto,
@ -112,11 +122,11 @@ impl StorageManager {
}
#[instrument(level = "debug", skip_all, err)]
pub async fn init(&self) -> EyreResult<()> {
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
debug!("startup storage manager");
let mut inner = self.inner.lock().await;
inner.init(self.clone()).await?;
inner.init(self.clone(), update_callback).await?;
Ok(())
}
@ -347,7 +357,7 @@ impl StorageManager {
// Return the existing value if we have one unless we are forcing a refresh
if !force_refresh {
if let Some(last_subkey_result_value) = last_subkey_result.value {
return Ok(Some(last_subkey_result_value.into_value_data()));
return Ok(Some(last_subkey_result_value.value_data().clone()));
}
}
@ -357,7 +367,7 @@ impl StorageManager {
let Some(rpc_processor) = inner.rpc_processor.clone() else {
// Return the existing value if we have one if we aren't online
if let Some(last_subkey_result_value) = last_subkey_result.value {
return Ok(Some(last_subkey_result_value.into_value_data()));
return Ok(Some(last_subkey_result_value.value_data().clone()));
}
apibail_try_again!("offline, try again later");
};
@ -397,7 +407,7 @@ impl StorageManager {
.handle_set_local_value(key, subkey, subkey_result_value.clone())
.await?;
}
Ok(Some(subkey_result_value.into_value_data()))
Ok(Some(subkey_result_value.value_data().clone()))
}
/// Set the value of a subkey on an opened local record
@ -460,13 +470,13 @@ impl StorageManager {
}
// Sign the new value data with the writer
let signed_value_data = SignedValueData::make_signature(
let signed_value_data = Arc::new(SignedValueData::make_signature(
value_data,
descriptor.owner(),
subkey,
vcrypto,
writer.secret,
)?;
)?);
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = Self::online_writes_ready_inner(&inner) else {
@ -515,7 +525,7 @@ impl StorageManager {
// Return the new value if it differs from what was asked to set
if result.signed_value_data.value_data() != signed_value_data.value_data() {
return Ok(Some(result.signed_value_data.into_value_data()));
return Ok(Some(result.signed_value_data.value_data().clone()));
}
// If the original value was set, return None
@ -668,4 +678,33 @@ impl StorageManager {
Ok(true)
}
// Send single value change out to the network
#[instrument(level = "trace", skip(self), err)]
async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> {
let rpc_processor = {
let inner = self.inner.lock().await;
if let Some(rpc_processor) = &inner.rpc_processor {
rpc_processor.clone()
} else {
apibail_try_again!("network is not available");
}
};
let dest = rpc_processor
.resolve_target_to_destination(
vc.target,
SafetySelection::Unsafe(Sequencing::NoPreference),
)
.await
.map_err(VeilidAPIError::from)?;
network_result_value_or_log!(rpc_processor
.rpc_call_value_changed(dest, vc.key, vc.subkeys, vc.count, (*vc.value).clone())
.await
.map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {
});
Ok(())
}
}

View File

@ -40,17 +40,7 @@ struct WatchedRecord {
watchers: Vec<WatchedRecordWatch>,
}
#[derive(Debug, Clone)]
/// A single 'value changed' message to send
pub struct ValueChangedInfo {
target: Target,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
value: SignedValueData,
}
pub struct RecordStore<D>
pub(super) struct RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{
@ -631,7 +621,7 @@ where
&mut self,
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: SignedValueData,
signed_value_data: Arc<SignedValueData>,
) -> VeilidAPIResult<()> {
// Check size limit for data
if signed_value_data.value_data().data().len() > self.limits.max_subkey_size {
@ -847,7 +837,7 @@ where
}
if let Some(dw) = dead_watcher {
watch.watchers.remove(dw);
if watch.watchers.len() == 0 {
if watch.watchers.is_empty() {
is_empty = true;
}
}
@ -860,6 +850,16 @@ where
}
pub async fn take_value_changes(&mut self, changes: &mut Vec<ValueChangedInfo>) {
// ValueChangedInfo but without the subkey data that requires a double mutable borrow to get
struct EarlyValueChangedInfo {
target: Target,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
}
let mut evcis = vec![];
for rtk in self.changed_watched_values.drain() {
if let Some(watch) = self.watched_records.get_mut(&rtk) {
// Process watch notifications
@ -877,39 +877,51 @@ where
dead_watchers.push(wn);
}
// Get the first subkey data
let Some(first_subkey) = subkeys.first() else {
log_stor!(error "first subkey should exist for value change notification");
continue;
};
let subkey_result = match self.get_subkey(rtk.key, first_subkey, false).await {
Ok(Some(skr)) => skr,
Ok(None) => {
log_stor!(error "subkey should have data for value change notification");
continue;
}
Err(e) => {
log_stor!(error "error getting subkey data for value change notification: {}", e);
continue;
}
};
let Some(value) = subkey_result.value else {
log_stor!(error "first subkey should have had value for value change notification");
continue;
};
let vci = ValueChangedInfo {
evcis.push(EarlyValueChangedInfo {
target: w.target.clone(),
key: rtk.key,
subkeys,
count,
value,
};
});
}
changes.push(vci);
// Remove in reverse so we don't have to offset the index to remove the right key
for dw in dead_watchers.iter().rev().copied() {
watch.watchers.remove(dw);
}
}
}
for evci in evcis {
// Get the first subkey data
let Some(first_subkey) = evci.subkeys.first() else {
log_stor!(error "first subkey should exist for value change notification");
continue;
};
let subkey_result = match self.get_subkey(evci.key, first_subkey, false).await {
Ok(Some(skr)) => skr,
Ok(None) => {
log_stor!(error "subkey should have data for value change notification");
continue;
}
Err(e) => {
log_stor!(error "error getting subkey data for value change notification: {}", e);
continue;
}
};
let Some(value) = subkey_result.value else {
log_stor!(error "first subkey should have had value for value change notification");
continue;
};
changes.push(ValueChangedInfo {
target: evci.target,
key: evci.key,
subkeys: evci.subkeys,
count: evci.count,
value,
});
}
}
/// LRU out some records until we reclaim the amount of space requested
@ -931,7 +943,7 @@ where
reclaimed
}
pub(super) fn debug_records(&self) -> String {
pub fn debug_records(&self) -> String {
// Dump fields in an abbreviated way
let mut out = String::new();
@ -963,16 +975,12 @@ where
out
}
pub(super) fn debug_record_info(&self, key: TypedKey) -> String {
pub fn debug_record_info(&self, key: TypedKey) -> String {
self.peek_record(key, |r| format!("{:#?}", r))
.unwrap_or("Not found".to_owned())
}
pub(super) async fn debug_record_subkey_info(
&self,
key: TypedKey,
subkey: ValueSubkey,
) -> String {
pub async fn debug_record_subkey_info(&self, key: TypedKey, subkey: ValueSubkey) -> String {
match self.peek_subkey(key, subkey, true).await {
Ok(Some(v)) => {
format!("{:#?}", v)

View File

@ -3,7 +3,7 @@ use super::*;
/// The context of the outbound_set_value operation
struct OutboundSetValueContext {
/// The latest value of the subkey, may be the value passed in
pub value: SignedValueData,
pub value: Arc<SignedValueData>,
/// The nodes that have set the value so far (up to the consensus count)
pub value_nodes: Vec<NodeRef>,
/// The number of non-sets since the last set we have received
@ -13,23 +13,23 @@ struct OutboundSetValueContext {
}
/// The result of the outbound_set_value operation
struct OutboundSetValueResult {
pub(super) struct OutboundSetValueResult {
/// The value that was set
pub signed_value_data: SignedValueData,
pub signed_value_data: Arc<SignedValueData>,
/// And where it was set to
pub value_nodes: Vec<NodeRef>,
}
impl StorageManager {
/// Perform a 'set value' query on the network
pub async fn outbound_set_value(
pub(super) async fn outbound_set_value(
&self,
rpc_processor: RPCProcessor,
key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
value: SignedValueData,
descriptor: SignedValueDescriptor,
value: Arc<SignedValueData>,
descriptor: Arc<SignedValueDescriptor>,
) -> VeilidAPIResult<OutboundSetValueResult> {
let routing_table = rpc_processor.routing_table();
@ -75,8 +75,8 @@ impl StorageManager {
Destination::direct(next_node.clone()).with_safety(safety_selection),
key,
subkey,
value,
descriptor.clone(),
(*value).clone(),
(*descriptor).clone(),
send_descriptor,
)
.await?
@ -105,7 +105,7 @@ impl StorageManager {
let new_seq = value.value_data().seq();
if new_seq > prior_seq {
// If the sequence number is greater, keep it
ctx.value = value;
ctx.value = Arc::new(value);
// One node has shown us this value so far
ctx.value_nodes = vec![next_node];
ctx.missed_since_last_set = 0;
@ -225,9 +225,9 @@ impl StorageManager {
&self,
key: TypedKey,
subkey: ValueSubkey,
value: SignedValueData,
descriptor: Option<SignedValueDescriptor>,
) -> VeilidAPIResult<NetworkResult<Option<SignedValueData>>> {
value: Arc<SignedValueData>,
descriptor: Option<Arc<SignedValueDescriptor>>,
) -> VeilidAPIResult<NetworkResult<Option<Arc<SignedValueData>>>> {
let mut inner = self.lock().await?;
// See if this is a remote or local value

View File

@ -28,6 +28,8 @@ pub(super) struct StorageManagerInner {
pub rpc_processor: Option<RPCProcessor>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
pub tick_future: Option<SendPinBoxFuture<()>>,
/// Update callback to send ValueChanged notification to
pub update_callback: Option<UpdateCallback>,
}
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
@ -78,10 +80,15 @@ impl StorageManagerInner {
metadata_db: Default::default(),
rpc_processor: Default::default(),
tick_future: Default::default(),
update_callback: None,
}
}
pub async fn init(&mut self, outer_self: StorageManager) -> EyreResult<()> {
pub async fn init(
&mut self,
outer_self: StorageManager,
update_callback: UpdateCallback,
) -> EyreResult<()> {
let metadata_db = self
.unlocked_inner
.table_store
@ -121,13 +128,15 @@ impl StorageManagerInner {
}
});
self.tick_future = Some(tick_future);
self.update_callback = Some(update_callback);
self.initialized = true;
Ok(())
}
pub async fn terminate(&mut self) {
self.update_callback = None;
// Stop ticker
let tick_future = self.tick_future.take();
if let Some(f) = tick_future {
@ -207,12 +216,12 @@ impl StorageManagerInner {
let owner = vcrypto.generate_keypair();
// Make a signed value descriptor for this dht value
let signed_value_descriptor = SignedValueDescriptor::make_signature(
let signed_value_descriptor = Arc::new(SignedValueDescriptor::make_signature(
owner.key,
schema_data,
vcrypto.clone(),
owner.secret,
)?;
)?);
// Add new local value record
let cur_ts = get_aligned_timestamp();
@ -428,11 +437,11 @@ impl StorageManagerInner {
};
// Get routing table to see if we still know about these nodes
let Some(routing_table) = self.rpc_processor.map(|r| r.routing_table()) else {
let Some(routing_table) = self.rpc_processor.as_ref().map(|r| r.routing_table()) else {
apibail_try_again!("offline, try again later");
};
let opt_value_nodes = local_record_store.with_record(key, |r| {
let opt_value_nodes = local_record_store.peek_record(key, |r| {
let d = r.detail();
d.value_nodes
.iter()
@ -475,7 +484,7 @@ impl StorageManagerInner {
Ok(opened_record)
}
pub async fn handle_get_local_value(
pub(super) async fn handle_get_local_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
@ -498,11 +507,11 @@ impl StorageManagerInner {
})
}
pub async fn handle_set_local_value(
pub(super) async fn handle_set_local_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: SignedValueData,
signed_value_data: Arc<SignedValueData>,
) -> VeilidAPIResult<()> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
@ -517,7 +526,7 @@ impl StorageManagerInner {
Ok(())
}
pub async fn handle_watch_local_value(
pub(super) async fn handle_watch_local_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
@ -535,7 +544,7 @@ impl StorageManagerInner {
.await
}
pub async fn handle_get_remote_value(
pub(super) async fn handle_get_remote_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
@ -558,12 +567,12 @@ impl StorageManagerInner {
})
}
pub async fn handle_set_remote_value(
pub(super) async fn handle_set_remote_value(
&mut self,
key: TypedKey,
subkey: ValueSubkey,
signed_value_data: SignedValueData,
signed_value_descriptor: SignedValueDescriptor,
signed_value_data: Arc<SignedValueData>,
signed_value_descriptor: Arc<SignedValueDescriptor>,
) -> VeilidAPIResult<()> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
@ -591,7 +600,7 @@ impl StorageManagerInner {
Ok(())
}
pub async fn handle_watch_remote_value(
pub(super) async fn handle_watch_remote_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
@ -614,7 +623,8 @@ impl StorageManagerInner {
where
D: fmt::Debug + Clone + Serialize,
{
let compiled = record.descriptor().schema_data();
let descriptor = record.descriptor();
let compiled = descriptor.schema_data();
let mut hash_data = Vec::<u8>::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len());
hash_data.extend_from_slice(&vcrypto.kind().0);
hash_data.extend_from_slice(&record.owner().bytes);

View File

@ -5,7 +5,7 @@ use stop_token::future::FutureExt;
impl StorageManager {
// Flush records stores to disk and remove dead records and send watch notifications
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn send_value_changes_task_routine(
pub(super) async fn send_value_changes_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
@ -13,22 +13,31 @@ impl StorageManager {
) -> EyreResult<()> {
let mut value_changes: Vec<ValueChangedInfo> = vec![];
let mut inner = self.inner.lock().await;
if let Some(local_record_store) = &mut inner.local_record_store {
local_record_store
.take_value_changes(&mut value_changes)
.await?;
{
let mut inner = self.inner.lock().await;
if let Some(local_record_store) = &mut inner.local_record_store {
local_record_store
.take_value_changes(&mut value_changes)
.await;
}
if let Some(remote_record_store) = &mut inner.remote_record_store {
remote_record_store
.take_value_changes(&mut value_changes)
.await;
}
}
if let Some(remote_record_store) = &mut inner.remote_record_store {
remote_record_store
.take_value_changes(&mut value_changes)
.await?;
}
// Send all value changes in parallel
let mut unord = FuturesUnordered::new();
// xxx
// Add a future for each value change
for vc in value_changes {
let this = self.clone();
unord.push(async move {
if let Err(e) = this.send_value_change(vc).await {
log_stor!(debug "Failed to send value change: {}", e);
}
});
}
while !unord.is_empty() {
match unord.next().timeout_at(stop_token.clone()).await {

View File

@ -2,7 +2,7 @@ use super::*;
/// Information required to handle locally opened records
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LocalRecordDetail {
pub(in crate::storage_manager) struct LocalRecordDetail {
/// The last 'safety selection' used when creating/opening this record.
/// Even when closed, this safety selection applies to re-publication attempts by the system.
pub safety_selection: SafetySelection,

View File

@ -8,10 +8,10 @@ mod signed_value_descriptor;
use super::*;
pub use local_record_detail::*;
pub use opened_record::*;
pub use record::*;
pub use record_data::*;
pub use remote_record_detail::*;
pub(super) use local_record_detail::*;
pub(super) use opened_record::*;
pub(super) use record::*;
pub(super) use record_data::*;
pub(super) use remote_record_detail::*;
pub use signed_value_data::*;
pub use signed_value_descriptor::*;

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Clone, Debug)]
pub struct ActiveWatch {
pub(in crate::storage_manager) struct ActiveWatch {
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch
@ -17,7 +17,7 @@ pub struct ActiveWatch {
/// The state associated with a local record when it is opened
/// This is not serialized to storage as it is ephemeral for the lifetime of the opened record
#[derive(Clone, Debug, Default)]
pub struct OpenedRecord {
pub(in crate::storage_manager) struct OpenedRecord {
/// The key pair used to perform writes to subkey on this opened record
/// Without this, set_value() will fail regardless of which key or subkey is being written to
/// as all writes are signed

View File

@ -1,11 +1,11 @@
use super::*;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Record<D>
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(in crate::storage_manager) struct Record<D>
where
D: fmt::Debug + Serialize,
D: fmt::Debug + Serialize + Clone,
{
descriptor: SignedValueDescriptor,
descriptor: Arc<SignedValueDescriptor>,
subkey_count: usize,
stored_subkeys: ValueSubkeyRangeSet,
last_touched_ts: Timestamp,
@ -15,11 +15,11 @@ where
impl<D> Record<D>
where
D: fmt::Debug + Serialize,
D: fmt::Debug + Serialize + Clone,
{
pub fn new(
cur_ts: Timestamp,
descriptor: SignedValueDescriptor,
descriptor: Arc<SignedValueDescriptor>,
detail: D,
) -> VeilidAPIResult<Self> {
let schema = descriptor.schema()?;
@ -34,8 +34,8 @@ where
})
}
pub fn descriptor(&self) -> &SignedValueDescriptor {
&self.descriptor
pub fn descriptor(&self) -> Arc<SignedValueDescriptor> {
self.descriptor.clone()
}
pub fn owner(&self) -> &PublicKey {
self.descriptor.owner()

View File

@ -1,12 +1,10 @@
use super::*;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct RecordData {
pub(in crate::storage_manager) struct RecordData {
signed_value_data: Arc<SignedValueData>,
}
xxx continue here, use arc everywhere to avoid copies
impl RecordData {
pub fn new(signed_value_data: Arc<SignedValueData>) -> Self {
Self { signed_value_data }

View File

@ -1,4 +1,4 @@
use super::*;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RemoteRecordDetail {}
pub(in crate::storage_manager) struct RemoteRecordDetail {}

View File

@ -48,10 +48,6 @@ impl SignedValueData {
&self.value_data
}
pub fn into_value_data(self) -> ValueData {
self.value_data
}
pub fn signature(&self) -> &Signature {
&self.signature
}

View File

@ -8,7 +8,7 @@ struct OutboundWatchValueContext {
/// The result of the outbound_watch_value operation
#[derive(Debug, Clone)]
struct OutboundWatchValueResult {
pub(super) struct OutboundWatchValueResult {
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch
@ -19,7 +19,8 @@ struct OutboundWatchValueResult {
impl StorageManager {
/// Perform a 'watch value' query on the network
pub async fn outbound_watch_value(
#[allow(clippy::too_many_arguments)]
pub(super) async fn outbound_watch_value(
&self,
rpc_processor: RPCProcessor,
key: TypedKey,
@ -219,8 +220,30 @@ impl StorageManager {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
value: SignedValueData,
value: Arc<SignedValueData>,
) -> VeilidAPIResult<()> {
//
// Update local record store with new value
let (res, opt_update_callback) = {
let mut inner = self.lock().await?;
let res = if let Some(first_subkey) = subkeys.first() {
inner
.handle_set_local_value(key, first_subkey, value.clone())
.await
} else {
VeilidAPIResult::Ok(())
};
(res, inner.update_callback.clone())
};
// Announce ValueChanged VeilidUpdate
if let Some(update_callback) = opt_update_callback {
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
key,
subkeys,
count,
value: value.value_data().clone(),
})));
}
res
}
}

View File

@ -123,11 +123,7 @@ impl RoutingContext {
async fn get_destination(&self, target: Target) -> VeilidAPIResult<rpc_processor::Destination> {
let rpc_processor = self.api.rpc_processor()?;
rpc_processor
.resolve_target_to_destination(
target,
self.unlocked_inner.safety_selection,
self.sequencing(),
)
.resolve_target_to_destination(target, self.unlocked_inner.safety_selection)
.await
.map_err(VeilidAPIError::invalid_target)
}

View File

@ -208,7 +208,7 @@ pub fn fix_veilidconfiginner() -> VeilidConfigInner {
pub fn fix_veilidvaluechange() -> VeilidValueChange {
VeilidValueChange {
key: fix_typedkey(),
subkeys: vec![1, 2, 3, 4],
subkeys: ValueSubkeyRangeSet::new(),
count: 5,
value: ValueData::new_with_seq(23, b"ValueData".to_vec(), fix_cryptokey()).unwrap(),
}

View File

@ -55,7 +55,7 @@ impl DHTSchemaDFLT {
}
/// Check if a key is a schema member
pub fn is_member(&self, key: &PublicKey) -> bool {
pub fn is_member(&self, _key: &PublicKey) -> bool {
false
}
}

View File

@ -30,21 +30,21 @@ impl ValueSubkeyRangeSet {
Self { data }
}
pub fn interset(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
Self::new_with_data(self.data & other.data)
pub fn intersect(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
Self::new_with_data(&self.data & &other.data)
}
pub fn difference(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
Self::new_with_data(self.data - other.data)
Self::new_with_data(&self.data - &other.data)
}
pub fn union(&self, other: &ValueSubkeyRangeSet) -> ValueSubkeyRangeSet {
Self::new_with_data(self.data | other.data)
Self::new_with_data(&self.data | &other.data)
}
pub fn data(&self) -> RangeSetBlaze<ValueSubkey> {
self.data().clone()
pub fn data(&self) -> &RangeSetBlaze<ValueSubkey> {
&self.data
}
pub fn into_data(self) -> RangeSetBlaze<ValueSubkey> {
self.data()
self.data
}
}

View File

@ -99,7 +99,7 @@ pub struct VeilidStateConfig {
pub struct VeilidValueChange {
#[schemars(with = "String")]
pub key: TypedKey,
pub subkeys: Vec<ValueSubkey>,
pub subkeys: ValueSubkeyRangeSet,
pub count: u32,
pub value: ValueData,
}

View File

@ -61,10 +61,10 @@ packages:
dependency: transitive
description:
name: collection
sha256: f092b211a4319e98e5ff58223576de6c2803db36221657b46c82574721240687
sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a
url: "https://pub.dev"
source: hosted
version: "1.17.2"
version: "1.18.0"
convert:
dependency: transitive
description:
@ -220,10 +220,10 @@ packages:
dependency: transitive
description:
name: meta
sha256: "3c74dbf8763d36539f114c799d8a2d87343b5067e9d796ca22b5eb8437090ee3"
sha256: a6e590c838b18133bb482a2745ad77c5bb7715fb0451209e1a7567d416678b8e
url: "https://pub.dev"
source: hosted
version: "1.9.1"
version: "1.10.0"
path:
dependency: "direct main"
description:
@ -329,18 +329,18 @@ packages:
dependency: transitive
description:
name: stack_trace
sha256: c3c7d8edb15bee7f0f74debd4b9c5f3c2ea86766fe4178eb2a18eb30a0bdaed5
sha256: "73713990125a6d93122541237550ee3352a2d84baad52d375a4cad2eb9b7ce0b"
url: "https://pub.dev"
source: hosted
version: "1.11.0"
version: "1.11.1"
stream_channel:
dependency: transitive
description:
name: stream_channel
sha256: "83615bee9045c1d322bbbd1ba209b7a749c2cbcdcb3fdd1df8eb488b3279c1c8"
sha256: ba2aa5d8cc609d96bbb2899c28934f9e1af5cddbd60a827822ea467161eb54e7
url: "https://pub.dev"
source: hosted
version: "2.1.1"
version: "2.1.2"
string_scanner:
dependency: transitive
description:
@ -377,10 +377,10 @@ packages:
dependency: transitive
description:
name: test_api
sha256: "75760ffd7786fffdfb9597c35c5b27eaeec82be8edfb6d71d32651128ed7aab8"
sha256: "5c2f730018264d276c20e4f1503fd1308dfbbae39ec8ee63c5236311ac06954b"
url: "https://pub.dev"
source: hosted
version: "0.6.0"
version: "0.6.1"
typed_data:
dependency: transitive
description:
@ -408,10 +408,10 @@ packages:
dependency: transitive
description:
name: web
sha256: dc8ccd225a2005c1be616fe02951e2e342092edf968cf0844220383757ef8f10
sha256: afe077240a270dcfd2aafe77602b4113645af95d0ad31128cc02bce5ac5d5152
url: "https://pub.dev"
source: hosted
version: "0.1.4-beta"
version: "0.3.0"
win32:
dependency: transitive
description:
@ -437,5 +437,5 @@ packages:
source: hosted
version: "3.5.0"
sdks:
dart: ">=3.1.0-185.0.dev <4.0.0"
dart: ">=3.2.0-194.0.dev <4.0.0"
flutter: ">=3.10.6"

View File

@ -61,6 +61,17 @@ impl<E: Send + 'static> TickTask<E> {
self.running.load(core::sync::atomic::Ordering::Acquire)
}
pub fn last_timestamp_us(&self) -> Option<u64> {
let ts = self
.last_timestamp_us
.load(core::sync::atomic::Ordering::Acquire);
if ts == 0 {
None
} else {
Some(ts)
}
}
pub async fn stop(&self) -> Result<(), E> {
// drop the stop source if we have one
let opt_stop_source = &mut *self.stop_source.lock().await;
@ -89,6 +100,17 @@ impl<E: Send + 'static> TickTask<E> {
return Ok(());
}
self.internal_tick(now, last_timestamp_us).await.map(drop)
}
pub async fn try_tick_now(&self) -> Result<bool, E> {
let now = get_timestamp();
let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
self.internal_tick(now, last_timestamp_us).await
}
async fn internal_tick(&self, now: u64, last_timestamp_us: u64) -> Result<bool, E> {
// Lock the stop source, tells us if we have ever started this future
let opt_stop_source = &mut *self.stop_source.lock().await;
if opt_stop_source.is_some() {
@ -104,12 +126,12 @@ impl<E: Send + 'static> TickTask<E> {
Ok(None) => {
// No prior result to return which means things are still running
// We can just return now, since the singlefuture will not run a second time
return Ok(());
return Ok(false);
}
Err(()) => {
// If we get this, it's because we are joining the singlefuture already
// Don't bother running but this is not an error in this case
return Ok(());
return Ok(false);
}
};
}
@ -134,7 +156,7 @@ impl<E: Send + 'static> TickTask<E> {
self.last_timestamp_us.store(now, Ordering::Release);
// Save new stopper
*opt_stop_source = Some(stop_source);
Ok(())
Ok(true)
}
// All other conditions should not be reachable
_ => {