mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-03-14 01:46:41 -04:00
Make SafetySelection and SafetySpec non-Copy
This commit is contained in:
parent
1387c512ce
commit
96e85636e7
@ -282,7 +282,7 @@ impl RPCProcessor {
|
||||
match target {
|
||||
Target::NodeId(node_id) => {
|
||||
// Resolve node
|
||||
let nr = match self.resolve_node(node_id, safety_selection).await? {
|
||||
let nr = match self.resolve_node(node_id, safety_selection.clone()).await? {
|
||||
Some(nr) => nr,
|
||||
None => {
|
||||
return Err(RPCError::network("could not resolve node id"));
|
||||
@ -525,7 +525,7 @@ impl RPCProcessor {
|
||||
// If this was received over our private route, it's okay to respond to a private route via our safety route
|
||||
NetworkResult::value(Destination::private_route(
|
||||
pr.clone(),
|
||||
SafetySelection::Safe(detail.safety_spec),
|
||||
SafetySelection::Safe(detail.safety_spec.clone()),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -366,6 +366,7 @@ impl RPCProcessor {
|
||||
// Routine to call to generate fanout
|
||||
let call_routine = |next_node: NodeRef| {
|
||||
let this = self.clone();
|
||||
let safety_selection = safety_selection.clone();
|
||||
async move {
|
||||
let v = network_result_try!(
|
||||
this.clone()
|
||||
@ -592,7 +593,7 @@ impl RPCProcessor {
|
||||
|
||||
// Compile the safety route with the private route
|
||||
let compiled_route: CompiledRoute = network_result_try!(rss
|
||||
.compile_safety_route(safety_selection, remote_private_route)
|
||||
.compile_safety_route(safety_selection.clone(), remote_private_route)
|
||||
.to_rpc_network_result()?);
|
||||
let sr_is_stub = compiled_route.safety_route.is_stub();
|
||||
let sr_pubkey = compiled_route.safety_route.public_key.value;
|
||||
@ -680,12 +681,12 @@ impl RPCProcessor {
|
||||
match dest {
|
||||
Destination::Direct {
|
||||
node: ref node_ref,
|
||||
safety_selection,
|
||||
ref safety_selection,
|
||||
}
|
||||
| Destination::Relay {
|
||||
relay: ref node_ref,
|
||||
node: _,
|
||||
safety_selection,
|
||||
ref safety_selection,
|
||||
} => {
|
||||
// Send to a node without a private route
|
||||
// --------------------------------------
|
||||
@ -694,7 +695,7 @@ impl RPCProcessor {
|
||||
let (node_ref, destination_node_ref) = if let Destination::Relay {
|
||||
relay: _,
|
||||
node: ref target,
|
||||
safety_selection: _,
|
||||
safety_selection: ref _safety_selection,
|
||||
} = dest
|
||||
{
|
||||
(node_ref.clone(), target.clone())
|
||||
@ -707,8 +708,8 @@ impl RPCProcessor {
|
||||
SafetySelection::Unsafe(sequencing) => {
|
||||
// Apply safety selection sequencing requirement if it is more strict than the node_ref's sequencing requirement
|
||||
let mut node_ref = node_ref.clone();
|
||||
if sequencing > node_ref.sequencing() {
|
||||
node_ref.set_sequencing(sequencing)
|
||||
if *sequencing > node_ref.sequencing() {
|
||||
node_ref.set_sequencing(sequencing.clone())
|
||||
}
|
||||
|
||||
// Reply private route should be None here, even for questions
|
||||
@ -749,7 +750,7 @@ impl RPCProcessor {
|
||||
// Wrap with safety route
|
||||
out = self.wrap_with_route(
|
||||
routing_domain,
|
||||
safety_selection,
|
||||
safety_selection.clone(),
|
||||
private_route,
|
||||
reply_private_route,
|
||||
message,
|
||||
|
@ -90,6 +90,7 @@ impl StorageManager {
|
||||
let context = context.clone();
|
||||
let rpc_processor = rpc_processor.clone();
|
||||
let last_descriptor = last_get_result.opt_descriptor.clone();
|
||||
let safety_selection = safety_selection.clone();
|
||||
async move {
|
||||
let gva = network_result_try!(
|
||||
rpc_processor
|
||||
|
@ -125,6 +125,7 @@ impl StorageManager {
|
||||
let context = context.clone();
|
||||
let opt_descriptor = local_inspect_result.opt_descriptor.clone();
|
||||
let subkeys = subkeys.clone();
|
||||
let safety_selection = safety_selection.clone();
|
||||
async move {
|
||||
let iva = network_result_try!(
|
||||
rpc_processor
|
||||
|
@ -242,7 +242,7 @@ impl StorageManager {
|
||||
|
||||
// Create a new owned local record from scratch
|
||||
let (key, owner) = inner
|
||||
.create_new_owned_local_record(kind, schema, safety_selection)
|
||||
.create_new_owned_local_record(kind, schema, safety_selection.clone())
|
||||
.await?;
|
||||
|
||||
// Now that the record is made we should always succeed to open the existing record
|
||||
@ -265,7 +265,7 @@ impl StorageManager {
|
||||
|
||||
// See if we have a local record already or not
|
||||
if let Some(res) = inner
|
||||
.open_existing_record(key, writer, safety_selection)
|
||||
.open_existing_record(key, writer, safety_selection.clone())
|
||||
.await?
|
||||
{
|
||||
return Ok(res);
|
||||
@ -289,7 +289,7 @@ impl StorageManager {
|
||||
rpc_processor,
|
||||
key,
|
||||
subkey,
|
||||
safety_selection,
|
||||
safety_selection.clone(),
|
||||
GetResult::default(),
|
||||
)
|
||||
.await?;
|
||||
@ -318,7 +318,7 @@ impl StorageManager {
|
||||
// via some parallel process
|
||||
|
||||
if let Some(res) = inner
|
||||
.open_existing_record(key, writer, safety_selection)
|
||||
.open_existing_record(key, writer, safety_selection.clone())
|
||||
.await?
|
||||
{
|
||||
return Ok(res);
|
||||
@ -580,7 +580,7 @@ impl StorageManager {
|
||||
let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
|
||||
log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
|
||||
// Add to offline writes to flush
|
||||
inner.add_offline_subkey_write(key, subkey, safety_selection);
|
||||
inner.add_offline_subkey_write(key, subkey, safety_selection.clone());
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@ -595,7 +595,7 @@ impl StorageManager {
|
||||
rpc_processor,
|
||||
key,
|
||||
subkey,
|
||||
safety_selection,
|
||||
safety_selection.clone(),
|
||||
signed_value_data.clone(),
|
||||
descriptor,
|
||||
)
|
||||
@ -605,7 +605,7 @@ impl StorageManager {
|
||||
Err(e) => {
|
||||
// Failed to write, try again later
|
||||
let mut inner = self.lock().await?;
|
||||
inner.add_offline_subkey_write(key, subkey, safety_selection);
|
||||
inner.add_offline_subkey_write(key, subkey, safety_selection.clone());
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
@ -623,7 +623,7 @@ impl StorageManager {
|
||||
key,
|
||||
subkey,
|
||||
signed_value_data.value_data().clone(),
|
||||
safety_selection,
|
||||
safety_selection.clone(),
|
||||
result,
|
||||
)
|
||||
.await?;
|
||||
|
@ -49,7 +49,7 @@ impl OpenedRecord {
|
||||
}
|
||||
|
||||
pub fn safety_selection(&self) -> SafetySelection {
|
||||
self.safety_selection
|
||||
self.safety_selection.clone()
|
||||
}
|
||||
pub fn set_safety_selection(&mut self, safety_selection: SafetySelection) {
|
||||
self.safety_selection = safety_selection;
|
||||
|
@ -87,6 +87,7 @@ impl StorageManager {
|
||||
let rpc_processor = rpc_processor.clone();
|
||||
let context = context.clone();
|
||||
let descriptor = descriptor.clone();
|
||||
let safety_selection = safety_selection.clone();
|
||||
async move {
|
||||
let send_descriptor = true; // xxx check if next_node needs the descriptor or not, see issue #203
|
||||
|
||||
@ -309,6 +310,7 @@ impl StorageManager {
|
||||
move |result: VeilidAPIResult<set_value::OutboundSetValueResult>| -> SendPinBoxFuture<bool> {
|
||||
let this = this.clone();
|
||||
let last_value_data = last_value_data.clone();
|
||||
let safety_selection = safety_selection.clone();
|
||||
Box::pin(async move {
|
||||
let result = match result {
|
||||
Ok(v) => v,
|
||||
|
@ -349,7 +349,7 @@ impl StorageManagerInner {
|
||||
// Process local record
|
||||
|
||||
// Keep the safety selection we opened the record with
|
||||
r.detail_mut().safety_selection = safety_selection;
|
||||
r.detail_mut().safety_selection = safety_selection.clone();
|
||||
|
||||
// Return record details
|
||||
(*r.owner(), r.schema())
|
||||
@ -360,7 +360,7 @@ impl StorageManagerInner {
|
||||
// If we don't have a local record yet, check to see if we have a remote record
|
||||
// if so, migrate it to a local record
|
||||
let Some(v) = self
|
||||
.move_remote_record_to_local(key, safety_selection)
|
||||
.move_remote_record_to_local(key, safety_selection.clone())
|
||||
.await?
|
||||
else {
|
||||
// No remote record either
|
||||
@ -388,7 +388,7 @@ impl StorageManagerInner {
|
||||
.entry(key)
|
||||
.and_modify(|e| {
|
||||
e.set_writer(writer);
|
||||
e.set_safety_selection(safety_selection);
|
||||
e.set_safety_selection(safety_selection.clone());
|
||||
})
|
||||
.or_insert_with(|| OpenedRecord::new(writer, safety_selection));
|
||||
|
||||
@ -441,7 +441,7 @@ impl StorageManagerInner {
|
||||
let record = Record::<LocalRecordDetail>::new(
|
||||
Timestamp::now(),
|
||||
signed_value_descriptor,
|
||||
LocalRecordDetail::new(safety_selection),
|
||||
LocalRecordDetail::new(safety_selection.clone()),
|
||||
)?;
|
||||
local_record_store.new_record(key, record).await?;
|
||||
|
||||
|
@ -130,7 +130,7 @@ impl StorageManager {
|
||||
stop_token.clone(),
|
||||
work_item.key,
|
||||
subkey,
|
||||
work_item.safety_selection,
|
||||
work_item.safety_selection.clone(),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
|
@ -195,7 +195,7 @@ impl StorageManager {
|
||||
subkeys.clone(),
|
||||
expiration,
|
||||
count,
|
||||
safety_selection,
|
||||
safety_selection.clone(),
|
||||
opt_watcher,
|
||||
watch_id,
|
||||
watch_node,
|
||||
@ -257,6 +257,7 @@ impl StorageManager {
|
||||
let rpc_processor = rpc_processor.clone();
|
||||
let context = context.clone();
|
||||
let subkeys = subkeys.clone();
|
||||
let safety_selection = safety_selection.clone();
|
||||
|
||||
async move {
|
||||
let wva = network_result_try!(
|
||||
|
@ -2237,7 +2237,7 @@ TableDB Operations:
|
||||
// Relay
|
||||
let relay_nr = resolve_filtered_node_ref(
|
||||
routing_table.clone(),
|
||||
ss.unwrap_or_default(),
|
||||
ss.clone().unwrap_or_default(),
|
||||
)(second)
|
||||
.await?;
|
||||
let target_nr = get_node_ref(routing_table)(first)?;
|
||||
@ -2250,9 +2250,11 @@ TableDB Operations:
|
||||
Some(d)
|
||||
} else {
|
||||
// Direct
|
||||
let target_nr =
|
||||
resolve_filtered_node_ref(routing_table, ss.unwrap_or_default())(text)
|
||||
.await?;
|
||||
let target_nr = resolve_filtered_node_ref(
|
||||
routing_table,
|
||||
ss.clone().unwrap_or_default(),
|
||||
)(text)
|
||||
.await?;
|
||||
|
||||
let mut d = Destination::direct(target_nr);
|
||||
if let Some(ss) = ss {
|
||||
|
@ -107,7 +107,7 @@ impl RoutingContext {
|
||||
Self {
|
||||
api: self.api.clone(),
|
||||
unlocked_inner: Arc::new(RoutingContextUnlockedInner {
|
||||
safety_selection: match self.unlocked_inner.safety_selection {
|
||||
safety_selection: match &self.unlocked_inner.safety_selection {
|
||||
SafetySelection::Unsafe(_) => SafetySelection::Unsafe(sequencing),
|
||||
SafetySelection::Safe(safety_spec) => SafetySelection::Safe(SafetySpec {
|
||||
preferred_route: safety_spec.preferred_route,
|
||||
@ -122,13 +122,13 @@ impl RoutingContext {
|
||||
|
||||
/// Get the safety selection in use on this routing context.
|
||||
pub fn safety(&self) -> SafetySelection {
|
||||
self.unlocked_inner.safety_selection
|
||||
self.unlocked_inner.safety_selection.clone()
|
||||
}
|
||||
|
||||
/// Get the sequencing used by this routing context
|
||||
pub fn sequencing(&self) -> Sequencing {
|
||||
match self.unlocked_inner.safety_selection {
|
||||
SafetySelection::Unsafe(sequencing) => sequencing,
|
||||
match &self.unlocked_inner.safety_selection {
|
||||
SafetySelection::Unsafe(sequencing) => *sequencing,
|
||||
SafetySelection::Safe(safety_spec) => safety_spec.sequencing,
|
||||
}
|
||||
}
|
||||
@ -145,7 +145,7 @@ impl RoutingContext {
|
||||
|
||||
let rpc_processor = self.api.rpc_processor()?;
|
||||
rpc_processor
|
||||
.resolve_target_to_destination(target, self.unlocked_inner.safety_selection)
|
||||
.resolve_target_to_destination(target, self.unlocked_inner.safety_selection.clone())
|
||||
.await
|
||||
.map_err(VeilidAPIError::invalid_target)
|
||||
}
|
||||
@ -245,7 +245,7 @@ impl RoutingContext {
|
||||
Crypto::validate_crypto_kind(kind)?;
|
||||
let storage_manager = self.api.storage_manager()?;
|
||||
storage_manager
|
||||
.create_record(kind, schema, self.unlocked_inner.safety_selection)
|
||||
.create_record(kind, schema, self.unlocked_inner.safety_selection.clone())
|
||||
.await
|
||||
}
|
||||
|
||||
@ -272,7 +272,11 @@ impl RoutingContext {
|
||||
Crypto::validate_crypto_kind(key.kind)?;
|
||||
let storage_manager = self.api.storage_manager()?;
|
||||
storage_manager
|
||||
.open_record(key, default_writer, self.unlocked_inner.safety_selection)
|
||||
.open_record(
|
||||
key,
|
||||
default_writer,
|
||||
self.unlocked_inner.safety_selection.clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
|
@ -62,7 +62,7 @@ pub async fn test_stability() {
|
||||
|
||||
pub async fn test_safetyselection() {
|
||||
let orig = SafetySelection::Unsafe(Sequencing::EnsureOrdered);
|
||||
let copy = deserialize_json(&serialize_json(orig)).unwrap();
|
||||
let copy = deserialize_json(&serialize_json(orig.clone())).unwrap();
|
||||
|
||||
assert_eq!(orig, copy);
|
||||
}
|
||||
@ -74,7 +74,7 @@ pub async fn test_safetyspec() {
|
||||
stability: Stability::default(),
|
||||
sequencing: Sequencing::default(),
|
||||
};
|
||||
let copy = deserialize_json(&serialize_json(orig)).unwrap();
|
||||
let copy = deserialize_json(&serialize_json(orig.clone())).unwrap();
|
||||
|
||||
assert_eq!(orig, copy);
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ impl Default for Stability {
|
||||
|
||||
/// The choice of safety route to include in compiled routes.
|
||||
#[derive(
|
||||
Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,
|
||||
Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,
|
||||
)]
|
||||
#[cfg_attr(
|
||||
target_arch = "wasm32",
|
||||
@ -77,7 +77,7 @@ impl Default for SafetySelection {
|
||||
|
||||
/// Options for safety routes (sender privacy).
|
||||
#[derive(
|
||||
Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,
|
||||
Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,
|
||||
)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
pub struct SafetySpec {
|
||||
|
Loading…
x
Reference in New Issue
Block a user