From 0a0f0ad00f30dac2c6f199bfc6e42b3a475a8ea7 Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Wed, 22 May 2024 16:05:29 +0200 Subject: [PATCH] Replace the Lwt.async into the right context and localize the global clients map We currently try to spawn 2 fibers [qubes_updated] and [listener] per clients and we already finalise them correctly if the client is disconnected. However, the Lwt.async is localized into add_client instead of where we attach a finalisers for these tasks. The first objective of this patch is to be sure that the Lwt.async is near where we registerd cancellation of these tasks. The second part is to localize the global clients to avoid the ability to read/write on it somewhere else. Only Dispatcher.watch_clients uses it - so it corresponds to a free variable of the Dispatcher.watch_clients closure. --- dao.ml | 2 +- dao.mli | 2 +- dispatcher.ml | 77 ++++++++++++++++++++++++++++----------------------- 3 files changed, 44 insertions(+), 37 deletions(-) diff --git a/dao.ml b/dao.ml index 2e94660..27b8bda 100644 --- a/dao.ml +++ b/dao.ml @@ -113,7 +113,7 @@ let watch_clients fn = end >>= fun items -> Xen_os.Xs.make () >>= fun xs -> Lwt_list.map_p (vifs xs) items >>= fun items -> - fn (List.concat items |> VifMap.of_list); + fn (List.concat items |> VifMap.of_list) >>= fun () -> (* Wait for further updates *) Lwt.fail Xs_protocol.Eagain ) diff --git a/dao.mli b/dao.mli index bff4cbf..c278d16 100644 --- a/dao.mli +++ b/dao.mli @@ -15,7 +15,7 @@ module VifMap : sig val find : key -> 'a t -> 'a option end -val watch_clients : (Ipaddr.V4.t VifMap.t -> unit) -> 'a Lwt.t +val watch_clients : (Ipaddr.V4.t VifMap.t -> unit Lwt.t) -> 'a Lwt.t (** [watch_clients fn] calls [fn clients] with the list of backend clients in XenStore, and again each time XenStore updates. *) diff --git a/dispatcher.ml b/dispatcher.ml index fc21cdd..190fb72 100644 --- a/dispatcher.ml +++ b/dispatcher.ml @@ -17,8 +17,6 @@ struct module I = Static_ipv4.Make (R) (Clock) (UplinkEth) (Arp) module U = Udp.Make (I) (R) - let clients : Cleanup.t Dao.VifMap.t ref = ref Dao.VifMap.empty - class client_iface eth ~domid ~gateway_ip ~client_ip client_mac : client_link = let log_header = Fmt.str "dom%d:%a" domid Ipaddr.V4.pp client_ip in @@ -344,11 +342,12 @@ struct (** Connect to a new client's interface and listen for incoming frames and firewall rule changes. *) let add_vif get_ts { Dao.ClientVif.domid; device_id } dns_client dns_servers - ~client_ip ~router ~cleanup_tasks qubesDB = - Netback.make ~domid ~device_id >>= fun backend -> + ~client_ip ~router ~cleanup_tasks qubesDB () = + let open Lwt.Syntax in + let* backend = Netback.make ~domid ~device_id in Log.info (fun f -> f "Client %d (IP: %s) ready" domid (Ipaddr.V4.to_string client_ip)); - ClientEth.connect backend >>= fun eth -> + let* eth = ClientEth.connect backend in let client_mac = Netback.frontend_mac backend in let client_eth = router.clients in let gateway_ip = Client_eth.client_gw client_eth in @@ -404,46 +403,54 @@ struct (function Lwt.Canceled -> Lwt.return_unit | e -> Lwt.fail e) in Cleanup.on_cleanup cleanup_tasks (fun () -> Lwt.cancel listener); - Lwt.pick [ qubesdb_updater; listener ] + (* XXX(dinosaure): [qubes_updater] and [listener] can be forgotten, our [cleanup_task] + will cancel them if the client is disconnected. *) + Lwt.async (fun () -> Lwt.pick [ qubesdb_updater; listener ]); + Lwt.return_unit (** A new client VM has been found in XenStore. Find its interface and connect to it. *) let add_client get_ts dns_client dns_servers ~router vif client_ip qubesDB = + let open Lwt.Syntax in let cleanup_tasks = Cleanup.create () in Log.info (fun f -> f "add client vif %a with IP %a" Dao.ClientVif.pp vif Ipaddr.V4.pp client_ip); - Lwt.async (fun () -> - Lwt.catch - (fun () -> - add_vif get_ts vif dns_client dns_servers ~client_ip ~router - ~cleanup_tasks qubesDB) - (fun ex -> - Log.warn (fun f -> - f "Error with client %a: %s" Dao.ClientVif.pp vif - (Printexc.to_string ex)); - Lwt.return_unit)); - cleanup_tasks + let* () = + Lwt.catch (add_vif get_ts vif dns_client dns_servers ~client_ip ~router + ~cleanup_tasks qubesDB) + @@ fun exn -> + Log.warn (fun f -> + f "Error with client %a: %s" Dao.ClientVif.pp vif + (Printexc.to_string exn)); + Lwt.return_unit + in + Lwt.return cleanup_tasks (** Watch XenStore for notifications of new clients. *) let wait_clients get_ts dns_client dns_servers qubesDB router = - Dao.watch_clients (fun new_set -> - (* Check for removed clients *) - !clients - |> Dao.VifMap.iter (fun key cleanup -> - if not (Dao.VifMap.mem key new_set) then ( - clients := !clients |> Dao.VifMap.remove key; - Log.info (fun f -> f "client %a has gone" Dao.ClientVif.pp key); - Cleanup.cleanup cleanup)); - (* Check for added clients *) - new_set - |> Dao.VifMap.iter (fun key ip_addr -> - if not (Dao.VifMap.mem key !clients) then ( - let cleanup = - add_client get_ts dns_client dns_servers ~router key ip_addr - qubesDB - in - Log.debug (fun f -> f "client %a arrived" Dao.ClientVif.pp key); - clients := !clients |> Dao.VifMap.add key cleanup))) + let open Lwt.Syntax in + let clients : Cleanup.t Dao.VifMap.t ref = ref Dao.VifMap.empty in + Dao.watch_clients @@ fun new_set -> + (* Check for removed clients *) + let clean_up_clients key cleanup = + if not (Dao.VifMap.mem key new_set) then begin + clients := !clients |> Dao.VifMap.remove key; + Log.info (fun f -> f "client %a has gone" Dao.ClientVif.pp key); + Cleanup.cleanup cleanup + end + in + Dao.VifMap.iter clean_up_clients !clients; + (* Check for added clients *) + let rec go seq = match Seq.uncons seq with + | None -> Lwt.return_unit + | Some ((key, ipaddr), seq) when not (Dao.VifMap.mem key !clients) -> + let* cleanup = add_client get_ts dns_client dns_servers ~router key ipaddr qubesDB in + Log.debug (fun f -> f "client %a arrived" Dao.ClientVif.pp key); + clients := Dao.VifMap.add key cleanup !clients; + go seq + | Some (_, seq) -> go seq + in + go (Dao.VifMap.to_seq new_set) let send_dns_client_query t ~src_port ~dst ~dst_port buf = match t.uplink with