Replace the Lwt.async into the right context and localize the global clients map

We currently try to spawn 2 fibers [qubes_updated] and [listener] per clients
and we already finalise them correctly if the client is disconnected. However,
the Lwt.async is localized into add_client instead of where we attach a
finalisers for these tasks. The first objective of this patch is to be sure that
the Lwt.async is near where we registerd cancellation of these tasks.

The second part is to localize the global clients to avoid the ability to
read/write on it somewhere else. Only Dispatcher.watch_clients uses it - so it
corresponds to a free variable of the Dispatcher.watch_clients closure.
This commit is contained in:
Calascibetta Romain 2024-05-22 16:05:29 +02:00
parent 9af423463a
commit 0a0f0ad00f
3 changed files with 44 additions and 37 deletions

2
dao.ml
View File

@ -113,7 +113,7 @@ let watch_clients fn =
end >>= fun items -> end >>= fun items ->
Xen_os.Xs.make () >>= fun xs -> Xen_os.Xs.make () >>= fun xs ->
Lwt_list.map_p (vifs xs) items >>= fun items -> Lwt_list.map_p (vifs xs) items >>= fun items ->
fn (List.concat items |> VifMap.of_list); fn (List.concat items |> VifMap.of_list) >>= fun () ->
(* Wait for further updates *) (* Wait for further updates *)
Lwt.fail Xs_protocol.Eagain Lwt.fail Xs_protocol.Eagain
) )

View File

@ -15,7 +15,7 @@ module VifMap : sig
val find : key -> 'a t -> 'a option val find : key -> 'a t -> 'a option
end end
val watch_clients : (Ipaddr.V4.t VifMap.t -> unit) -> 'a Lwt.t val watch_clients : (Ipaddr.V4.t VifMap.t -> unit Lwt.t) -> 'a Lwt.t
(** [watch_clients fn] calls [fn clients] with the list of backend clients (** [watch_clients fn] calls [fn clients] with the list of backend clients
in XenStore, and again each time XenStore updates. *) in XenStore, and again each time XenStore updates. *)

View File

@ -17,8 +17,6 @@ struct
module I = Static_ipv4.Make (R) (Clock) (UplinkEth) (Arp) module I = Static_ipv4.Make (R) (Clock) (UplinkEth) (Arp)
module U = Udp.Make (I) (R) module U = Udp.Make (I) (R)
let clients : Cleanup.t Dao.VifMap.t ref = ref Dao.VifMap.empty
class client_iface eth ~domid ~gateway_ip ~client_ip client_mac : client_link class client_iface eth ~domid ~gateway_ip ~client_ip client_mac : client_link
= =
let log_header = Fmt.str "dom%d:%a" domid Ipaddr.V4.pp client_ip in let log_header = Fmt.str "dom%d:%a" domid Ipaddr.V4.pp client_ip in
@ -344,11 +342,12 @@ struct
(** Connect to a new client's interface and listen for incoming frames and firewall rule changes. *) (** Connect to a new client's interface and listen for incoming frames and firewall rule changes. *)
let add_vif get_ts { Dao.ClientVif.domid; device_id } dns_client dns_servers let add_vif get_ts { Dao.ClientVif.domid; device_id } dns_client dns_servers
~client_ip ~router ~cleanup_tasks qubesDB = ~client_ip ~router ~cleanup_tasks qubesDB () =
Netback.make ~domid ~device_id >>= fun backend -> let open Lwt.Syntax in
let* backend = Netback.make ~domid ~device_id in
Log.info (fun f -> Log.info (fun f ->
f "Client %d (IP: %s) ready" domid (Ipaddr.V4.to_string client_ip)); f "Client %d (IP: %s) ready" domid (Ipaddr.V4.to_string client_ip));
ClientEth.connect backend >>= fun eth -> let* eth = ClientEth.connect backend in
let client_mac = Netback.frontend_mac backend in let client_mac = Netback.frontend_mac backend in
let client_eth = router.clients in let client_eth = router.clients in
let gateway_ip = Client_eth.client_gw client_eth in let gateway_ip = Client_eth.client_gw client_eth in
@ -404,46 +403,54 @@ struct
(function Lwt.Canceled -> Lwt.return_unit | e -> Lwt.fail e) (function Lwt.Canceled -> Lwt.return_unit | e -> Lwt.fail e)
in in
Cleanup.on_cleanup cleanup_tasks (fun () -> Lwt.cancel listener); Cleanup.on_cleanup cleanup_tasks (fun () -> Lwt.cancel listener);
Lwt.pick [ qubesdb_updater; listener ] (* XXX(dinosaure): [qubes_updater] and [listener] can be forgotten, our [cleanup_task]
will cancel them if the client is disconnected. *)
Lwt.async (fun () -> Lwt.pick [ qubesdb_updater; listener ]);
Lwt.return_unit
(** A new client VM has been found in XenStore. Find its interface and connect to it. *) (** A new client VM has been found in XenStore. Find its interface and connect to it. *)
let add_client get_ts dns_client dns_servers ~router vif client_ip qubesDB = let add_client get_ts dns_client dns_servers ~router vif client_ip qubesDB =
let open Lwt.Syntax in
let cleanup_tasks = Cleanup.create () in let cleanup_tasks = Cleanup.create () in
Log.info (fun f -> Log.info (fun f ->
f "add client vif %a with IP %a" Dao.ClientVif.pp vif Ipaddr.V4.pp f "add client vif %a with IP %a" Dao.ClientVif.pp vif Ipaddr.V4.pp
client_ip); client_ip);
Lwt.async (fun () -> let* () =
Lwt.catch Lwt.catch (add_vif get_ts vif dns_client dns_servers ~client_ip ~router
(fun () -> ~cleanup_tasks qubesDB)
add_vif get_ts vif dns_client dns_servers ~client_ip ~router @@ fun exn ->
~cleanup_tasks qubesDB) Log.warn (fun f ->
(fun ex -> f "Error with client %a: %s" Dao.ClientVif.pp vif
Log.warn (fun f -> (Printexc.to_string exn));
f "Error with client %a: %s" Dao.ClientVif.pp vif Lwt.return_unit
(Printexc.to_string ex)); in
Lwt.return_unit)); Lwt.return cleanup_tasks
cleanup_tasks
(** Watch XenStore for notifications of new clients. *) (** Watch XenStore for notifications of new clients. *)
let wait_clients get_ts dns_client dns_servers qubesDB router = let wait_clients get_ts dns_client dns_servers qubesDB router =
Dao.watch_clients (fun new_set -> let open Lwt.Syntax in
(* Check for removed clients *) let clients : Cleanup.t Dao.VifMap.t ref = ref Dao.VifMap.empty in
!clients Dao.watch_clients @@ fun new_set ->
|> Dao.VifMap.iter (fun key cleanup -> (* Check for removed clients *)
if not (Dao.VifMap.mem key new_set) then ( let clean_up_clients key cleanup =
clients := !clients |> Dao.VifMap.remove key; if not (Dao.VifMap.mem key new_set) then begin
Log.info (fun f -> f "client %a has gone" Dao.ClientVif.pp key); clients := !clients |> Dao.VifMap.remove key;
Cleanup.cleanup cleanup)); Log.info (fun f -> f "client %a has gone" Dao.ClientVif.pp key);
(* Check for added clients *) Cleanup.cleanup cleanup
new_set end
|> Dao.VifMap.iter (fun key ip_addr -> in
if not (Dao.VifMap.mem key !clients) then ( Dao.VifMap.iter clean_up_clients !clients;
let cleanup = (* Check for added clients *)
add_client get_ts dns_client dns_servers ~router key ip_addr let rec go seq = match Seq.uncons seq with
qubesDB | None -> Lwt.return_unit
in | Some ((key, ipaddr), seq) when not (Dao.VifMap.mem key !clients) ->
Log.debug (fun f -> f "client %a arrived" Dao.ClientVif.pp key); let* cleanup = add_client get_ts dns_client dns_servers ~router key ipaddr qubesDB in
clients := !clients |> Dao.VifMap.add key cleanup))) Log.debug (fun f -> f "client %a arrived" Dao.ClientVif.pp key);
clients := Dao.VifMap.add key cleanup !clients;
go seq
| Some (_, seq) -> go seq
in
go (Dao.VifMap.to_seq new_set)
let send_dns_client_query t ~src_port ~dst ~dst_port buf = let send_dns_client_query t ~src_port ~dst ~dst_port buf =
match t.uplink with match t.uplink with