diff --git a/doc/config/sample.config b/doc/config/sample.config index 5d930075..2727eaf0 100644 --- a/doc/config/sample.config +++ b/doc/config/sample.config @@ -27,6 +27,7 @@ logging: enabled: false testing: subnode_index: 0 + subnode_count: 1 core: protected_store: allow_insecure_fallback: true diff --git a/doc/config/veilid-server-config.md b/doc/config/veilid-server-config.md index 9489a6bb..d817445b 100644 --- a/doc/config/veilid-server-config.md +++ b/doc/config/veilid-server-config.md @@ -138,6 +138,7 @@ otlp: ```yaml testing: subnode_index: 0 + subnode_count: 1 ``` ### core diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 60b26d0a..bf563403 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -289,7 +289,10 @@ impl Network { if !from.ip().is_unspecified() { vec![from] } else { - let addrs = self.last_network_state().stable_interface_addresses; + let addrs = self + .last_network_state() + .unwrap() + .stable_interface_addresses; addrs .iter() .filter_map(|a| { diff --git a/veilid-core/src/network_manager/native/network_state.rs b/veilid-core/src/network_manager/native/network_state.rs index 7670c4ed..5ac8f551 100644 --- a/veilid-core/src/network_manager/native/network_state.rs +++ b/veilid-core/src/network_manager/native/network_state.rs @@ -41,8 +41,8 @@ impl Network { addrs } - pub(super) fn last_network_state(&self) -> NetworkState { - self.inner.lock().network_state.clone().unwrap() + pub(super) fn last_network_state(&self) -> Option { + self.inner.lock().network_state.clone() } pub(super) fn is_stable_interface_address(&self, addr: IpAddr) -> bool { diff --git a/veilid-core/src/network_manager/native/tasks/network_interfaces_task.rs b/veilid-core/src/network_manager/native/tasks/network_interfaces_task.rs index 69199683..b5d05f8d 100644 --- a/veilid-core/src/network_manager/native/tasks/network_interfaces_task.rs +++ b/veilid-core/src/network_manager/native/tasks/network_interfaces_task.rs @@ -29,7 +29,7 @@ impl Network { } }; - if new_network_state != last_network_state { + if last_network_state.is_none() || new_network_state != last_network_state.unwrap() { // Save new network state { let mut inner = self.inner.lock(); diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index d8097a9a..5009b3f4 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -39,7 +39,7 @@ struct RequestLine { // Request to process line: String, // Where to send the response - responses_tx: flume::Sender, + responses_tx: flume::Sender>, } struct ClientApiInner { @@ -48,7 +48,7 @@ struct ClientApiInner { settings: Settings, stop: Option, join_handle: Option, - update_channels: HashMap>, + update_channels: HashMap>>, } #[derive(Clone)] @@ -305,7 +305,8 @@ impl ClientApi { debug!("JSONAPI: Response: {:?}", response); // Marshal json + newline => NDJSON - let response_string = serialize_json(json_api::RecvMessage::Response(response)) + "\n"; + let response_string = + Arc::new(serialize_json(json_api::RecvMessage::Response(response)) + "\n"); if let Err(e) = responses_tx.send_async(response_string).await { eprintln!("response not sent: {}", e) } @@ -322,7 +323,7 @@ impl ClientApi { self, mut reader: R, requests_tx: flume::Sender>, - responses_tx: flume::Sender, + responses_tx: flume::Sender>, ) -> VeilidAPIResult> { let mut linebuf = String::new(); while let Ok(size) = reader.read_line(&mut linebuf).await { @@ -356,7 +357,7 @@ impl ClientApi { async fn send_responses( self, - responses_rx: flume::Receiver, + responses_rx: flume::Receiver>, mut writer: W, ) -> VeilidAPIResult> { while let Ok(resp) = responses_rx.recv_async().await { @@ -521,11 +522,16 @@ impl ClientApi { } pub fn handle_update(&self, veilid_update: veilid_core::VeilidUpdate) { - // serialize update to NDJSON - let veilid_update = serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n"; - - // Pass other updates to clients let inner = self.inner.lock(); + if inner.update_channels.is_empty() { + return; + } + + // serialize update to NDJSON + let veilid_update = + Arc::new(serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n"); + + // Pass updates to clients for ch in inner.update_channels.values() { if ch.send(veilid_update.clone()).is_err() { // eprintln!("failed to send update: {}", e); diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index 0df2464a..e3a5d698 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -99,7 +99,7 @@ pub struct CmdlineArgs { /// Run several nodes in parallel on the same machine for testing purposes /// - /// Will run subnodes N though N+(subnode_count-1), where N is 0 or set via --subnode_index + /// Will run subnodes N through N+(subnode_count-1), where N is 0 or set via --subnode_index #[arg(long, value_name = "COUNT")] subnode_count: Option, @@ -214,10 +214,6 @@ fn main() -> EyreResult<()> { if args.foreground { settingsrw.daemon.enabled = false; } - if let Some(subnode_index) = args.subnode_index { - settingsrw.testing.subnode_index = subnode_index; - }; - if args.logging.debug { settingsrw.logging.terminal.enabled = true; settingsrw.logging.terminal.level = LogLevel::Debug; @@ -226,23 +222,33 @@ fn main() -> EyreResult<()> { settingsrw.logging.terminal.enabled = true; settingsrw.logging.terminal.level = LogLevel::Trace; } + + if let Some(subnode_index) = args.subnode_index { + settingsrw.testing.subnode_index = subnode_index; + }; + if let Some(subnode_count) = args.subnode_count { + if subnode_count == 0 { + bail!("subnode count must be positive"); + } + settingsrw.testing.subnode_count = subnode_count; + }; + #[cfg(feature = "opentelemetry-otlp")] - if args.otlp.is_some() { + if let Some(otlp) = args.otlp { println!("Enabling OTLP tracing"); settingsrw.logging.otlp.enabled = true; - settingsrw.logging.otlp.grpc_endpoint = NamedSocketAddrs::from_str( - args.otlp - .expect("should not be null because of default missing value") - .as_str(), - ) - .wrap_err("failed to parse OTLP address")?; + settingsrw.logging.otlp.grpc_endpoint = + NamedSocketAddrs::from_str(&otlp).wrap_err("failed to parse OTLP address")?; settingsrw.logging.otlp.level = LogLevel::Trace; } if let Some(flame) = args.flame { let flame = if flame.is_empty() { - Settings::get_default_flame_path(settingsrw.testing.subnode_index) - .to_string_lossy() - .to_string() + Settings::get_default_flame_path( + settingsrw.testing.subnode_index, + settingsrw.testing.subnode_count, + ) + .to_string_lossy() + .to_string() } else { flame.to_string_lossy().to_string() }; @@ -253,9 +259,12 @@ fn main() -> EyreResult<()> { #[cfg(unix)] if let Some(perfetto) = args.perfetto { let perfetto = if perfetto.is_empty() { - Settings::get_default_perfetto_path(settingsrw.testing.subnode_index) - .to_string_lossy() - .to_string() + Settings::get_default_perfetto_path( + settingsrw.testing.subnode_index, + settingsrw.testing.subnode_count, + ) + .to_string_lossy() + .to_string() } else { perfetto.to_string_lossy().to_string() }; @@ -297,6 +306,10 @@ fn main() -> EyreResult<()> { } let mut node_id_set = false; if let Some(key_set) = args.set_node_id { + if settingsrw.testing.subnode_count != 1 { + bail!("subnode count must be 1 if setting node id/secret"); + } + node_id_set = true; // Turn off terminal logging so we can be interactive settingsrw.logging.terminal.enabled = false; @@ -360,11 +373,6 @@ fn main() -> EyreResult<()> { } } - // Apply subnode index if we're testing - settings - .apply_subnode_index() - .wrap_err("failed to apply subnode index")?; - // --- Verify Config --- settings.verify()?; diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 20820737..9182b557 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -32,8 +32,8 @@ pub fn shutdown() { } } -//#[instrument(err, skip_all)] -pub async fn run_veilid_server( +pub async fn run_veilid_server_subnode( + subnode: u16, settings: Settings, server_mode: ServerMode, veilid_logs: VeilidLogs, @@ -44,17 +44,34 @@ pub async fn run_veilid_server( settings_client_api_network_enabled, settings_client_api_ipc_directory, settings_client_api_listen_address_addrs, - subnode_index, + subnode_offset, ) = { let settingsr = settings.read(); + cfg_if! { + if #[cfg(feature = "virtual-network")] { + let subnode_offset = if inner.core.network.virtual_network.enabled { + // Don't offset ports when using virtual networking + 0 + } else { + subnode + }; + } else { + let subnode_offset = subnode; + } + } + ( settingsr.auto_attach, settingsr.client_api.ipc_enabled, settingsr.client_api.network_enabled, settingsr.client_api.ipc_directory.clone(), - settingsr.client_api.listen_address.addrs.clone(), - settingsr.testing.subnode_index, + settingsr + .client_api + .listen_address + .with_offset_port(subnode_offset)? + .addrs, + subnode_offset, ) }; @@ -72,7 +89,7 @@ pub async fn run_veilid_server( eprintln!("error sending veilid update callback: {:?}", change); } }); - let config_callback = settings.get_core_config_callback(); + let config_callback = settings.get_core_config_callback(subnode, subnode_offset); // Start Veilid Core and get API let veilid_api = veilid_core::api_startup(update_callback, config_callback) @@ -86,7 +103,7 @@ pub async fn run_veilid_server( client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone(), settings.clone()); some_capi.clone().run( if settings_client_api_ipc_enabled { - Some(settings_client_api_ipc_directory.join(subnode_index.to_string())) + Some(settings_client_api_ipc_directory.join(subnode.to_string())) } else { None }, @@ -108,7 +125,7 @@ pub async fn run_veilid_server( let capi2 = capi.clone(); let update_receiver_shutdown = SingleShotEventual::new(Some(())); let mut update_receiver_shutdown_instance = update_receiver_shutdown.instance().fuse(); - let update_receiver_jh = spawn_local( + let update_receiver_jh = spawn( "update_receiver", async move { loop { @@ -116,7 +133,7 @@ pub async fn run_veilid_server( res = receiver.recv_async() => { if let Ok(change) = res { if let Some(capi) = &capi2 { - // Handle state changes on main thread for capnproto rpc + // Handle state changes for JSON API capi.clone().handle_update(change); } } else { @@ -201,9 +218,48 @@ pub async fn run_veilid_server( // Wait for update receiver to exit let _ = update_receiver_jh.await; + out +} + +//#[instrument(err, skip_all)] +pub async fn run_veilid_server( + settings: Settings, + server_mode: ServerMode, + veilid_logs: VeilidLogs, +) -> EyreResult<()> { + let (subnode_index, subnode_count) = { + let settingsr = settings.read(); + ( + settingsr.testing.subnode_index, + settingsr.testing.subnode_count, + ) + }; + + // Ensure we only try to spawn multiple subnodes in 'normal' execution mode + if !matches!(server_mode, ServerMode::Normal) && subnode_count != 1 { + bail!("can only have multiple subnodes in 'normal' execution mode"); + } + + // Run all subnodes + let mut all_subnodes_jh = vec![]; + for subnode in subnode_index..(subnode_index + subnode_count) { + debug!("Spawning subnode {}", subnode); + let jh = spawn( + &format!("subnode{}", subnode), + run_veilid_server_subnode(subnode, settings.clone(), server_mode, veilid_logs.clone()), + ); + all_subnodes_jh.push(jh); + } + + // Wait for all subnodes to complete + for (sn, jh) in all_subnodes_jh.into_iter().enumerate() { + jh.await?; + debug!("Subnode {} exited", sn); + } + // Finally, drop logs // this is explicit to ensure we don't accidentally drop them too soon via a move drop(veilid_logs); - out + Ok(()) } diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index 1c9312a9..3c494ac5 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -377,6 +377,11 @@ impl ParsedUrl { self.urlstring = self.url.to_string(); Ok(()) } + pub fn with_offset_port(&self, offset: u16) -> EyreResult { + let mut x = self.clone(); + x.offset_port(offset)?; + Ok(x) + } } impl FromStr for ParsedUrl { @@ -481,6 +486,12 @@ impl NamedSocketAddrs { Ok(true) } + + pub fn with_offset_port(&self, offset: u16) -> EyreResult { + let mut x = self.clone(); + x.offset_port(offset)?; + Ok(x) + } } #[derive(Debug, Deserialize, Serialize)] @@ -863,75 +874,6 @@ impl Settings { self.inner.write() } - pub fn apply_subnode_index(&self) -> EyreResult<()> { - let mut settingsrw = self.write(); - let idx = settingsrw.testing.subnode_index; - if idx == 0 { - return Ok(()); - } - - // bump client api port - settingsrw.client_api.listen_address.offset_port(idx)?; - - // bump protocol ports - settingsrw - .core - .network - .protocol - .udp - .listen_address - .offset_port(idx)?; - settingsrw - .core - .network - .protocol - .tcp - .listen_address - .offset_port(idx)?; - settingsrw - .core - .network - .protocol - .ws - .listen_address - .offset_port(idx)?; - if let Some(url) = &mut settingsrw.core.network.protocol.ws.url { - url.offset_port(idx)?; - } - settingsrw - .core - .network - .protocol - .wss - .listen_address - .offset_port(idx)?; - if let Some(url) = &mut settingsrw.core.network.protocol.wss.url { - url.offset_port(idx)?; - } - // bump application ports - settingsrw - .core - .network - .application - .http - .listen_address - .offset_port(idx)?; - if let Some(url) = &mut settingsrw.core.network.application.http.url { - url.offset_port(idx)?; - } - settingsrw - .core - .network - .application - .https - .listen_address - .offset_port(idx)?; - if let Some(url) = &mut settingsrw.core.network.application.https.url { - url.offset_port(idx)?; - } - Ok(()) - } - /// Determine default config path /// /// In a unix-like environment, veilid-server will look for its config file @@ -962,22 +904,40 @@ impl Settings { } /// Determine default flamegraph output path - pub fn get_default_flame_path(subnode_index: u16) -> PathBuf { - std::env::temp_dir().join(if subnode_index == 0 { - "veilid-server.folded".to_owned() + pub fn get_default_flame_path(subnode_index: u16, subnode_count: u16) -> PathBuf { + let name = if subnode_count == 1 { + if subnode_index == 0 { + "veilid-server.folded".to_owned() + } else { + format!("veilid-server-{}.folded", subnode_index) + } } else { - format!("veilid-server-{}.folded", subnode_index) - }) + format!( + "veilid-server-{}-{}.folded", + subnode_index, + subnode_index + subnode_count - 1 + ) + }; + std::env::temp_dir().join(name) } /// Determine default perfetto output path #[cfg(unix)] - pub fn get_default_perfetto_path(subnode_index: u16) -> PathBuf { - std::env::temp_dir().join(if subnode_index == 0 { - "veilid-server.pftrace".to_owned() + pub fn get_default_perfetto_path(subnode_index: u16, subnode_count: u16) -> PathBuf { + let name = if subnode_count == 1 { + if subnode_index == 0 { + "veilid-server.pftrace".to_owned() + } else { + format!("veilid-server-{}.pftrace", subnode_index) + } } else { - format!("veilid-server-{}.pftrace", subnode_index) - }) + format!( + "veilid-server-{}-{}.pftrace", + subnode_index, + subnode_index + subnode_count - 1 + ) + }; + std::env::temp_dir().join(name) } #[cfg_attr(windows, expect(dead_code))] @@ -1270,17 +1230,22 @@ impl Settings { Err(eyre!("settings key '{key}' not found")) } - pub fn get_core_config_callback(&self) -> veilid_core::ConfigCallback { + pub fn get_core_config_callback( + &self, + subnode: u16, + subnode_offset: u16, + ) -> veilid_core::ConfigCallback { let inner = self.inner.clone(); Arc::new(move |key: String| { let inner = inner.read(); + let out: ConfigCallbackReturn = match key.as_str() { "program_name" => Ok(Box::new("veilid-server".to_owned())), - "namespace" => Ok(Box::new(if inner.testing.subnode_index == 0 { + "namespace" => Ok(Box::new(if subnode == 0 { "".to_owned() } else { - format!("subnode{}", inner.testing.subnode_index) + format!("subnode{}", subnode) })), "capabilities.disable" => { let mut caps = Vec::::new(); @@ -1492,6 +1457,8 @@ impl Settings { .application .https .listen_address + .with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal)? .name .clone(), )), @@ -1505,16 +1472,16 @@ impl Settings { .to_string_lossy() .to_string(), )), - "network.application.https.url" => Ok(Box::new( - inner - .core - .network - .application - .https - .url - .as_ref() - .map(|a| a.urlstring.clone()), - )), + "network.application.https.url" => { + Ok(Box::new(match inner.core.network.application.https.url { + Some(ref a) => Some( + a.with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal) + .map(|x| x.urlstring.clone())?, + ), + None => None, + })) + } "network.application.http.enabled" => { Ok(Box::new(inner.core.network.application.http.enabled)) } @@ -1525,6 +1492,8 @@ impl Settings { .application .http .listen_address + .with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal)? .name .clone(), )), @@ -1538,16 +1507,16 @@ impl Settings { .to_string_lossy() .to_string(), )), - "network.application.http.url" => Ok(Box::new( - inner - .core - .network - .application - .http - .url - .as_ref() - .map(|a| a.urlstring.clone()), - )), + "network.application.http.url" => { + Ok(Box::new(match inner.core.network.application.http.url { + Some(ref a) => Some( + a.with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal) + .map(|x| x.urlstring.clone())?, + ), + None => None, + })) + } "network.protocol.udp.enabled" => { Ok(Box::new(inner.core.network.protocol.udp.enabled)) } @@ -1555,7 +1524,16 @@ impl Settings { Ok(Box::new(inner.core.network.protocol.udp.socket_pool_size)) } "network.protocol.udp.listen_address" => Ok(Box::new( - inner.core.network.protocol.udp.listen_address.name.clone(), + inner + .core + .network + .protocol + .udp + .listen_address + .with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal)? + .name + .clone(), )), "network.protocol.udp.public_address" => Ok(Box::new( inner @@ -1577,7 +1555,16 @@ impl Settings { Ok(Box::new(inner.core.network.protocol.tcp.max_connections)) } "network.protocol.tcp.listen_address" => Ok(Box::new( - inner.core.network.protocol.tcp.listen_address.name.clone(), + inner + .core + .network + .protocol + .tcp + .listen_address + .with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal)? + .name + .clone(), )), "network.protocol.tcp.public_address" => Ok(Box::new( inner @@ -1597,7 +1584,16 @@ impl Settings { Ok(Box::new(inner.core.network.protocol.ws.max_connections)) } "network.protocol.ws.listen_address" => Ok(Box::new( - inner.core.network.protocol.ws.listen_address.name.clone(), + inner + .core + .network + .protocol + .ws + .listen_address + .with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal)? + .name + .clone(), )), "network.protocol.ws.path" => Ok(Box::new( inner @@ -1609,16 +1605,16 @@ impl Settings { .to_string_lossy() .to_string(), )), - "network.protocol.ws.url" => Ok(Box::new( - inner - .core - .network - .protocol - .ws - .url - .as_ref() - .map(|a| a.urlstring.clone()), - )), + "network.protocol.ws.url" => { + Ok(Box::new(match inner.core.network.protocol.ws.url { + Some(ref a) => Some( + a.with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal) + .map(|x| x.urlstring.clone())?, + ), + None => None, + })) + } "network.protocol.wss.connect" => { Ok(Box::new(inner.core.network.protocol.wss.connect)) } @@ -1629,7 +1625,16 @@ impl Settings { Ok(Box::new(inner.core.network.protocol.wss.max_connections)) } "network.protocol.wss.listen_address" => Ok(Box::new( - inner.core.network.protocol.wss.listen_address.name.clone(), + inner + .core + .network + .protocol + .wss + .listen_address + .with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal)? + .name + .clone(), )), "network.protocol.wss.path" => Ok(Box::new( inner @@ -1641,16 +1646,16 @@ impl Settings { .to_string_lossy() .to_string(), )), - "network.protocol.wss.url" => Ok(Box::new( - inner - .core - .network - .protocol - .wss - .url - .as_ref() - .map(|a| a.urlstring.clone()), - )), + "network.protocol.wss.url" => { + Ok(Box::new(match inner.core.network.protocol.wss.url { + Some(ref a) => Some( + a.with_offset_port(subnode_offset) + .map_err(VeilidAPIError::internal) + .map(|x| x.urlstring.clone())?, + ), + None => None, + })) + } #[cfg(feature = "geolocation")] "network.privacy.country_code_denylist" => Ok(Box::new( inner.core.network.privacy.country_code_denylist.clone(),