From 981674b8708e52d2e49874d97ff4696ef314b0bc Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 18 Nov 2024 23:23:43 -0500 Subject: [PATCH] validation WIP --- Cargo.lock | 54 +++ veilid-tools/Cargo.toml | 2 + veilid-tools/src/virtual_network/commands.rs | 6 +- .../src/virtual_network/router_client.rs | 22 +- .../virtual_network/router_server/config.rs | 334 +++++++++++++----- .../router_server/machine_registry.rs | 111 ++++++ .../src/virtual_network/router_server/mod.rs | 163 ++------- .../router_server/server_processor.rs | 196 ++++++++++ .../virtual_network/virtual_network_error.rs | 4 + 9 files changed, 670 insertions(+), 222 deletions(-) create mode 100644 veilid-tools/src/virtual_network/router_server/machine_registry.rs create mode 100644 veilid-tools/src/virtual_network/router_server/server_processor.rs diff --git a/Cargo.lock b/Cargo.lock index 1948e7a9..063873c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1491,6 +1491,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", + "strsim 0.11.1", "syn 2.0.87", ] @@ -4362,6 +4363,28 @@ dependencies = [ "toml_edit 0.19.15", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "proc-macro2" version = "1.0.89" @@ -6267,6 +6290,36 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "validator" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b4a29d8709210980a09379f27ee31549b73292c87ab9899beee1c0d3be6303" +dependencies = [ + "idna 1.0.3", + "once_cell", + "regex", + "serde", + "serde_derive", + "serde_json", + "url", + "validator_derive", +] + +[[package]] +name = "validator_derive" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bac855a2ce6f843beb229757e6e570a42e837bcb15e5f449dd48d5747d41bf77" +dependencies = [ + "darling 0.20.10", + "once_cell", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "valuable" version = "0.1.0" @@ -6619,6 +6672,7 @@ dependencies = [ "tracing-oslog", "tracing-subscriber", "tracing-wasm", + "validator", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index 2077728f..050e3d35 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -50,6 +50,7 @@ virtual-network-server = [ "dep:config", "dep:ipnet", "dep:serde_yaml", + "dep:validator", "dep:ws_stream_tungstenite", ] @@ -111,6 +112,7 @@ config = { version = "^0", default-features = false, features = [ ], optional = true } ipnet = { version = "2", features = ["serde"], optional = true } serde_yaml = { package = "serde_yaml_ng", version = "^0.10.0", optional = true } +validator = { version = "0.19.0", features = ["derive"], optional = true } # Dependencies for WASM builds only [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies] diff --git a/veilid-tools/src/virtual_network/commands.rs b/veilid-tools/src/virtual_network/commands.rs index da3e7eb0..a318c813 100644 --- a/veilid-tools/src/virtual_network/commands.rs +++ b/veilid-tools/src/virtual_network/commands.rs @@ -202,17 +202,19 @@ pub enum ServerProcessorReplyValue { } #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -pub enum ServerProcessorReplyResult { +pub enum ServerProcessorReplyStatus { Value(ServerProcessorReplyValue), InvalidMachineId, InvalidSocketId, + MissingProfile, + ProfileComplete, IoError(#[serde(with = "serde_io_error::SerdeIoErrorKindDef")] io::ErrorKind), } #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct ServerProcessorReply { pub message_id: MessageId, - pub status: ServerProcessorReplyResult, + pub status: ServerProcessorReplyStatus, } #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] diff --git a/veilid-tools/src/virtual_network/router_client.rs b/veilid-tools/src/virtual_network/router_client.rs index 680359d9..71a4bc8a 100644 --- a/veilid-tools/src/virtual_network/router_client.rs +++ b/veilid-tools/src/virtual_network/router_client.rs @@ -26,7 +26,7 @@ impl fmt::Debug for RouterClientInner { struct RouterClientUnlockedInner { sender: flume::Sender, next_message_id: AtomicU64, - router_op_waiter: RouterOpWaiter, + router_op_waiter: RouterOpWaiter, } impl fmt::Debug for RouterClientUnlockedInner { @@ -480,7 +480,7 @@ impl RouterClient { fn new( sender: flume::Sender, - router_op_waiter: RouterOpWaiter, + router_op_waiter: RouterOpWaiter, jh_handler: MustJoinHandle<()>, stop_source: StopSource, ) -> RouterClient { @@ -557,16 +557,20 @@ impl RouterClient { .map_err(|_| VirtualNetworkError::WaitError)?; match status { - ServerProcessorReplyResult::Value(server_processor_response) => { + ServerProcessorReplyStatus::Value(server_processor_response) => { Ok(server_processor_response) } - ServerProcessorReplyResult::InvalidMachineId => { + ServerProcessorReplyStatus::InvalidMachineId => { Err(VirtualNetworkError::InvalidMachineId) } - ServerProcessorReplyResult::InvalidSocketId => { + ServerProcessorReplyStatus::InvalidSocketId => { Err(VirtualNetworkError::InvalidSocketId) } - ServerProcessorReplyResult::IoError(k) => Err(VirtualNetworkError::IoError(k)), + ServerProcessorReplyStatus::MissingProfile => Err(VirtualNetworkError::MissingProfile), + ServerProcessorReplyStatus::ProfileComplete => { + Err(VirtualNetworkError::ProfileComplete) + } + ServerProcessorReplyStatus::IoError(k) => Err(VirtualNetworkError::IoError(k)), } } @@ -574,7 +578,7 @@ impl RouterClient { reader: R, writer: W, receiver: flume::Receiver, - router_op_waiter: RouterOpWaiter, + router_op_waiter: RouterOpWaiter, stop_token: StopToken, ) where R: AsyncReadExt + Unpin + Send, @@ -619,7 +623,7 @@ impl RouterClient { async fn run_local_processor( receiver: flume::Receiver, - router_op_waiter: RouterOpWaiter, + router_op_waiter: RouterOpWaiter, stop_token: StopToken, ) { let mut unord = FuturesUnordered::new(); @@ -640,7 +644,7 @@ impl RouterClient { async fn process_event( evt: ServerProcessorEvent, - router_op_waiter: RouterOpWaiter, + router_op_waiter: RouterOpWaiter, ) -> io::Result<()> { match evt { ServerProcessorEvent::Reply(reply) => { diff --git a/veilid-tools/src/virtual_network/router_server/config.rs b/veilid-tools/src/virtual_network/router_server/config.rs index 542a86fb..9dd89fed 100644 --- a/veilid-tools/src/virtual_network/router_server/config.rs +++ b/veilid-tools/src/virtual_network/router_server/config.rs @@ -3,11 +3,17 @@ use ipnet::*; use serde::*; use std::path::Path; -pub use ::config::ConfigError; +use validator::{Validate, ValidateArgs, ValidationError, ValidationErrors}; const PREDEFINED_CONFIG: &str = include_str!("predefined_config.yml"); const DEFAULT_CONFIG: &str = include_str!("default_config.yml"); +#[derive(Debug)] +pub enum ConfigError { + ParseError(::config::ConfigError), + ValidateError(validator::ValidationErrors), +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum WeightedList { @@ -19,6 +25,26 @@ impl Default for WeightedList { Self::List(Vec::new()) } } +impl Validate for WeightedList { + fn validate(&self) -> Result<(), ValidationErrors> { + let mut errors = ValidationErrors::new(); + if let Self::List(v) = self { + if v.is_empty() { + errors.add( + "List", + ValidationError::new("len") + .with_message("weighted list must not be empty".into()), + ) + } + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } +} pub type Probability = f32; @@ -29,9 +55,35 @@ pub enum Weighted { Unweighted(T), } -#[derive(Debug, Clone, Serialize, Deserialize)] +impl Validate for Weighted { + fn validate(&self) -> Result<(), ValidationErrors> { + let mut errors = ValidationErrors::new(); + if let Self::Weighted { item: _, weight } = self { + if *weight <= 0.0 { + errors.add( + "Weighted", + ValidationError::new("len") + .with_message("weight must be a positive value".into()), + ) + } + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[validate(context = "ValidateContext<'v_a>")] pub struct Profile { - instances: Vec, + #[validate( + length(min = 1), + custom(function = "validate_instances_exist", use_context) + )] + pub instances: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -41,33 +93,37 @@ pub enum Instance { Template { template: WeightedList }, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[validate(context = "ValidateContext<'v_a>")] pub struct Machine { #[serde(flatten)] - location: Location, + #[validate(custom(function = "validate_location_exists", use_context))] + pub location: Location, #[serde(default)] - address4: Option, + pub address4: Option, #[serde(default)] - address6: Option, + pub address6: Option, #[serde(default)] - disable_capabilities: Vec, + pub disable_capabilities: Vec, #[serde(default)] - bootstrap: bool, + pub bootstrap: bool, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[validate(context = "ValidateContext<'v_a>")] pub struct Template { #[serde(flatten)] - location: Location, + #[validate(custom(function = "validate_location_exists", use_context))] + pub location: Location, #[serde(flatten)] - limits: Limits, + pub limits: Limits, #[serde(default)] - disable_capabilities: Vec, + pub disable_capabilities: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct Limits { - machine_count: WeightedList, + pub machine_count: WeightedList, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -79,95 +135,95 @@ pub enum Location { //////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct Network { #[serde(default)] - model: Option, + pub model: Option, #[serde(default)] - ipv4: Option, + pub ipv4: Option, #[serde(default)] - ipv6: Option, + pub ipv6: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct NetworkIpv4 { - allocation: String, + pub allocation: String, #[serde(default)] - gateway: Option, + pub gateway: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct NetworkIpv6 { - allocation: String, + pub allocation: String, #[serde(default)] - gateway: Option, + pub gateway: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct NetworkGateway { - translation: Translation, - upnp: bool, - network: Option, + pub translation: Translation, + pub upnp: bool, + pub network: Option, } //////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct Blueprint { #[serde(default)] - model: WeightedList, + pub model: WeightedList, #[serde(default)] - ipv4: Option, + pub ipv4: Option, #[serde(default)] - ipv6: Option, + pub ipv6: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct BlueprintIpv4 { #[serde(default)] - allocation: Option, - prefix: u8, + pub allocation: Option, + pub prefix: u8, #[serde(default)] - gateway: Option, + pub gateway: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct BlueprintIpv6 { #[serde(default)] - allocation: Option, - prefix: u8, + pub allocation: Option, + pub prefix: u8, #[serde(default)] - gateway: Option, + pub gateway: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct BlueprintGateway { - translation: WeightedList, - upnp: Probability, - network: Option, + pub translation: WeightedList, + pub upnp: Probability, + pub network: Option, } //////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct Subnets { #[serde(default)] - subnet4: Vec, + pub subnet4: Vec, #[serde(default)] - subnet6: Vec, + pub subnet6: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct Distance { - min: f32, - max: f32, + pub min: f32, + pub max: f32, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct Distribution { - mean: f32, - sigma: f32, - skew: f32, - min: f32, - max: f32, + pub mean: f32, + pub sigma: f32, + pub skew: f32, + pub min: f32, + pub max: f32, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -185,52 +241,166 @@ impl Default for Translation { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct Model { - latency: Distribution, + pub latency: Distribution, #[serde(default)] - distance: Option, + pub distance: Option, #[serde(default)] - loss: Probability, + pub loss: Probability, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct Allocation { #[serde(flatten)] - subnets: Subnets, + pub subnets: Subnets, } -#[derive(Debug, Clone, Serialize, Deserialize)] +struct ValidateContext<'a> { + config: &'a Config, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[validate(context = "ValidateContext<'v_a>")] pub struct Config { - seed: Option, - default_network: String, - default_model: String, + pub seed: Option, + #[validate( + length(min = 1), + custom(function = "validate_network_exists", use_context) + )] + pub default_network: String, + #[validate( + length(min = 1), + custom(function = "validate_model_exists", use_context) + )] + pub default_model: String, #[serde(default)] - profiles: HashMap, + #[validate(length(min = 1))] + pub profiles: HashMap, #[serde(default)] - machines: HashMap, + pub machines: HashMap, #[serde(default)] - templates: HashMap, + pub templates: HashMap, #[serde(default)] - networks: HashMap, + pub networks: HashMap, #[serde(default)] - blueprints: HashMap, + pub blueprints: HashMap, #[serde(default)] - allocations: HashMap, + pub allocations: HashMap, #[serde(default)] - models: HashMap, + pub models: HashMap, } impl Config { pub fn new>(config_file: Option

) -> Result { - let cfg = load_config(config_file)?; + let cfg = load_config(config_file).map_err(ConfigError::ParseError)?; // Generate config - cfg.try_deserialize() + let out: Self = cfg.try_deserialize().map_err(ConfigError::ParseError)?; + + // Validate config + let context = ValidateContext { config: &out }; + let mut errors = ValidationErrors::new(); + + if let Err(e) = out.validate_with_args(&context) { + errors = e; + } + + errors.merge_self("profiles", validate_all_profiles(&out.profiles, &context)); + errors.merge_self("machines", validate_all_machines(&out.machines, &context)); + errors.merge_self( + "templates", + validate_all_templates(&out.templates, &context), + ); + errors.merge_self("networks", validate_all_networks(&out.networks, &context)); + errors.merge_self( + "blueprints", + validate_all_blueprints(&out.blueprints, &context), + ); + errors.merge_self( + "allocation", + validate_all_allocations(&out.allocations, &context), + ); + errors.merge_self("models", validate_all_models(&out.models, &context)); + + if !errors.is_empty() { + return Err(ConfigError::ValidateError(errors)); + } + + Ok(out) } } -fn load_default_config() -> Result<::config::Config, ConfigError> { +fn validate_instances_exist( + value: &Vec, + context: &ValidateContext, +) -> Result<(), ValidationError> { + Ok(()) +} + +fn validate_location_exists( + value: &Location, + context: &ValidateContext, +) -> Result<(), ValidationError> { + Ok(()) +} + +fn validate_network_exists(value: &str, context: &ValidateContext) -> Result<(), ValidationError> { + Ok(()) +} + +fn validate_model_exists(value: &str, context: &ValidateContext) -> Result<(), ValidationError> { + Ok(()) +} + +fn validate_all_profiles( + value: &HashMap, + context: &ValidateContext, +) -> Result<(), ValidationErrors> { + for x in value.values() { + x.validate_with_args(context)? + } + Ok(()) +} + +fn validate_all_machines( + value: &HashMap, + context: &ValidateContext, +) -> Result<(), ValidationErrors> { + Ok(()) +} +fn validate_all_templates( + value: &HashMap, + context: &ValidateContext, +) -> Result<(), ValidationErrors> { + Ok(()) +} +fn validate_all_networks( + value: &HashMap, + context: &ValidateContext, +) -> Result<(), ValidationErrors> { + Ok(()) +} +fn validate_all_blueprints( + value: &HashMap, + context: &ValidateContext, +) -> Result<(), ValidationErrors> { + Ok(()) +} +fn validate_all_allocations( + value: &HashMap, + context: &ValidateContext, +) -> Result<(), ValidationErrors> { + Ok(()) +} +fn validate_all_models( + value: &HashMap, + context: &ValidateContext, +) -> Result<(), ValidationErrors> { + Ok(()) +} + +fn load_default_config() -> Result<::config::Config, ::config::ConfigError> { ::config::Config::builder() .add_source(::config::File::from_str( PREDEFINED_CONFIG, @@ -245,17 +415,17 @@ fn load_default_config() -> Result<::config::Config, ConfigError> { fn load_config>( opt_config_file: Option

, -) -> Result<::config::Config, ConfigError> { +) -> Result<::config::Config, ::config::ConfigError> { let Some(config_file) = opt_config_file else { return load_default_config(); }; let config_path = config_file.as_ref(); let Some(config_file_str) = config_path.to_str() else { - return Err(ConfigError::Message( + return Err(::config::ConfigError::Message( "config file path is not valid UTF-8".to_owned(), )); }; - ::config::Config::builder() + let config = ::config::Config::builder() .add_source(::config::File::from_str( PREDEFINED_CONFIG, ::config::FileFormat::Yaml, @@ -264,5 +434,7 @@ fn load_config>( config_file_str, ::config::FileFormat::Yaml, )) - .build() + .build()?; + + Ok(config) } diff --git a/veilid-tools/src/virtual_network/router_server/machine_registry.rs b/veilid-tools/src/virtual_network/router_server/machine_registry.rs new file mode 100644 index 00000000..c8f88c3d --- /dev/null +++ b/veilid-tools/src/virtual_network/router_server/machine_registry.rs @@ -0,0 +1,111 @@ +use super::*; + +#[derive(Debug)] +struct Machine {} + +#[derive(Debug)] +struct MachineRegistryUnlockedInner { + config: config::Config, +} + +#[derive(Debug, Default)] +struct ProfileState { + next_instance_index: usize, +} + +#[derive(Debug)] +struct MachineRegistryInner { + machines_by_id: HashMap, + current_profile_state: HashMap, +} + +#[derive(Debug, Clone)] +pub enum MachineRegistryError { + InvalidMachineId, + ProfileNotFound, + ProfileComplete, +} + +pub type MachineRegistryResult = Result; + +#[derive(Debug, Clone)] +pub struct MachineRegistry { + unlocked_inner: Arc, + inner: Arc>, +} + +impl MachineRegistry { + /////////////////////////////////////////////////////////// + /// Public Interface + pub fn new(config: config::Config) -> Self { + Self { + unlocked_inner: Arc::new(MachineRegistryUnlockedInner { config }), + inner: Arc::new(Mutex::new(MachineRegistryInner { + machines_by_id: HashMap::new(), + current_profile_state: HashMap::new(), + })), + } + } + + pub async fn allocate(&self, profile: String) -> MachineRegistryResult { + // Get profile definition + let Some(profile_def) = self.unlocked_inner.config.profiles.get(&profile) else { + return Err(MachineRegistryError::ProfileNotFound); + }; + + // Get current profile state, creating one if we have not yet started executing the profile + let mut inner = self.inner.lock(); + let current_profile_state = inner + .current_profile_state + .entry(profile) + .or_insert_with(|| ProfileState::default()); + + // Get the next instance from the definition + let Some(instance_def) = profile_def + .instances + .get(current_profile_state.next_instance_index) + else { + // + return Err(MachineRegistryError::ProfileComplete); + }; + + match instance_def { + config::Instance::Machine { machine } => { + self.create_machine(machine); + } + config::Instance::Template { template } => todo!(), + } + + Ok(machine_id) + } + + pub async fn release(&self, machine_id: MachineId) -> MachineRegistryResult<()> {} + + /////////////////////////////////////////////////////////// + /// Private Implementation + + async fn create_machine( + &self, + machine_def: config::Machine, + ) -> MachineRegistryResult { + // + } + + fn weighted_choice( + &self, + weighted_list: &config::WeightedList, + ) -> &T { + match weighted_list { + config::WeightedList::Single(x) => x, + config::WeightedList::List(vec) => { + let total_weight = vec + .iter() + .map(|x| match x { + config::Weighted::Weighted { item, weight } => weight, + config::Weighted::Unweighted(item) => 1.0, + }) + .reduce(|acc, x| acc + x); + } + } + } +} diff --git a/veilid-tools/src/virtual_network/router_server/mod.rs b/veilid-tools/src/virtual_network/router_server/mod.rs index 970e0c8e..e156024a 100644 --- a/veilid-tools/src/virtual_network/router_server/mod.rs +++ b/veilid-tools/src/virtual_network/router_server/mod.rs @@ -1,8 +1,12 @@ -mod config; - -pub use config::*; +pub mod config; +mod machine_registry; +mod server_processor; use super::*; + +use machine_registry::*; +use server_processor::*; + use async_tungstenite::accept_async; use futures_codec::{Bytes, BytesCodec, FramedRead, FramedWrite}; use futures_util::{stream::FuturesUnordered, AsyncReadExt, StreamExt, TryStreamExt}; @@ -31,9 +35,10 @@ enum RunLoopEvent { #[derive(Debug)] struct RouterServerUnlockedInner { - config: Config, + config: config::Config, new_client_sender: flume::Sender>, new_client_receiver: flume::Receiver>, + server_processor: ServerProcessor, } #[derive(Debug)] @@ -57,15 +62,19 @@ impl RouterServer { // Public Interface /// Create a router server for virtual networking - pub fn new(config: Config) -> Self { + pub fn new(config: config::Config) -> Self { // Make a channel to receive new clients let (new_client_sender, new_client_receiver) = flume::unbounded(); + // Make a server processor to handle messages + let server_processor = ServerProcessor::new(config.clone()); + Self { unlocked_inner: Arc::new(RouterServerUnlockedInner { config, new_client_sender, new_client_receiver, + server_processor, }), inner: Arc::new(Mutex::new(RouterServerInner {})), } @@ -95,10 +104,11 @@ impl RouterServer { let x = x; let cmd = from_bytes::(&x).map_err(io::Error::other)?; - self.clone() - .process_command(cmd, outbound_sender.clone()) - .await - .map_err(io::Error::other) + self.unlocked_inner + .server_processor + .enqueue_command(cmd, outbound_sender.clone()); + + Ok(()) })); let mut unord = FuturesUnordered::new(); @@ -244,16 +254,14 @@ impl RouterServer { let this = self.clone(); let inbound_receiver_fut = system_boxed(async move { - let fut = local_inbound_receiver + local_inbound_receiver .into_stream() - .map(Ok) - .try_for_each(|cmd| { - this.clone() - .process_command(cmd, local_outbound_sender.clone()) - }); - if let Err(e) = fut.await { - error!("{}", e); - } + .for_each(|cmd| async { + this.unlocked_inner + .server_processor + .enqueue_command(cmd, local_outbound_sender.clone()); + }) + .await; RunLoopEvent::Done }); @@ -273,6 +281,13 @@ impl RouterServer { let mut need_new_client_fut = true; + // Add server processor to run loop + unord.push( + self.unlocked_inner + .server_processor + .run_loop_process_commands(), + ); + loop { if need_new_client_fut { let new_client_receiver = self.unlocked_inner.new_client_receiver.clone(); @@ -311,116 +326,4 @@ impl RouterServer { //////////////////////////////////////////////////////////////////// // Private Implementation - - async fn process_command( - self, - cmd: ServerProcessorCommand, - outbound_sender: flume::Sender, - ) -> RouterServerResult<()> { - match cmd { - ServerProcessorCommand::Message(server_processor_message) => { - self.process_message( - server_processor_message.message_id, - server_processor_message.request, - outbound_sender, - ) - .await - } - ServerProcessorCommand::CloseSocket { - machine_id, - socket_id, - } => { - self.process_close_socket(machine_id, socket_id, outbound_sender) - .await - } - } - } - async fn process_close_socket( - self, - machine_id: MachineId, - socket_id: SocketId, - outbound_sender: flume::Sender, - ) -> RouterServerResult<()> { - // - Ok(()) - } - - async fn process_message( - self, - message_id: MessageId, - request: ServerProcessorRequest, - outbound_sender: flume::Sender, - ) -> RouterServerResult<()> { - match request { - ServerProcessorRequest::AllocateMachine { profile } => todo!(), - ServerProcessorRequest::ReleaseMachine { machine_id } => todo!(), - ServerProcessorRequest::GetInterfaces { machine_id } => todo!(), - ServerProcessorRequest::TcpConnect { - machine_id, - local_address, - remote_address, - timeout_ms, - options, - } => todo!(), - ServerProcessorRequest::TcpBind { - machine_id, - local_address, - options, - } => todo!(), - ServerProcessorRequest::TcpAccept { - machine_id, - listen_socket_id, - } => todo!(), - ServerProcessorRequest::TcpShutdown { - machine_id, - socket_id, - } => todo!(), - ServerProcessorRequest::UdpBind { - machine_id, - local_address, - options, - } => todo!(), - ServerProcessorRequest::Send { - machine_id, - socket_id, - data, - } => todo!(), - ServerProcessorRequest::SendTo { - machine_id, - socket_id, - remote_address, - data, - } => todo!(), - ServerProcessorRequest::Recv { - machine_id, - socket_id, - len, - } => todo!(), - ServerProcessorRequest::RecvFrom { - machine_id, - socket_id, - len, - } => todo!(), - ServerProcessorRequest::GetRoutedLocalAddress { - machine_id, - address_type, - } => todo!(), - ServerProcessorRequest::FindGateway { machine_id } => todo!(), - ServerProcessorRequest::GetExternalAddress { gateway_id } => todo!(), - ServerProcessorRequest::AddPort { - gateway_id, - protocol, - external_port, - local_address, - lease_duration_ms, - description, - } => todo!(), - ServerProcessorRequest::RemovePort { - gateway_id, - protocol, - external_port, - } => todo!(), - ServerProcessorRequest::TXTQuery { name } => todo!(), - } - } } diff --git a/veilid-tools/src/virtual_network/router_server/server_processor.rs b/veilid-tools/src/virtual_network/router_server/server_processor.rs new file mode 100644 index 00000000..29b37ae1 --- /dev/null +++ b/veilid-tools/src/virtual_network/router_server/server_processor.rs @@ -0,0 +1,196 @@ +use super::*; + +struct ServerProcessorCommandRecord { + cmd: ServerProcessorCommand, + outbound_sender: flume::Sender, +} + +#[derive(Debug)] +struct ServerProcessorInner { + // +} + +#[derive(Debug)] +struct ServerProcessorUnlockedInner { + config: config::Config, + receiver: flume::Receiver, + sender: flume::Sender, + + machine_registry: MachineRegistry, +} + +#[derive(Debug, Clone)] +pub struct ServerProcessor { + unlocked_inner: Arc, + inner: Arc>, +} + +impl ServerProcessor { + //////////////////////////////////////////////////////////////////////// + // Public Interface + + pub fn new(config: config::Config) -> Self { + let (sender, receiver) = flume::unbounded(); + + Self { + unlocked_inner: Arc::new(ServerProcessorUnlockedInner { + config: config.clone(), + sender, + receiver, + machine_registry: MachineRegistry::new(config), + }), + inner: Arc::new(Mutex::new(ServerProcessorInner {})), + } + } + + pub fn enqueue_command( + &self, + cmd: ServerProcessorCommand, + outbound_sender: flume::Sender, + ) { + if let Err(e) = self + .unlocked_inner + .sender + .send(ServerProcessorCommandRecord { + cmd, + outbound_sender, + }) + { + eprintln!("Dropped command: {}", e); + } + } + + pub fn run_loop_process_commands(&self) -> SendPinBoxFuture { + let receiver_stream = self.unlocked_inner.receiver.clone().into_stream(); + let this = self.clone(); + Box::pin(async move { + receiver_stream + .for_each_concurrent(None, |x| { + let this = this.clone(); + async move { + if let Err(e) = this.process_command(x.cmd, x.outbound_sender).await { + eprintln!("Failed to process command: {}", e); + } + } + }) + .await; + + RunLoopEvent::Done + }) + } + + //////////////////////////////////////////////////////////////////////// + // Private Implementation + + async fn process_command( + self, + cmd: ServerProcessorCommand, + outbound_sender: flume::Sender, + ) -> RouterServerResult<()> { + match cmd { + ServerProcessorCommand::Message(server_processor_message) => { + self.process_message( + server_processor_message.message_id, + server_processor_message.request, + outbound_sender, + ) + .await + } + ServerProcessorCommand::CloseSocket { + machine_id, + socket_id, + } => { + self.process_close_socket(machine_id, socket_id, outbound_sender) + .await + } + } + } + async fn process_close_socket( + self, + machine_id: MachineId, + socket_id: SocketId, + outbound_sender: flume::Sender, + ) -> RouterServerResult<()> { + // + Ok(()) + } + + async fn process_message( + self, + message_id: MessageId, + request: ServerProcessorRequest, + outbound_sender: flume::Sender, + ) -> RouterServerResult<()> { + match request { + ServerProcessorRequest::AllocateMachine { profile } => todo!(), + ServerProcessorRequest::ReleaseMachine { machine_id } => todo!(), + ServerProcessorRequest::GetInterfaces { machine_id } => todo!(), + ServerProcessorRequest::TcpConnect { + machine_id, + local_address, + remote_address, + timeout_ms, + options, + } => todo!(), + ServerProcessorRequest::TcpBind { + machine_id, + local_address, + options, + } => todo!(), + ServerProcessorRequest::TcpAccept { + machine_id, + listen_socket_id, + } => todo!(), + ServerProcessorRequest::TcpShutdown { + machine_id, + socket_id, + } => todo!(), + ServerProcessorRequest::UdpBind { + machine_id, + local_address, + options, + } => todo!(), + ServerProcessorRequest::Send { + machine_id, + socket_id, + data, + } => todo!(), + ServerProcessorRequest::SendTo { + machine_id, + socket_id, + remote_address, + data, + } => todo!(), + ServerProcessorRequest::Recv { + machine_id, + socket_id, + len, + } => todo!(), + ServerProcessorRequest::RecvFrom { + machine_id, + socket_id, + len, + } => todo!(), + ServerProcessorRequest::GetRoutedLocalAddress { + machine_id, + address_type, + } => todo!(), + ServerProcessorRequest::FindGateway { machine_id } => todo!(), + ServerProcessorRequest::GetExternalAddress { gateway_id } => todo!(), + ServerProcessorRequest::AddPort { + gateway_id, + protocol, + external_port, + local_address, + lease_duration_ms, + description, + } => todo!(), + ServerProcessorRequest::RemovePort { + gateway_id, + protocol, + external_port, + } => todo!(), + ServerProcessorRequest::TXTQuery { name } => todo!(), + } + } +} diff --git a/veilid-tools/src/virtual_network/virtual_network_error.rs b/veilid-tools/src/virtual_network/virtual_network_error.rs index d1d7fd95..4e7b564b 100644 --- a/veilid-tools/src/virtual_network/virtual_network_error.rs +++ b/veilid-tools/src/virtual_network/virtual_network_error.rs @@ -13,6 +13,10 @@ pub enum VirtualNetworkError { InvalidMachineId, #[error("Invalid socket id")] InvalidSocketId, + #[error("Missing profile")] + MissingProfile, + #[error("Profile complete")] + ProfileComplete, #[error("Io error: {0}")] IoError(io::ErrorKind), }