add multi-subnode execution for veilid-server

This commit is contained in:
Christien Rioux 2025-01-18 18:26:41 -05:00
parent 766aca4502
commit 5f594e2aa7
9 changed files with 252 additions and 172 deletions

View File

@ -27,6 +27,7 @@ logging:
enabled: false
testing:
subnode_index: 0
subnode_count: 1
core:
protected_store:
allow_insecure_fallback: true

View File

@ -138,6 +138,7 @@ otlp:
```yaml
testing:
subnode_index: 0
subnode_count: 1
```
### core

View File

@ -289,7 +289,10 @@ impl Network {
if !from.ip().is_unspecified() {
vec![from]
} else {
let addrs = self.last_network_state().stable_interface_addresses;
let addrs = self
.last_network_state()
.unwrap()
.stable_interface_addresses;
addrs
.iter()
.filter_map(|a| {

View File

@ -41,8 +41,8 @@ impl Network {
addrs
}
pub(super) fn last_network_state(&self) -> NetworkState {
self.inner.lock().network_state.clone().unwrap()
pub(super) fn last_network_state(&self) -> Option<NetworkState> {
self.inner.lock().network_state.clone()
}
pub(super) fn is_stable_interface_address(&self, addr: IpAddr) -> bool {

View File

@ -29,7 +29,7 @@ impl Network {
}
};
if new_network_state != last_network_state {
if last_network_state.is_none() || new_network_state != last_network_state.unwrap() {
// Save new network state
{
let mut inner = self.inner.lock();

View File

@ -39,7 +39,7 @@ struct RequestLine {
// Request to process
line: String,
// Where to send the response
responses_tx: flume::Sender<String>,
responses_tx: flume::Sender<Arc<String>>,
}
struct ClientApiInner {
@ -48,7 +48,7 @@ struct ClientApiInner {
settings: Settings,
stop: Option<StopSource>,
join_handle: Option<ClientApiAllFuturesJoinHandle>,
update_channels: HashMap<u64, flume::Sender<String>>,
update_channels: HashMap<u64, flume::Sender<Arc<String>>>,
}
#[derive(Clone)]
@ -305,7 +305,8 @@ impl ClientApi {
debug!("JSONAPI: Response: {:?}", response);
// Marshal json + newline => NDJSON
let response_string = serialize_json(json_api::RecvMessage::Response(response)) + "\n";
let response_string =
Arc::new(serialize_json(json_api::RecvMessage::Response(response)) + "\n");
if let Err(e) = responses_tx.send_async(response_string).await {
eprintln!("response not sent: {}", e)
}
@ -322,7 +323,7 @@ impl ClientApi {
self,
mut reader: R,
requests_tx: flume::Sender<Option<RequestLine>>,
responses_tx: flume::Sender<String>,
responses_tx: flume::Sender<Arc<String>>,
) -> VeilidAPIResult<Option<RequestLine>> {
let mut linebuf = String::new();
while let Ok(size) = reader.read_line(&mut linebuf).await {
@ -356,7 +357,7 @@ impl ClientApi {
async fn send_responses<W: AsyncWriteExt + Unpin>(
self,
responses_rx: flume::Receiver<String>,
responses_rx: flume::Receiver<Arc<String>>,
mut writer: W,
) -> VeilidAPIResult<Option<RequestLine>> {
while let Ok(resp) = responses_rx.recv_async().await {
@ -521,11 +522,16 @@ impl ClientApi {
}
pub fn handle_update(&self, veilid_update: veilid_core::VeilidUpdate) {
// serialize update to NDJSON
let veilid_update = serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n";
// Pass other updates to clients
let inner = self.inner.lock();
if inner.update_channels.is_empty() {
return;
}
// serialize update to NDJSON
let veilid_update =
Arc::new(serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n");
// Pass updates to clients
for ch in inner.update_channels.values() {
if ch.send(veilid_update.clone()).is_err() {
// eprintln!("failed to send update: {}", e);

View File

@ -99,7 +99,7 @@ pub struct CmdlineArgs {
/// Run several nodes in parallel on the same machine for testing purposes
///
/// Will run subnodes N though N+(subnode_count-1), where N is 0 or set via --subnode_index
/// Will run subnodes N through N+(subnode_count-1), where N is 0 or set via --subnode_index
#[arg(long, value_name = "COUNT")]
subnode_count: Option<u16>,
@ -214,10 +214,6 @@ fn main() -> EyreResult<()> {
if args.foreground {
settingsrw.daemon.enabled = false;
}
if let Some(subnode_index) = args.subnode_index {
settingsrw.testing.subnode_index = subnode_index;
};
if args.logging.debug {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Debug;
@ -226,23 +222,33 @@ fn main() -> EyreResult<()> {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Trace;
}
if let Some(subnode_index) = args.subnode_index {
settingsrw.testing.subnode_index = subnode_index;
};
if let Some(subnode_count) = args.subnode_count {
if subnode_count == 0 {
bail!("subnode count must be positive");
}
settingsrw.testing.subnode_count = subnode_count;
};
#[cfg(feature = "opentelemetry-otlp")]
if args.otlp.is_some() {
if let Some(otlp) = args.otlp {
println!("Enabling OTLP tracing");
settingsrw.logging.otlp.enabled = true;
settingsrw.logging.otlp.grpc_endpoint = NamedSocketAddrs::from_str(
args.otlp
.expect("should not be null because of default missing value")
.as_str(),
)
.wrap_err("failed to parse OTLP address")?;
settingsrw.logging.otlp.grpc_endpoint =
NamedSocketAddrs::from_str(&otlp).wrap_err("failed to parse OTLP address")?;
settingsrw.logging.otlp.level = LogLevel::Trace;
}
if let Some(flame) = args.flame {
let flame = if flame.is_empty() {
Settings::get_default_flame_path(settingsrw.testing.subnode_index)
.to_string_lossy()
.to_string()
Settings::get_default_flame_path(
settingsrw.testing.subnode_index,
settingsrw.testing.subnode_count,
)
.to_string_lossy()
.to_string()
} else {
flame.to_string_lossy().to_string()
};
@ -253,9 +259,12 @@ fn main() -> EyreResult<()> {
#[cfg(unix)]
if let Some(perfetto) = args.perfetto {
let perfetto = if perfetto.is_empty() {
Settings::get_default_perfetto_path(settingsrw.testing.subnode_index)
.to_string_lossy()
.to_string()
Settings::get_default_perfetto_path(
settingsrw.testing.subnode_index,
settingsrw.testing.subnode_count,
)
.to_string_lossy()
.to_string()
} else {
perfetto.to_string_lossy().to_string()
};
@ -297,6 +306,10 @@ fn main() -> EyreResult<()> {
}
let mut node_id_set = false;
if let Some(key_set) = args.set_node_id {
if settingsrw.testing.subnode_count != 1 {
bail!("subnode count must be 1 if setting node id/secret");
}
node_id_set = true;
// Turn off terminal logging so we can be interactive
settingsrw.logging.terminal.enabled = false;
@ -360,11 +373,6 @@ fn main() -> EyreResult<()> {
}
}
// Apply subnode index if we're testing
settings
.apply_subnode_index()
.wrap_err("failed to apply subnode index")?;
// --- Verify Config ---
settings.verify()?;

View File

@ -32,8 +32,8 @@ pub fn shutdown() {
}
}
//#[instrument(err, skip_all)]
pub async fn run_veilid_server(
pub async fn run_veilid_server_subnode(
subnode: u16,
settings: Settings,
server_mode: ServerMode,
veilid_logs: VeilidLogs,
@ -44,17 +44,34 @@ pub async fn run_veilid_server(
settings_client_api_network_enabled,
settings_client_api_ipc_directory,
settings_client_api_listen_address_addrs,
subnode_index,
subnode_offset,
) = {
let settingsr = settings.read();
// Decide how far this subnode's listen ports are offset from the configured base ports.
cfg_if! {
    if #[cfg(feature = "virtual-network")] {
        // Read the flag through `settingsr`, the settings read guard taken just above;
        // no binding named `inner` is visible in this scope.
        let subnode_offset = if settingsr.core.network.virtual_network.enabled {
            // Don't offset ports when using virtual networking
            0
        } else {
            subnode
        };
    } else {
        let subnode_offset = subnode;
    }
}
(
settingsr.auto_attach,
settingsr.client_api.ipc_enabled,
settingsr.client_api.network_enabled,
settingsr.client_api.ipc_directory.clone(),
settingsr.client_api.listen_address.addrs.clone(),
settingsr.testing.subnode_index,
settingsr
.client_api
.listen_address
.with_offset_port(subnode_offset)?
.addrs,
subnode_offset,
)
};
@ -72,7 +89,7 @@ pub async fn run_veilid_server(
eprintln!("error sending veilid update callback: {:?}", change);
}
});
let config_callback = settings.get_core_config_callback();
let config_callback = settings.get_core_config_callback(subnode, subnode_offset);
// Start Veilid Core and get API
let veilid_api = veilid_core::api_startup(update_callback, config_callback)
@ -86,7 +103,7 @@ pub async fn run_veilid_server(
client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone(), settings.clone());
some_capi.clone().run(
if settings_client_api_ipc_enabled {
Some(settings_client_api_ipc_directory.join(subnode_index.to_string()))
Some(settings_client_api_ipc_directory.join(subnode.to_string()))
} else {
None
},
@ -108,7 +125,7 @@ pub async fn run_veilid_server(
let capi2 = capi.clone();
let update_receiver_shutdown = SingleShotEventual::new(Some(()));
let mut update_receiver_shutdown_instance = update_receiver_shutdown.instance().fuse();
let update_receiver_jh = spawn_local(
let update_receiver_jh = spawn(
"update_receiver",
async move {
loop {
@ -116,7 +133,7 @@ pub async fn run_veilid_server(
res = receiver.recv_async() => {
if let Ok(change) = res {
if let Some(capi) = &capi2 {
// Handle state changes on main thread for capnproto rpc
// Handle state changes for JSON API
capi.clone().handle_update(change);
}
} else {
@ -201,9 +218,48 @@ pub async fn run_veilid_server(
// Wait for update receiver to exit
let _ = update_receiver_jh.await;
out
}
//#[instrument(err, skip_all)]
pub async fn run_veilid_server(
settings: Settings,
server_mode: ServerMode,
veilid_logs: VeilidLogs,
) -> EyreResult<()> {
let (subnode_index, subnode_count) = {
let settingsr = settings.read();
(
settingsr.testing.subnode_index,
settingsr.testing.subnode_count,
)
};
// Ensure we only try to spawn multiple subnodes in 'normal' execution mode
if !matches!(server_mode, ServerMode::Normal) && subnode_count != 1 {
bail!("can only have multiple subnodes in 'normal' execution mode");
}
// Run all subnodes
// Compute the last subnode number with checked arithmetic so that an oversized
// index/count pair (or a zero count) is reported as an error instead of
// overflowing u16 or silently running no subnodes at all.
let Some(last_subnode) = subnode_count
    .checked_sub(1)
    .and_then(|extra| subnode_index.checked_add(extra))
else {
    bail!("subnode count must be positive and subnode index plus count must fit in 16 bits");
};
let mut all_subnodes_jh = vec![];
for subnode in subnode_index..=last_subnode {
    debug!("Spawning subnode {}", subnode);
    let jh = spawn(
        &format!("subnode{}", subnode),
        run_veilid_server_subnode(subnode, settings.clone(), server_mode, veilid_logs.clone()),
    );
    // Keep the subnode number next to its handle so the exit log below names the
    // same subnode that the spawn log did, even when subnode_index is not 0.
    all_subnodes_jh.push((subnode, jh));
}
// Wait for all subnodes to complete
for (subnode, jh) in all_subnodes_jh {
    jh.await?;
    debug!("Subnode {} exited", subnode);
}
// Finally, drop logs
// this is explicit to ensure we don't accidentally drop them too soon via a move
drop(veilid_logs);
out
Ok(())
}

View File

@ -377,6 +377,11 @@ impl ParsedUrl {
self.urlstring = self.url.to_string();
Ok(())
}
/// Returns a clone of this value with `offset_port(offset)` applied to it.
///
/// `self` is left untouched; any error from `offset_port` is returned to the caller.
pub fn with_offset_port(&self, offset: u16) -> EyreResult<Self> {
    let mut shifted = self.clone();
    shifted.offset_port(offset).map(|_| shifted)
}
}
impl FromStr for ParsedUrl {
@ -481,6 +486,12 @@ impl NamedSocketAddrs {
Ok(true)
}
/// Returns a clone of this value with `offset_port(offset)` applied to it.
///
/// `self` is left untouched; any error from `offset_port` is returned to the caller,
/// and its success value is discarded.
pub fn with_offset_port(&self, offset: u16) -> EyreResult<Self> {
    let mut shifted = self.clone();
    shifted.offset_port(offset).map(|_| shifted)
}
}
#[derive(Debug, Deserialize, Serialize)]
@ -863,75 +874,6 @@ impl Settings {
self.inner.write()
}
pub fn apply_subnode_index(&self) -> EyreResult<()> {
let mut settingsrw = self.write();
let idx = settingsrw.testing.subnode_index;
if idx == 0 {
return Ok(());
}
// bump client api port
settingsrw.client_api.listen_address.offset_port(idx)?;
// bump protocol ports
settingsrw
.core
.network
.protocol
.udp
.listen_address
.offset_port(idx)?;
settingsrw
.core
.network
.protocol
.tcp
.listen_address
.offset_port(idx)?;
settingsrw
.core
.network
.protocol
.ws
.listen_address
.offset_port(idx)?;
if let Some(url) = &mut settingsrw.core.network.protocol.ws.url {
url.offset_port(idx)?;
}
settingsrw
.core
.network
.protocol
.wss
.listen_address
.offset_port(idx)?;
if let Some(url) = &mut settingsrw.core.network.protocol.wss.url {
url.offset_port(idx)?;
}
// bump application ports
settingsrw
.core
.network
.application
.http
.listen_address
.offset_port(idx)?;
if let Some(url) = &mut settingsrw.core.network.application.http.url {
url.offset_port(idx)?;
}
settingsrw
.core
.network
.application
.https
.listen_address
.offset_port(idx)?;
if let Some(url) = &mut settingsrw.core.network.application.https.url {
url.offset_port(idx)?;
}
Ok(())
}
/// Determine default config path
///
/// In a unix-like environment, veilid-server will look for its config file
@ -962,22 +904,40 @@ impl Settings {
}
/// Determine default flamegraph output path
pub fn get_default_flame_path(subnode_index: u16) -> PathBuf {
std::env::temp_dir().join(if subnode_index == 0 {
"veilid-server.folded".to_owned()
pub fn get_default_flame_path(subnode_index: u16, subnode_count: u16) -> PathBuf {
let name = if subnode_count == 1 {
if subnode_index == 0 {
"veilid-server.folded".to_owned()
} else {
format!("veilid-server-{}.folded", subnode_index)
}
} else {
format!("veilid-server-{}.folded", subnode_index)
})
format!(
    "veilid-server-{}-{}.folded",
    subnode_index,
    // Last subnode number in the range; widened to u32 so the sum cannot overflow
    // u16, and saturating so a zero count cannot underflow.
    (u32::from(subnode_index) + u32::from(subnode_count)).saturating_sub(1)
)
};
std::env::temp_dir().join(name)
}
/// Determine default perfetto output path
#[cfg(unix)]
pub fn get_default_perfetto_path(subnode_index: u16) -> PathBuf {
std::env::temp_dir().join(if subnode_index == 0 {
"veilid-server.pftrace".to_owned()
pub fn get_default_perfetto_path(subnode_index: u16, subnode_count: u16) -> PathBuf {
let name = if subnode_count == 1 {
if subnode_index == 0 {
"veilid-server.pftrace".to_owned()
} else {
format!("veilid-server-{}.pftrace", subnode_index)
}
} else {
format!("veilid-server-{}.pftrace", subnode_index)
})
format!(
    "veilid-server-{}-{}.pftrace",
    subnode_index,
    // Last subnode number in the range; widened to u32 so the sum cannot overflow
    // u16, and saturating so a zero count cannot underflow.
    (u32::from(subnode_index) + u32::from(subnode_count)).saturating_sub(1)
)
};
std::env::temp_dir().join(name)
}
#[cfg_attr(windows, expect(dead_code))]
@ -1270,17 +1230,22 @@ impl Settings {
Err(eyre!("settings key '{key}' not found"))
}
pub fn get_core_config_callback(&self) -> veilid_core::ConfigCallback {
pub fn get_core_config_callback(
&self,
subnode: u16,
subnode_offset: u16,
) -> veilid_core::ConfigCallback {
let inner = self.inner.clone();
Arc::new(move |key: String| {
let inner = inner.read();
let out: ConfigCallbackReturn = match key.as_str() {
"program_name" => Ok(Box::new("veilid-server".to_owned())),
"namespace" => Ok(Box::new(if inner.testing.subnode_index == 0 {
"namespace" => Ok(Box::new(if subnode == 0 {
"".to_owned()
} else {
format!("subnode{}", inner.testing.subnode_index)
format!("subnode{}", subnode)
})),
"capabilities.disable" => {
let mut caps = Vec::<FourCC>::new();
@ -1492,6 +1457,8 @@ impl Settings {
.application
.https
.listen_address
.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)?
.name
.clone(),
)),
@ -1505,16 +1472,16 @@ impl Settings {
.to_string_lossy()
.to_string(),
)),
"network.application.https.url" => Ok(Box::new(
inner
.core
.network
.application
.https
.url
.as_ref()
.map(|a| a.urlstring.clone()),
)),
"network.application.https.url" => {
Ok(Box::new(match inner.core.network.application.https.url {
Some(ref a) => Some(
a.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)
.map(|x| x.urlstring.clone())?,
),
None => None,
}))
}
"network.application.http.enabled" => {
Ok(Box::new(inner.core.network.application.http.enabled))
}
@ -1525,6 +1492,8 @@ impl Settings {
.application
.http
.listen_address
.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)?
.name
.clone(),
)),
@ -1538,16 +1507,16 @@ impl Settings {
.to_string_lossy()
.to_string(),
)),
"network.application.http.url" => Ok(Box::new(
inner
.core
.network
.application
.http
.url
.as_ref()
.map(|a| a.urlstring.clone()),
)),
"network.application.http.url" => {
Ok(Box::new(match inner.core.network.application.http.url {
Some(ref a) => Some(
a.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)
.map(|x| x.urlstring.clone())?,
),
None => None,
}))
}
"network.protocol.udp.enabled" => {
Ok(Box::new(inner.core.network.protocol.udp.enabled))
}
@ -1555,7 +1524,16 @@ impl Settings {
Ok(Box::new(inner.core.network.protocol.udp.socket_pool_size))
}
"network.protocol.udp.listen_address" => Ok(Box::new(
inner.core.network.protocol.udp.listen_address.name.clone(),
inner
.core
.network
.protocol
.udp
.listen_address
.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)?
.name
.clone(),
)),
"network.protocol.udp.public_address" => Ok(Box::new(
inner
@ -1577,7 +1555,16 @@ impl Settings {
Ok(Box::new(inner.core.network.protocol.tcp.max_connections))
}
"network.protocol.tcp.listen_address" => Ok(Box::new(
inner.core.network.protocol.tcp.listen_address.name.clone(),
inner
.core
.network
.protocol
.tcp
.listen_address
.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)?
.name
.clone(),
)),
"network.protocol.tcp.public_address" => Ok(Box::new(
inner
@ -1597,7 +1584,16 @@ impl Settings {
Ok(Box::new(inner.core.network.protocol.ws.max_connections))
}
"network.protocol.ws.listen_address" => Ok(Box::new(
inner.core.network.protocol.ws.listen_address.name.clone(),
inner
.core
.network
.protocol
.ws
.listen_address
.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)?
.name
.clone(),
)),
"network.protocol.ws.path" => Ok(Box::new(
inner
@ -1609,16 +1605,16 @@ impl Settings {
.to_string_lossy()
.to_string(),
)),
"network.protocol.ws.url" => Ok(Box::new(
inner
.core
.network
.protocol
.ws
.url
.as_ref()
.map(|a| a.urlstring.clone()),
)),
"network.protocol.ws.url" => {
Ok(Box::new(match inner.core.network.protocol.ws.url {
Some(ref a) => Some(
a.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)
.map(|x| x.urlstring.clone())?,
),
None => None,
}))
}
"network.protocol.wss.connect" => {
Ok(Box::new(inner.core.network.protocol.wss.connect))
}
@ -1629,7 +1625,16 @@ impl Settings {
Ok(Box::new(inner.core.network.protocol.wss.max_connections))
}
"network.protocol.wss.listen_address" => Ok(Box::new(
inner.core.network.protocol.wss.listen_address.name.clone(),
inner
.core
.network
.protocol
.wss
.listen_address
.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)?
.name
.clone(),
)),
"network.protocol.wss.path" => Ok(Box::new(
inner
@ -1641,16 +1646,16 @@ impl Settings {
.to_string_lossy()
.to_string(),
)),
"network.protocol.wss.url" => Ok(Box::new(
inner
.core
.network
.protocol
.wss
.url
.as_ref()
.map(|a| a.urlstring.clone()),
)),
"network.protocol.wss.url" => {
Ok(Box::new(match inner.core.network.protocol.wss.url {
Some(ref a) => Some(
a.with_offset_port(subnode_offset)
.map_err(VeilidAPIError::internal)
.map(|x| x.urlstring.clone())?,
),
None => None,
}))
}
#[cfg(feature = "geolocation")]
"network.privacy.country_code_denylist" => Ok(Box::new(
inner.core.network.privacy.country_code_denylist.clone(),