mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-08-03 20:24:16 -04:00
startup lock
This commit is contained in:
parent
20e5df6564
commit
1b34239eb8
23 changed files with 415 additions and 5 deletions
|
@ -191,7 +191,6 @@ async_executors = { version = "0.7.0", default-features = false, features = [
|
|||
"bindgen",
|
||||
"timer",
|
||||
] }
|
||||
async-lock = "2.8.0"
|
||||
wasm-bindgen = "0.2.92"
|
||||
js-sys = "0.3.69"
|
||||
wasm-bindgen-futures = "0.4.42"
|
||||
|
|
|
@ -1132,7 +1132,13 @@ impl NetworkManager {
|
|||
source_noderef.merge_filter(NodeRefFilter::new().with_routing_domain(routing_domain));
|
||||
|
||||
// Pass message to RPC system
|
||||
rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)?;
|
||||
if let Err(e) =
|
||||
rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)
|
||||
{
|
||||
// Couldn't enqueue, but not the sender's fault
|
||||
log_net!(debug "failed to enqueue direct message: {}", e);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// Inform caller that we dealt with the envelope locally
|
||||
Ok(true)
|
||||
|
|
|
@ -292,6 +292,7 @@ struct RPCProcessorUnlockedInner {
|
|||
update_callback: UpdateCallback,
|
||||
waiting_rpc_table: OperationWaiter<RPCMessage, Option<QuestionContext>>,
|
||||
waiting_app_call_table: OperationWaiter<Vec<u8>, ()>,
|
||||
startup_lock: StartupLock,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -345,6 +346,7 @@ impl RPCProcessor {
|
|||
update_callback,
|
||||
waiting_rpc_table: OperationWaiter::new(),
|
||||
waiting_app_call_table: OperationWaiter::new(),
|
||||
startup_lock: StartupLock::new(),
|
||||
}
|
||||
}
|
||||
pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self {
|
||||
|
@ -377,6 +379,7 @@ impl RPCProcessor {
|
|||
#[instrument(level = "debug", skip_all, err)]
|
||||
pub async fn startup(&self) -> EyreResult<()> {
|
||||
log_rpc!(debug "startup rpc processor");
|
||||
let guard = self.unlocked_inner.startup_lock.startup()?;
|
||||
{
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
|
@ -405,13 +408,18 @@ impl RPCProcessor {
|
|||
self.storage_manager
|
||||
.set_rpc_processor(Some(self.clone()))
|
||||
.await;
|
||||
|
||||
|
||||
guard.success();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub async fn shutdown(&self) {
|
||||
log_rpc!(debug "starting rpc processor shutdown");
|
||||
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
|
||||
log_rpc!(debug "rpc processor already shut down");
|
||||
return;
|
||||
};
|
||||
|
||||
// Stop storage manager from using us
|
||||
self.storage_manager.set_rpc_processor(None).await;
|
||||
|
@ -437,6 +445,7 @@ impl RPCProcessor {
|
|||
// Release the rpc processor
|
||||
*self.inner.lock() = Self::new_inner();
|
||||
|
||||
guard.success();
|
||||
log_rpc!(debug "finished rpc processor shutdown");
|
||||
}
|
||||
|
||||
|
@ -536,9 +545,11 @@ impl RPCProcessor {
|
|||
&self,
|
||||
node_id: TypedKey,
|
||||
safety_selection: SafetySelection,
|
||||
) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
|
||||
) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
|
||||
let this = self.clone();
|
||||
Box::pin(async move {
|
||||
let _guard = this.unlocked_inner.startup_lock.enter().map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
let routing_table = this.routing_table();
|
||||
|
||||
// First see if we have the node in our routing table already
|
||||
|
@ -1699,6 +1710,8 @@ impl RPCProcessor {
|
|||
routing_domain: RoutingDomain,
|
||||
body: Vec<u8>,
|
||||
) -> EyreResult<()> {
|
||||
let _guard = self.unlocked_inner.startup_lock.enter().map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
let header = RPCMessageHeader {
|
||||
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
|
||||
envelope,
|
||||
|
|
|
@ -9,6 +9,12 @@ impl RPCProcessor {
|
|||
dest: Destination,
|
||||
message: Vec<u8>,
|
||||
) -> RPCNetworkResult<Answer<Vec<u8>>> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
let debug_string = format!("AppCall(message(len)={}) => {}", message.len(), dest);
|
||||
|
||||
let app_call_q = RPCOperationAppCallQ::new(message)?;
|
||||
|
@ -152,6 +158,11 @@ impl RPCProcessor {
|
|||
call_id: OperationId,
|
||||
message: Vec<u8>,
|
||||
) -> Result<(), RPCError> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
self.unlocked_inner
|
||||
.waiting_app_call_table
|
||||
.complete_op_waiter(call_id, message)
|
||||
|
|
|
@ -9,6 +9,12 @@ impl RPCProcessor {
|
|||
dest: Destination,
|
||||
message: Vec<u8>,
|
||||
) -> RPCNetworkResult<()> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
let app_message = RPCOperationAppMessage::new(message)?;
|
||||
let statement = RPCStatement::new(RPCStatementDetail::AppMessage(Box::new(app_message)));
|
||||
|
||||
|
|
|
@ -14,6 +14,12 @@ impl RPCProcessor {
|
|||
node_id: TypedKey,
|
||||
capabilities: Vec<Capability>,
|
||||
) -> RPCNetworkResult<Answer<Vec<PeerInfo>>> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
// Ensure destination never has a private route
|
||||
if matches!(
|
||||
dest,
|
||||
|
|
|
@ -30,6 +30,12 @@ impl RPCProcessor {
|
|||
subkey: ValueSubkey,
|
||||
last_descriptor: Option<SignedValueDescriptor>,
|
||||
) ->RPCNetworkResult<Answer<GetValueAnswer>> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
// Ensure destination never has a private route
|
||||
// and get the target noderef so we can validate the response
|
||||
let Some(target) = dest.node() else {
|
||||
|
|
|
@ -32,6 +32,12 @@ impl RPCProcessor {
|
|||
subkeys: ValueSubkeyRangeSet,
|
||||
last_descriptor: Option<SignedValueDescriptor>,
|
||||
) -> RPCNetworkResult<Answer<InspectValueAnswer>> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
// Ensure destination never has a private route
|
||||
// and get the target noderef so we can validate the response
|
||||
let Some(target) = dest.node() else {
|
||||
|
|
|
@ -9,6 +9,12 @@ impl RPCProcessor {
|
|||
dest: Destination,
|
||||
receipt: D,
|
||||
) -> RPCNetworkResult<()> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
let receipt = receipt.as_ref().to_vec();
|
||||
|
||||
let return_receipt = RPCOperationReturnReceipt::new(receipt)?;
|
||||
|
|
|
@ -34,6 +34,12 @@ impl RPCProcessor {
|
|||
descriptor: SignedValueDescriptor,
|
||||
send_descriptor: bool,
|
||||
) ->RPCNetworkResult<Answer<SetValueAnswer>> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
// Ensure destination never has a private route
|
||||
// and get the target noderef so we can validate the response
|
||||
let Some(target) = dest.node() else {
|
||||
|
|
|
@ -9,6 +9,12 @@ impl RPCProcessor {
|
|||
dest: Destination,
|
||||
signal_info: SignalInfo,
|
||||
) -> RPCNetworkResult<()> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
// Ensure destination never has a private route
|
||||
if matches!(
|
||||
dest,
|
||||
|
|
|
@ -20,6 +20,12 @@ impl RPCProcessor {
|
|||
self,
|
||||
dest: Destination,
|
||||
) -> RPCNetworkResult<Answer<Option<SenderInfo>>> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
// Determine routing domain and node status to send
|
||||
let (opt_target_nr, routing_domain, node_status) = if let Some(UnsafeRoutingInfo {
|
||||
opt_node,
|
||||
|
|
|
@ -10,6 +10,12 @@ impl RPCProcessor {
|
|||
dial_info: DialInfo,
|
||||
redirect: bool,
|
||||
) -> Result<bool, RPCError> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
let network_manager = self.network_manager();
|
||||
let receipt_time = ms_to_us(self.unlocked_inner.validate_dial_info_receipt_time_ms);
|
||||
|
||||
|
|
|
@ -13,6 +13,12 @@ impl RPCProcessor {
|
|||
watch_id: u64,
|
||||
value: Option<SignedValueData>,
|
||||
) -> RPCNetworkResult<()> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
// Ensure destination is never using a safety route
|
||||
if matches!(dest.get_safety_selection(), SafetySelection::Safe(_)) {
|
||||
return Err(RPCError::internal(
|
||||
|
|
|
@ -32,6 +32,12 @@ impl RPCProcessor {
|
|||
watcher: KeyPair,
|
||||
watch_id: Option<u64>,
|
||||
) -> RPCNetworkResult<Answer<WatchValueAnswer>> {
|
||||
let _guard = self
|
||||
.unlocked_inner
|
||||
.startup_lock
|
||||
.enter()
|
||||
.map_err(RPCError::map_try_again("not started up"))?;
|
||||
|
||||
// Ensure destination never has a private route
|
||||
// and get the target noderef so we can validate the response
|
||||
let Some(target) = dest.node() else {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue