mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-10-01 01:26:08 -04:00
propagate set logic to watch logic
This commit is contained in:
parent
4a76353f33
commit
077a1808a5
@ -201,26 +201,36 @@ impl RPCProcessor {
|
|||||||
log_rpc!(debug "{}", debug_string);
|
log_rpc!(debug "{}", debug_string);
|
||||||
}
|
}
|
||||||
|
|
||||||
// See if we have this record ourselves, if so, accept the watch
|
|
||||||
let storage_manager = self.storage_manager();
|
|
||||||
let ret_expiration = network_result_try!(storage_manager
|
|
||||||
.inbound_watch_value(
|
|
||||||
key,
|
|
||||||
subkeys,
|
|
||||||
Timestamp::new(expiration),
|
|
||||||
count,
|
|
||||||
target,
|
|
||||||
opt_watcher
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(RPCError::internal)?);
|
|
||||||
|
|
||||||
// Get the nodes that we know about that are closer to the the key than our own node
|
// Get the nodes that we know about that are closer to the the key than our own node
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let closer_to_key_peers = if ret_expiration.as_u64() == 0 {
|
let closer_to_key_peers = network_result_try!(
|
||||||
network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]))
|
routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])
|
||||||
|
);
|
||||||
|
|
||||||
|
// See if we would have accepted this as a set
|
||||||
|
let set_value_count = {
|
||||||
|
let c = self.config.get();
|
||||||
|
c.network.dht.set_value_count as usize
|
||||||
|
};
|
||||||
|
let ret_expiration = if closer_to_key_peers.len() >= set_value_count {
|
||||||
|
// Not close enough
|
||||||
|
Timestamp::default()
|
||||||
} else {
|
} else {
|
||||||
vec![]
|
// Close enough, lets watch it
|
||||||
|
|
||||||
|
// See if we have this record ourselves, if so, accept the watch
|
||||||
|
let storage_manager = self.storage_manager();
|
||||||
|
network_result_try!(storage_manager
|
||||||
|
.inbound_watch_value(
|
||||||
|
key,
|
||||||
|
subkeys,
|
||||||
|
Timestamp::new(expiration),
|
||||||
|
count,
|
||||||
|
target,
|
||||||
|
opt_watcher
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(RPCError::internal)?)
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "debug-dht")]
|
#[cfg(feature = "debug-dht")]
|
||||||
@ -238,8 +248,14 @@ impl RPCProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Make WatchValue answer
|
// Make WatchValue answer
|
||||||
let watch_value_a =
|
let watch_value_a = RPCOperationWatchValueA::new(
|
||||||
RPCOperationWatchValueA::new(ret_expiration.as_u64(), closer_to_key_peers)?;
|
ret_expiration.as_u64(),
|
||||||
|
if ret_expiration.as_u64() == 0 {
|
||||||
|
closer_to_key_peers
|
||||||
|
} else {
|
||||||
|
vec![]
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
// Send GetValue answer
|
// Send GetValue answer
|
||||||
self.answer(
|
self.answer(
|
||||||
|
Loading…
Reference in New Issue
Block a user