race condition i banish thee

This commit is contained in:
mithereal 2022-09-15 22:54:47 -07:00
parent b5378f85c6
commit 0e17984d6f
15 changed files with 498 additions and 186 deletions

2
.envrc
View File

@ -1,2 +1,2 @@
export FARSIDE_PORT="4001"
export FARSIDE_TIMEOUT="10000"
export FARSIDE_TIMEOUT="10000"

View File

@ -38,10 +38,10 @@
<h3>Last synced <%= DateTime.truncate(last_updated, :second) %> UTC</h2>
<div>
<ul>
<%= for {service, instance_list} <- services do %>
<li><a href="/<%= service %>"><%= service %></a></li>
<%= for service <- services do %>
<li><a href="<%= service.fallback %>"><%= service.type %></a></li>
<ul>
<%= for url <- instance_list do %>
<%= for url <- service.instances do %>
<li><a href="<%= url %>"><%= url %></a></li>
<% end%>
</ul>

View File

@ -35,29 +35,42 @@ defmodule Farside do
alias Farside.LastUpdated
def get_services_map do
Farside.Instance.Supervisor.list()
|> Enum.reduce(%{}, fn service, acc ->
{_, data} = :ets.lookup(String.to_atom(service), :data) |> List.first()
services_map =
Farside.Instance.Supervisor.list()
|> Enum.map(fn service ->
data = :ets.lookup(String.to_atom(service), :default) |> List.first()
instances =
case Enum.count(data.instances) == 0 do
true ->
[data.fallback]
instances =
case is_nil(data) do
true ->
[]
false ->
data.instances
end
false ->
{_, service} = data
Map.put(
acc,
String.replace_prefix(
service,
@service_prefix,
""
),
instances
)
end)
registry = "#{service.type}_healthy"
instances =
for instance <- service.instances do
matches = Registry.match(:status, registry, instance)
{_, instance} =
case Enum.count(matches) > 0 do
true -> List.first(matches)
false -> {:error, nil}
end
instance
end
|> Enum.reject(fn x -> x == nil end)
Map.put(
service,
:instances,
instances
)
end
end)
end
def get_service(service) do
@ -69,13 +82,47 @@ defmodule Farside do
end
)
data = :ets.lookup(String.to_atom(service_name), :data)
data = :ets.lookup(String.to_atom(service_name), :default)
{_, service} = List.first(data)
data =
case data do
nil -> []
_ -> data
end
case Enum.count(service.instances) > 0 do
true -> Enum.random(service.instances)
false -> service.fallback
case Enum.count(data) > 0 do
true ->
{_, service} = List.first(data)
registry = "#{service.type}_healthy"
instances =
for instance <- service.instances do
matches = Registry.match(:status, registry, instance)
{_, instance} =
case Enum.count(matches) > 0 do
true -> List.first(matches)
false -> {:error, nil}
end
instance
end
|> Enum.reject(fn x -> x == nil end)
service = Map.put(
service,
:instances,
instances
)
case Enum.count(service.instances) > 0 do
true -> Enum.random(service.instances)
false -> service.fallback
end
false ->
"/"
end
end

View File

@ -6,10 +6,9 @@ defmodule Farside.Application do
require Logger
alias Farside.LastUpdated
alias Farside.Status
alias Farside.Instance.Check
alias Farside.Instance.Sync
alias Farside.Http
alias Farside.Server.HealthyCheck
alias Farside.Server.UnHealthyCheck
alias Farside.Server.DeadCheck
@impl true
def start(_type, _args) do
@ -24,7 +23,7 @@ defmodule Farside.Application do
maybe_loaded_children =
case is_nil(System.get_env("FARSIDE_TEST")) do
true ->
[{Check, []},{Sync, []}]
[{HealthyCheck, []},{UnHealthyCheck, []},{DeadCheck, []}]
false ->
Logger.info("Skipping sync job setup...")
@ -41,10 +40,12 @@ defmodule Farside.Application do
]
),
{LastUpdated, DateTime.utc_now()},
{Status, :init},
{PlugAttack.Storage.Ets, name: Farside.Throttle.Storage, clean_period: 60_000},
{DynamicSupervisor, strategy: :one_for_one, name: :server_supervisor},
{Registry, keys: :unique, name: :servers}
{DynamicSupervisor, strategy: :one_for_one, name: :instance_supervisor},
{DynamicSupervisor, strategy: :one_for_one, name: :service_supervisor},
{Registry, keys: :unique, name: :instance},
{Registry, keys: :unique, name: :service},
{Registry, keys: :duplicate, name: :status, partitions: System.schedulers_online()},
] ++ maybe_loaded_children
opts = [strategy: :one_for_one, name: Farside.Supervisor]
@ -55,7 +56,6 @@ defmodule Farside.Application do
def load(response) do
services_json_data = Application.fetch_env!(:farside, :services_json_data)
queries = Application.fetch_env!(:farside, :queries)
reply =
case String.length(services_json_data) < 10 do
@ -78,7 +78,7 @@ defmodule Farside.Application do
struct(%Service{}, service_atom)
|> Farside.Instance.Supervisor.start()
|> Farside.Instances.sync()
|> HealthyCheck.load()
end
LastUpdated.value(DateTime.utc_now())

View File

@ -1,11 +1,9 @@
defmodule Farside.Instance.Check do
defmodule Farside.Server.DeadCheck do
@moduledoc """
Module to check/validate the instance list only for servers with empty instance list every 90 secs, if a sync/check process isnt already running
"""
use Task
alias Farside.Status
def child_spec(args) do
%{
id: __MODULE__,
@ -21,16 +19,15 @@ defmodule Farside.Instance.Check do
def poll() do
receive do
after
90_000 ->
if(Status.value() == :wait) do
run()
end
1_200_000 ->
run()
poll()
end
end
defp run() do
Farside.Instance.Supervisor.sync_empty_instances()
def run() do
Registry.dispatch(:status, "dead", fn entries ->
for {pid, _} <- entries, do: GenServer.cast(pid, :check)
end)
end
end

View File

@ -0,0 +1,44 @@
defmodule Farside.Server.HealthyCheck do
@moduledoc """
Module to validate healthy servers
"""
use Task
def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
type: :worker
}
end
def start_link(_arg) do
Task.start_link(&poll/0)
end
def poll() do
receive do
after
90_000 ->
run()
poll()
end
end
def load(params) do
Registry.dispatch(:status, "healthy", fn entries ->
for {pid, url} <- entries do
GenServer.cast(pid, :check)
end
end)
params
end
def run() do
Registry.dispatch(:status, "healthy", fn entries ->
for {pid, url} <- entries do
GenServer.cast(pid, :check)
end
end)
end
end

View File

@ -27,6 +27,7 @@ defmodule Farside.Http do
end
def request(url, type) do
cond do
System.get_env("FARSIDE_TEST") ->
:good
@ -79,4 +80,35 @@ defmodule Farside.Http do
%{service | instances: instances}
end
def test_service(service) do
url = service.url <> service.test_url
test_url =
EEx.eval_string(
url,
query: Enum.random(@queries)
)
task =
Task.async(fn ->
reply = request(test_url, service.type)
{test_url, reply, service}
end)
data =
case Task.yield(task, @recv_timeout) || Task.shutdown(task) do
{:ok, result} ->
result
nil ->
nil
end
unless is_nil(data) do
{_test_url, value, _service} = data
value
else
:bad
end
end
end

View File

@ -4,10 +4,8 @@ defmodule Farside.Instance do
require Logger
alias Farside.Http
alias Farside.Status
@registry_name :servers
@update_file Application.fetch_env!(:farside, :update_file) <> ".json"
@registry_name :instance
def child_spec(args) do
%{
@ -34,6 +32,7 @@ defmodule Farside.Instance do
def start_link(arg) do
name = via_tuple(arg.type)
GenServer.start_link(__MODULE__, arg, name: name)
end
@ -56,53 +55,10 @@ defmodule Farside.Instance do
{:stop, :normal, state}
end
def handle_cast(
:update,
state
) do
Status.value(:test)
service = :ets.lookup(String.to_atom(state.type), :default)
{_, service} = List.first(service)
service = Http.fetch_instances(service)
:ets.delete(String.to_atom(state.type), :data)
:ets.insert(state.ref, {:data, service})
encoded = service |> Map.from_struct() |> Jason.encode!()
Farside.save_results(@update_file, encoded)
Status.value(:wait)
{:noreply, state}
end
def handle_cast(
:check,
state
) do
service = :ets.lookup(String.to_atom(state.type), :default)
{_, service} = List.first(service)
if Enum.count(service.instances) == 0 do
service = Http.fetch_instances(service)
:ets.delete(String.to_atom(state.type), :data)
:ets.insert(state.ref, {:data, service})
end
{:noreply, state}
end
@doc false
def via_tuple(data, registry \\ @registry_name) do
{:via, Registry, {registry, data}}
def via_tuple(id, registry \\ @registry_name) do
{:via, Registry, {registry, id}}
end
@impl true
@ -111,8 +67,18 @@ defmodule Farside.Instance do
{:noreply, {names, refs}}
end
@impl true
def handle_info(_msg, state) do
def handle_info(:load, state) do
service = :ets.lookup(String.to_atom(state.type), :default)
{_, service} = List.first(service)
service.instances
|> Enum.each(fn url ->
initial_state = %{url: url, type: service.type, test_url: service.test_url}
Farside.Service.Supervisor.start(initial_state)
end)
{:noreply, state}
end
end

View File

@ -4,8 +4,8 @@ defmodule Farside.Instance.Supervisor do
alias __MODULE__, as: SUPERVISOR
alias Farside.Instance, as: SERVER
@name :server_supervisor
@registry_name :servers
@name :instance_supervisor
@registry_name :instance
def child_spec() do
%{
@ -31,7 +31,11 @@ defmodule Farside.Instance.Supervisor do
def start(opts \\ %{}) do
child_spec = {SERVER, opts}
DynamicSupervisor.start_child(@name, child_spec)
{:ok,pid} = DynamicSupervisor.start_child(@name, child_spec)
send(pid, :load)
{:ok,pid}
end
def stop(id) do
@ -45,14 +49,6 @@ defmodule Farside.Instance.Supervisor do
end
end
def update_children() do
list()
|> Enum.each(fn x ->
SERVER.via_tuple(x)
|> GenServer.cast(:update)
end)
end
def whereis(id) do
case Registry.lookup(@registry_name, id) do
[{pid, _}] -> pid
@ -90,12 +86,4 @@ defmodule Farside.Instance.Supervisor do
_ -> true
end
end
def sync_empty_instances() do
list()
|> Enum.each(fn x ->
SERVER.via_tuple(x)
|> GenServer.cast(:check)
end)
end
end

View File

@ -1,18 +0,0 @@
defmodule Farside.Instances do
alias Farside.LastUpdated
def sync(data \\ []) do
update_file = Application.fetch_env!(:farside, :update_file)
update_json = update_file <> ".json"
if System.get_env("MIX_ENV") == "dev" do
File.rename(update_json, "#{update_file}-#{to_string(DateTime.utc_now()) <> ".json"}")
File.write(update_json, "")
end
LastUpdated.value(DateTime.utc_now())
Farside.Instance.Supervisor.update_children()
end
end

136
lib/farside/service.ex Normal file
View File

@ -0,0 +1,136 @@
defmodule Farside.Service do
use GenServer
require Logger
alias Farside.Http
@registry_name :service
defstruct url: nil,
type: nil,
test_url: nil,
last_update: nil,
status: []
def child_spec(data) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [data]},
type: :worker
}
end
def init(data) do
initial_state = %__MODULE__{
url: data.url,
type: data.type,
test_url: data.test_url,
last_update: nil,
status: []
}
healthy = "#{data.type}_healthy"
Registry.register(:status, healthy, data.url)
Registry.register(:status, "healthy", data.url)
{:ok, initial_state}
end
def start_link(data) do
name = via_tuple(data.url)
GenServer.start_link(__MODULE__, data, name: name)
end
def shutdown() do
GenServer.call(__MODULE__, :shutdown)
end
def handle_call(
:shutdown,
_from,
state
) do
{:stop, {:ok, "Normal Shutdown"}, state}
end
def handle_cast(
:shutdown,
state
) do
{:stop, :normal, state}
end
@doc false
def via_tuple(id, registry \\ @registry_name) do
{:via, Registry, {registry, id}}
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, data) do
{:noreply, data}
end
@impl true
def handle_info(_msg, state) do
{:noreply, state}
end
@impl true
def handle_cast(:load, state) do
reply = Http.test_service(state)
status = state.status ++ [reply]
state = %{state | status: status}
state = %{state | last_update: DateTime.utc_now()}
{:noreply, state}
end
@impl true
def handle_cast(:check, state) do
reply = Http.test_service(state)
status = state.status ++ [reply]
state = %{state | status: status}
state = %{state | last_update: DateTime.utc_now()}
healthy = "#{state.type}_healthy"
unhealthy = "#{state.type}_unhealthy"
dead = "#{state.type}_dead"
Registry.unregister_match(:status, "healthy", state.url)
Registry.unregister_match(:status, "unhealthy", state.url)
Registry.unregister_match(:status, "dead", state.url)
Registry.unregister_match(:status, healthy, state.url)
Registry.unregister_match(:status, unhealthy, state.url)
Registry.unregister_match(:status, dead, state.url)
if reply != :good do
filtered = Enum.reject(status, fn x -> x == :good end)
fails_before_death = Application.get_env(:farside, :max_fail_rate, 50)
case Enum.count(filtered) < fails_before_death do
true ->
Registry.register(:status, "unhealthy", state.url)
Registry.register(:status, unhealthy, state.url)
false ->
Registry.register(:status, "dead", state.url)
Registry.register(:status, dead, state.url)
end
else
Registry.register(:status, "healthy", state.url)
Registry.register(:status, healthy, state.url)
end
{:noreply, state}
end
end

View File

@ -0,0 +1,138 @@
defmodule Farside.Service.Supervisor do
use DynamicSupervisor
alias __MODULE__, as: SUPERVISOR
alias Farside.Service, as: SERVER
@name :service_supervisor
@registry_name :service
def child_spec() do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
type: :supervisor
}
end
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: @name)
end
def start_link() do
DynamicSupervisor.start_link(__MODULE__, [], name: @name)
end
@impl true
def init(_) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def start(data) do
child_spec = {SERVER, data}
{status, pid} = DynamicSupervisor.start_child(@name, child_spec)
if status == :ok do
send(pid, :load)
end
end
def stop(id) do
case Registry.lookup(@registry_name, id) do
[] ->
:ok
[{pid, _}] ->
Process.exit(pid, :shutdown)
:ok
end
end
def stop(id, registry_name) do
case Registry.lookup(registry_name, id) do
[] ->
:ok
[{pid, _}] ->
Process.exit(pid, :shutdown)
:ok
end
end
def update_children() do
list()
|> Enum.each(fn x ->
SERVER.via_tuple(x)
|> GenServer.cast(:update)
end)
end
def whereis(id) do
case Registry.lookup(@registry_name, id) do
[{pid, _}] -> pid
[] -> nil
end
end
def whereis(id, registry_name) do
case Registry.lookup(registry_name, id) do
[{pid, _}] -> pid
[] -> nil
end
end
def find_or_create(id) do
if process_exists?(id) do
{:ok, id}
else
id |> start
end
end
def exists?(id) do
case Registry.lookup(@registry_name, id) do
[] -> false
_ -> true
end
end
def exists?(id, registry_name) do
case Registry.lookup(registry_name, id) do
[] -> false
_ -> true
end
end
def list do
DynamicSupervisor.which_children(@name)
|> Enum.map(fn {_, account_proc_pid, _, _} ->
Registry.keys(@registry_name, account_proc_pid)
|> List.first()
end)
|> Enum.sort()
end
def list(registry_name) do
DynamicSupervisor.which_children(@name)
|> Enum.map(fn {_, account_proc_pid, _, _} ->
Registry.keys(registry_name, account_proc_pid)
|> List.first()
end)
|> Enum.sort()
end
def process_exists?(hash, registry_name) do
case Registry.lookup(registry_name, hash) do
[] -> false
_ -> true
end
end
def process_exists?(hash) do
case Registry.lookup(@registry_name, hash) do
[] -> false
_ -> true
end
end
end

View File

@ -1,15 +0,0 @@
defmodule Farside.Status do
use Agent
def start_link(initial_value) do
Agent.start_link(fn -> initial_value end, name: __MODULE__)
end
def value do
Agent.get(__MODULE__, & &1)
end
def value(new_value) do
Agent.update(__MODULE__, fn _ -> new_value end)
end
end

View File

@ -1,36 +0,0 @@
defmodule Farside.Instance.Sync do
@moduledoc """
Module to sync (check/validate) the instance list every 300 secs, if a sync/check process isnt already running
"""
use Task
alias Farside.Status
def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
type: :worker
}
end
def start_link(_arg) do
Task.start_link(&poll/0)
end
def poll() do
receive do
after
300_000 ->
if(Status.value() == :wait) do
sync()
end
poll()
end
end
defp sync() do
Farside.Instances.sync()
end
end

View File

@ -0,0 +1,33 @@
defmodule Farside.Server.UnHealthyCheck do
@moduledoc """
Module to check/validate the instance list only for servers with empty instance list every 90 secs, if a sync/check process isnt already running
"""
use Task
def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
type: :worker
}
end
def start_link(_arg) do
Task.start_link(&poll/0)
end
def poll() do
receive do
after
120_000 ->
run()
poll()
end
end
def run() do
Registry.dispatch(:status, "unhealthy", fn entries ->
for {pid, _} <- entries, do: GenServer.cast(pid, :check)
end)
end
end