Replace the Lwt.async into the right context and localize the global clients map

We currently try to spawn 2 fibers [qubes_updated] and [listener] per clients
and we already finalise them correctly if the client is disconnected. However,
the Lwt.async is localized into add_client instead of where we attach a
finalisers for these tasks. The first objective of this patch is to be sure that
the Lwt.async is near where we registerd cancellation of these tasks.

The second part is to localize the global clients to avoid the ability to
read/write on it somewhere else. Only Dispatcher.watch_clients uses it - so it
corresponds to a free variable of the Dispatcher.watch_clients closure.
This commit is contained in:
Calascibetta Romain 2024-05-22 16:05:29 +02:00 committed by Hannes Mehnert
parent a7cb153ee1
commit 12ed2b268d
3 changed files with 44 additions and 37 deletions

2
dao.ml
View file

@ -113,7 +113,7 @@ let watch_clients fn =
end >>= fun items -> end >>= fun items ->
Xen_os.Xs.make () >>= fun xs -> Xen_os.Xs.make () >>= fun xs ->
Lwt_list.map_p (vifs xs) items >>= fun items -> Lwt_list.map_p (vifs xs) items >>= fun items ->
fn (List.concat items |> VifMap.of_list); fn (List.concat items |> VifMap.of_list) >>= fun () ->
(* Wait for further updates *) (* Wait for further updates *)
Lwt.fail Xs_protocol.Eagain Lwt.fail Xs_protocol.Eagain
) )

View file

@ -15,7 +15,7 @@ module VifMap : sig
val find : key -> 'a t -> 'a option val find : key -> 'a t -> 'a option
end end
val watch_clients : (Ipaddr.V4.t VifMap.t -> unit) -> 'a Lwt.t val watch_clients : (Ipaddr.V4.t VifMap.t -> unit Lwt.t) -> 'a Lwt.t
(** [watch_clients fn] calls [fn clients] with the list of backend clients (** [watch_clients fn] calls [fn clients] with the list of backend clients
in XenStore, and again each time XenStore updates. *) in XenStore, and again each time XenStore updates. *)

View file

@ -17,8 +17,6 @@ struct
module I = Static_ipv4.Make (R) (Clock) (UplinkEth) (Arp) module I = Static_ipv4.Make (R) (Clock) (UplinkEth) (Arp)
module U = Udp.Make (I) (R) module U = Udp.Make (I) (R)
let clients : Cleanup.t Dao.VifMap.t ref = ref Dao.VifMap.empty
class client_iface eth ~domid ~gateway_ip ~client_ip client_mac : client_link class client_iface eth ~domid ~gateway_ip ~client_ip client_mac : client_link
= =
let log_header = Fmt.str "dom%d:%a" domid Ipaddr.V4.pp client_ip in let log_header = Fmt.str "dom%d:%a" domid Ipaddr.V4.pp client_ip in
@ -344,11 +342,12 @@ struct
(** Connect to a new client's interface and listen for incoming frames and firewall rule changes. *) (** Connect to a new client's interface and listen for incoming frames and firewall rule changes. *)
let add_vif get_ts { Dao.ClientVif.domid; device_id } dns_client dns_servers let add_vif get_ts { Dao.ClientVif.domid; device_id } dns_client dns_servers
~client_ip ~router ~cleanup_tasks qubesDB = ~client_ip ~router ~cleanup_tasks qubesDB () =
Netback.make ~domid ~device_id >>= fun backend -> let open Lwt.Syntax in
let* backend = Netback.make ~domid ~device_id in
Log.info (fun f -> Log.info (fun f ->
f "Client %d (IP: %s) ready" domid (Ipaddr.V4.to_string client_ip)); f "Client %d (IP: %s) ready" domid (Ipaddr.V4.to_string client_ip));
ClientEth.connect backend >>= fun eth -> let* eth = ClientEth.connect backend in
let client_mac = Netback.frontend_mac backend in let client_mac = Netback.frontend_mac backend in
let client_eth = router.clients in let client_eth = router.clients in
let gateway_ip = Client_eth.client_gw client_eth in let gateway_ip = Client_eth.client_gw client_eth in
@ -404,46 +403,54 @@ struct
(function Lwt.Canceled -> Lwt.return_unit | e -> Lwt.fail e) (function Lwt.Canceled -> Lwt.return_unit | e -> Lwt.fail e)
in in
Cleanup.on_cleanup cleanup_tasks (fun () -> Lwt.cancel listener); Cleanup.on_cleanup cleanup_tasks (fun () -> Lwt.cancel listener);
Lwt.pick [ qubesdb_updater; listener ] (* XXX(dinosaure): [qubes_updater] and [listener] can be forgotten, our [cleanup_task]
will cancel them if the client is disconnected. *)
Lwt.async (fun () -> Lwt.pick [ qubesdb_updater; listener ]);
Lwt.return_unit
(** A new client VM has been found in XenStore. Find its interface and connect to it. *) (** A new client VM has been found in XenStore. Find its interface and connect to it. *)
let add_client get_ts dns_client dns_servers ~router vif client_ip qubesDB = let add_client get_ts dns_client dns_servers ~router vif client_ip qubesDB =
let open Lwt.Syntax in
let cleanup_tasks = Cleanup.create () in let cleanup_tasks = Cleanup.create () in
Log.info (fun f -> Log.info (fun f ->
f "add client vif %a with IP %a" Dao.ClientVif.pp vif Ipaddr.V4.pp f "add client vif %a with IP %a" Dao.ClientVif.pp vif Ipaddr.V4.pp
client_ip); client_ip);
Lwt.async (fun () -> let* () =
Lwt.catch Lwt.catch (add_vif get_ts vif dns_client dns_servers ~client_ip ~router
(fun () ->
add_vif get_ts vif dns_client dns_servers ~client_ip ~router
~cleanup_tasks qubesDB) ~cleanup_tasks qubesDB)
(fun ex -> @@ fun exn ->
Log.warn (fun f -> Log.warn (fun f ->
f "Error with client %a: %s" Dao.ClientVif.pp vif f "Error with client %a: %s" Dao.ClientVif.pp vif
(Printexc.to_string ex)); (Printexc.to_string exn));
Lwt.return_unit)); Lwt.return_unit
cleanup_tasks in
Lwt.return cleanup_tasks
(** Watch XenStore for notifications of new clients. *) (** Watch XenStore for notifications of new clients. *)
let wait_clients get_ts dns_client dns_servers qubesDB router = let wait_clients get_ts dns_client dns_servers qubesDB router =
Dao.watch_clients (fun new_set -> let open Lwt.Syntax in
let clients : Cleanup.t Dao.VifMap.t ref = ref Dao.VifMap.empty in
Dao.watch_clients @@ fun new_set ->
(* Check for removed clients *) (* Check for removed clients *)
!clients let clean_up_clients key cleanup =
|> Dao.VifMap.iter (fun key cleanup -> if not (Dao.VifMap.mem key new_set) then begin
if not (Dao.VifMap.mem key new_set) then (
clients := !clients |> Dao.VifMap.remove key; clients := !clients |> Dao.VifMap.remove key;
Log.info (fun f -> f "client %a has gone" Dao.ClientVif.pp key); Log.info (fun f -> f "client %a has gone" Dao.ClientVif.pp key);
Cleanup.cleanup cleanup)); Cleanup.cleanup cleanup
(* Check for added clients *) end
new_set
|> Dao.VifMap.iter (fun key ip_addr ->
if not (Dao.VifMap.mem key !clients) then (
let cleanup =
add_client get_ts dns_client dns_servers ~router key ip_addr
qubesDB
in in
Dao.VifMap.iter clean_up_clients !clients;
(* Check for added clients *)
let rec go seq = match Seq.uncons seq with
| None -> Lwt.return_unit
| Some ((key, ipaddr), seq) when not (Dao.VifMap.mem key !clients) ->
let* cleanup = add_client get_ts dns_client dns_servers ~router key ipaddr qubesDB in
Log.debug (fun f -> f "client %a arrived" Dao.ClientVif.pp key); Log.debug (fun f -> f "client %a arrived" Dao.ClientVif.pp key);
clients := !clients |> Dao.VifMap.add key cleanup))) clients := Dao.VifMap.add key cleanup !clients;
go seq
| Some (_, seq) -> go seq
in
go (Dao.VifMap.to_seq new_set)
let send_dns_client_query t ~src_port ~dst ~dst_port buf = let send_dns_client_query t ~src_port ~dst ~dst_port buf =
match t.uplink with match t.uplink with