diff --git a/.envrc b/.envrc
index 624af5a..85e78a8 100644
--- a/.envrc
+++ b/.envrc
@@ -1,2 +1,2 @@
export FARSIDE_PORT="4001"
-export FARSIDE_TIMEOUT="10000"
\ No newline at end of file
+export FARSIDE_TIMEOUT="10000"
diff --git a/index.eex b/index.eex
index b84636d..e0f2a58 100644
--- a/index.eex
+++ b/index.eex
@@ -38,10 +38,10 @@
Last synced <%= DateTime.truncate(last_updated, :second) %> UTC
- <%= for {service, instance_list} <- services do %>
- - <%= service %>
+ <%= for service <- services do %>
+ - <%= service.type %>
- <%= for url <- instance_list do %>
+ <%= for url <- service.instances do %>
- <%= url %>
<% end%>
diff --git a/lib/farside.ex b/lib/farside.ex
index a14690c..2b96851 100644
--- a/lib/farside.ex
+++ b/lib/farside.ex
@@ -35,29 +35,42 @@ defmodule Farside do
alias Farside.LastUpdated
def get_services_map do
- Farside.Instance.Supervisor.list()
- |> Enum.reduce(%{}, fn service, acc ->
- {_, data} = :ets.lookup(String.to_atom(service), :data) |> List.first()
+ services_map =
+ Farside.Instance.Supervisor.list()
+ |> Enum.map(fn service ->
+ data = :ets.lookup(String.to_atom(service), :default) |> List.first()
- instances =
- case Enum.count(data.instances) == 0 do
- true ->
- [data.fallback]
+ instances =
+ case is_nil(data) do
+ true ->
+ []
- false ->
- data.instances
- end
+ false ->
+ {_, service} = data
- Map.put(
- acc,
- String.replace_prefix(
- service,
- @service_prefix,
- ""
- ),
- instances
- )
- end)
+ registry = "#{service.type}_healthy"
+
+ instances =
+ for instance <- service.instances do
+ matches = Registry.match(:status, registry, instance)
+
+ {_, instance} =
+ case Enum.count(matches) > 0 do
+ true -> List.first(matches)
+ false -> {:error, nil}
+ end
+
+ instance
+ end
+ |> Enum.reject(fn x -> x == nil end)
+
+ Map.put(
+ service,
+ :instances,
+ instances
+ )
+ end
+ end)
end
def get_service(service) do
@@ -69,13 +82,47 @@ defmodule Farside do
end
)
- data = :ets.lookup(String.to_atom(service_name), :data)
+ data = :ets.lookup(String.to_atom(service_name), :default)
- {_, service} = List.first(data)
+ data =
+ case data do
+ nil -> []
+ _ -> data
+ end
- case Enum.count(service.instances) > 0 do
- true -> Enum.random(service.instances)
- false -> service.fallback
+ case Enum.count(data) > 0 do
+ true ->
+ {_, service} = List.first(data)
+
+ registry = "#{service.type}_healthy"
+
+ instances =
+ for instance <- service.instances do
+ matches = Registry.match(:status, registry, instance)
+
+ {_, instance} =
+ case Enum.count(matches) > 0 do
+ true -> List.first(matches)
+ false -> {:error, nil}
+ end
+
+ instance
+ end
+ |> Enum.reject(fn x -> x == nil end)
+
+ service = Map.put(
+ service,
+ :instances,
+ instances
+ )
+
+ case Enum.count(service.instances) > 0 do
+ true -> Enum.random(service.instances)
+ false -> service.fallback
+ end
+
+ false ->
+ "/"
end
end
diff --git a/lib/farside/application.ex b/lib/farside/application.ex
index 5060f58..e6218d6 100644
--- a/lib/farside/application.ex
+++ b/lib/farside/application.ex
@@ -6,10 +6,9 @@ defmodule Farside.Application do
require Logger
alias Farside.LastUpdated
- alias Farside.Status
- alias Farside.Instance.Check
- alias Farside.Instance.Sync
- alias Farside.Http
+ alias Farside.Server.HealthyCheck
+ alias Farside.Server.UnHealthyCheck
+ alias Farside.Server.DeadCheck
@impl true
def start(_type, _args) do
@@ -24,7 +23,7 @@ defmodule Farside.Application do
maybe_loaded_children =
case is_nil(System.get_env("FARSIDE_TEST")) do
true ->
- [{Check, []},{Sync, []}]
+ [{HealthyCheck, []},{UnHealthyCheck, []},{DeadCheck, []}]
false ->
Logger.info("Skipping sync job setup...")
@@ -41,10 +40,12 @@ defmodule Farside.Application do
]
),
{LastUpdated, DateTime.utc_now()},
- {Status, :init},
{PlugAttack.Storage.Ets, name: Farside.Throttle.Storage, clean_period: 60_000},
- {DynamicSupervisor, strategy: :one_for_one, name: :server_supervisor},
- {Registry, keys: :unique, name: :servers}
+ {DynamicSupervisor, strategy: :one_for_one, name: :instance_supervisor},
+ {DynamicSupervisor, strategy: :one_for_one, name: :service_supervisor},
+ {Registry, keys: :unique, name: :instance},
+ {Registry, keys: :unique, name: :service},
+ {Registry, keys: :duplicate, name: :status, partitions: System.schedulers_online()},
] ++ maybe_loaded_children
opts = [strategy: :one_for_one, name: Farside.Supervisor]
@@ -55,7 +56,6 @@ defmodule Farside.Application do
def load(response) do
services_json_data = Application.fetch_env!(:farside, :services_json_data)
- queries = Application.fetch_env!(:farside, :queries)
reply =
case String.length(services_json_data) < 10 do
@@ -78,7 +78,7 @@ defmodule Farside.Application do
struct(%Service{}, service_atom)
|> Farside.Instance.Supervisor.start()
- |> Farside.Instances.sync()
+ |> HealthyCheck.load()
end
LastUpdated.value(DateTime.utc_now())
diff --git a/lib/farside/check.ex b/lib/farside/deadcheck.ex
similarity index 68%
rename from lib/farside/check.ex
rename to lib/farside/deadcheck.ex
index 4d3e507..c36a4f9 100644
--- a/lib/farside/check.ex
+++ b/lib/farside/deadcheck.ex
@@ -1,11 +1,9 @@
-defmodule Farside.Instance.Check do
+defmodule Farside.Server.DeadCheck do
@moduledoc """
Module to check/validate the instance list only for servers with empty instance list every 90 secs, if a sync/check process isnt already running
"""
use Task
- alias Farside.Status
-
def child_spec(args) do
%{
id: __MODULE__,
@@ -21,16 +19,15 @@ defmodule Farside.Instance.Check do
def poll() do
receive do
after
- 90_000 ->
- if(Status.value() == :wait) do
- run()
- end
-
+ 1_200_000 ->
+ run()
poll()
end
end
- defp run() do
- Farside.Instance.Supervisor.sync_empty_instances()
+ def run() do
+ Registry.dispatch(:status, "dead", fn entries ->
+ for {pid, _} <- entries, do: GenServer.cast(pid, :check)
+ end)
end
end
diff --git a/lib/farside/healthycheck.ex b/lib/farside/healthycheck.ex
new file mode 100644
index 0000000..d49672b
--- /dev/null
+++ b/lib/farside/healthycheck.ex
@@ -0,0 +1,44 @@
+defmodule Farside.Server.HealthyCheck do
+ @moduledoc """
+ Module to validate healthy servers
+ """
+ use Task
+
+ def child_spec(args) do
+ %{
+ id: __MODULE__,
+ start: {__MODULE__, :start_link, [args]},
+ type: :worker
+ }
+ end
+
+ def start_link(_arg) do
+ Task.start_link(&poll/0)
+ end
+
+ def poll() do
+ receive do
+ after
+ 90_000 ->
+ run()
+ poll()
+ end
+ end
+
+ def load(params) do
+ Registry.dispatch(:status, "healthy", fn entries ->
+ for {pid, url} <- entries do
+ GenServer.cast(pid, :check)
+ end
+ end)
+ params
+ end
+
+ def run() do
+ Registry.dispatch(:status, "healthy", fn entries ->
+ for {pid, url} <- entries do
+ GenServer.cast(pid, :check)
+ end
+ end)
+ end
+end
diff --git a/lib/farside/http.ex b/lib/farside/http.ex
index e4863e6..d3fc30f 100644
--- a/lib/farside/http.ex
+++ b/lib/farside/http.ex
@@ -27,6 +27,7 @@ defmodule Farside.Http do
end
def request(url, type) do
+
cond do
System.get_env("FARSIDE_TEST") ->
:good
@@ -79,4 +80,35 @@ defmodule Farside.Http do
%{service | instances: instances}
end
+
+ def test_service(service) do
+url = service.url <> service.test_url
+ test_url =
+ EEx.eval_string(
+ url,
+ query: Enum.random(@queries)
+ )
+
+ task =
+ Task.async(fn ->
+ reply = request(test_url, service.type)
+ {test_url, reply, service}
+ end)
+
+ data =
+ case Task.yield(task, @recv_timeout) || Task.shutdown(task) do
+ {:ok, result} ->
+ result
+
+ nil ->
+ nil
+ end
+
+ unless is_nil(data) do
+ {_test_url, value, _service} = data
+ value
+ else
+ :bad
+ end
+ end
end
diff --git a/lib/farside/instance.ex b/lib/farside/instance.ex
index bf44925..42f584e 100644
--- a/lib/farside/instance.ex
+++ b/lib/farside/instance.ex
@@ -4,10 +4,8 @@ defmodule Farside.Instance do
require Logger
alias Farside.Http
- alias Farside.Status
- @registry_name :servers
- @update_file Application.fetch_env!(:farside, :update_file) <> ".json"
+ @registry_name :instance
def child_spec(args) do
%{
@@ -34,6 +32,7 @@ defmodule Farside.Instance do
def start_link(arg) do
name = via_tuple(arg.type)
+
GenServer.start_link(__MODULE__, arg, name: name)
end
@@ -56,53 +55,10 @@ defmodule Farside.Instance do
{:stop, :normal, state}
end
- def handle_cast(
- :update,
- state
- ) do
- Status.value(:test)
-
- service = :ets.lookup(String.to_atom(state.type), :default)
-
- {_, service} = List.first(service)
-
- service = Http.fetch_instances(service)
-
- :ets.delete(String.to_atom(state.type), :data)
-
- :ets.insert(state.ref, {:data, service})
-
- encoded = service |> Map.from_struct() |> Jason.encode!()
-
- Farside.save_results(@update_file, encoded)
-
- Status.value(:wait)
-
- {:noreply, state}
- end
-
- def handle_cast(
- :check,
- state
- ) do
- service = :ets.lookup(String.to_atom(state.type), :default)
-
- {_, service} = List.first(service)
-
- if Enum.count(service.instances) == 0 do
- service = Http.fetch_instances(service)
-
- :ets.delete(String.to_atom(state.type), :data)
-
- :ets.insert(state.ref, {:data, service})
- end
-
- {:noreply, state}
- end
@doc false
- def via_tuple(data, registry \\ @registry_name) do
- {:via, Registry, {registry, data}}
+ def via_tuple(id, registry \\ @registry_name) do
+ {:via, Registry, {registry, id}}
end
@impl true
@@ -111,8 +67,18 @@ defmodule Farside.Instance do
{:noreply, {names, refs}}
end
+
@impl true
- def handle_info(_msg, state) do
+ def handle_info(:load, state) do
+ service = :ets.lookup(String.to_atom(state.type), :default)
+
+ {_, service} = List.first(service)
+ service.instances
+ |> Enum.each(fn url ->
+ initial_state = %{url: url, type: service.type, test_url: service.test_url}
+ Farside.Service.Supervisor.start(initial_state)
+ end)
+
{:noreply, state}
end
end
diff --git a/lib/farside/instance.supervisor.ex b/lib/farside/instance.supervisor.ex
index 4f66242..5147ef6 100644
--- a/lib/farside/instance.supervisor.ex
+++ b/lib/farside/instance.supervisor.ex
@@ -4,8 +4,8 @@ defmodule Farside.Instance.Supervisor do
alias __MODULE__, as: SUPERVISOR
alias Farside.Instance, as: SERVER
- @name :server_supervisor
- @registry_name :servers
+ @name :instance_supervisor
+ @registry_name :instance
def child_spec() do
%{
@@ -31,7 +31,11 @@ defmodule Farside.Instance.Supervisor do
def start(opts \\ %{}) do
child_spec = {SERVER, opts}
- DynamicSupervisor.start_child(@name, child_spec)
+ {:ok,pid} = DynamicSupervisor.start_child(@name, child_spec)
+
+ send(pid, :load)
+
+ {:ok,pid}
end
def stop(id) do
@@ -45,14 +49,6 @@ defmodule Farside.Instance.Supervisor do
end
end
- def update_children() do
- list()
- |> Enum.each(fn x ->
- SERVER.via_tuple(x)
- |> GenServer.cast(:update)
- end)
- end
-
def whereis(id) do
case Registry.lookup(@registry_name, id) do
[{pid, _}] -> pid
@@ -90,12 +86,4 @@ defmodule Farside.Instance.Supervisor do
_ -> true
end
end
-
- def sync_empty_instances() do
- list()
- |> Enum.each(fn x ->
- SERVER.via_tuple(x)
- |> GenServer.cast(:check)
- end)
- end
end
diff --git a/lib/farside/instances.ex b/lib/farside/instances.ex
deleted file mode 100644
index caceb25..0000000
--- a/lib/farside/instances.ex
+++ /dev/null
@@ -1,18 +0,0 @@
-defmodule Farside.Instances do
- alias Farside.LastUpdated
-
- def sync(data \\ []) do
- update_file = Application.fetch_env!(:farside, :update_file)
- update_json = update_file <> ".json"
-
- if System.get_env("MIX_ENV") == "dev" do
- File.rename(update_json, "#{update_file}-#{to_string(DateTime.utc_now()) <> ".json"}")
-
- File.write(update_json, "")
- end
-
- LastUpdated.value(DateTime.utc_now())
-
- Farside.Instance.Supervisor.update_children()
- end
-end
diff --git a/lib/farside/service.ex b/lib/farside/service.ex
new file mode 100644
index 0000000..7c8897b
--- /dev/null
+++ b/lib/farside/service.ex
@@ -0,0 +1,136 @@
+defmodule Farside.Service do
+ use GenServer
+
+ require Logger
+
+ alias Farside.Http
+
+ @registry_name :service
+
+ defstruct url: nil,
+ type: nil,
+ test_url: nil,
+ last_update: nil,
+ status: []
+
+ def child_spec(data) do
+ %{
+ id: __MODULE__,
+ start: {__MODULE__, :start_link, [data]},
+ type: :worker
+ }
+ end
+
+ def init(data) do
+ initial_state = %__MODULE__{
+ url: data.url,
+ type: data.type,
+ test_url: data.test_url,
+ last_update: nil,
+ status: []
+ }
+
+ healthy = "#{data.type}_healthy"
+
+ Registry.register(:status, healthy, data.url)
+ Registry.register(:status, "healthy", data.url)
+ {:ok, initial_state}
+ end
+
+ def start_link(data) do
+ name = via_tuple(data.url)
+ GenServer.start_link(__MODULE__, data, name: name)
+ end
+
+ def shutdown() do
+ GenServer.call(__MODULE__, :shutdown)
+ end
+
+ def handle_call(
+ :shutdown,
+ _from,
+ state
+ ) do
+ {:stop, {:ok, "Normal Shutdown"}, state}
+ end
+
+ def handle_cast(
+ :shutdown,
+ state
+ ) do
+ {:stop, :normal, state}
+ end
+
+ @doc false
+ def via_tuple(id, registry \\ @registry_name) do
+ {:via, Registry, {registry, id}}
+ end
+
+ @impl true
+ def handle_info({:DOWN, ref, :process, _pid, _reason}, data) do
+ {:noreply, data}
+ end
+
+ @impl true
+ def handle_info(_msg, state) do
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_cast(:load, state) do
+ reply = Http.test_service(state)
+
+ status = state.status ++ [reply]
+
+ state = %{state | status: status}
+
+ state = %{state | last_update: DateTime.utc_now()}
+
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_cast(:check, state) do
+
+ reply = Http.test_service(state)
+
+ status = state.status ++ [reply]
+
+ state = %{state | status: status}
+
+ state = %{state | last_update: DateTime.utc_now()}
+
+ healthy = "#{state.type}_healthy"
+ unhealthy = "#{state.type}_unhealthy"
+ dead = "#{state.type}_dead"
+
+ Registry.unregister_match(:status, "healthy", state.url)
+ Registry.unregister_match(:status, "unhealthy", state.url)
+ Registry.unregister_match(:status, "dead", state.url)
+
+ Registry.unregister_match(:status, healthy, state.url)
+ Registry.unregister_match(:status, unhealthy, state.url)
+ Registry.unregister_match(:status, dead, state.url)
+
+ if reply != :good do
+ filtered = Enum.reject(status, fn x -> x == :good end)
+
+ fails_before_death = Application.get_env(:farside, :max_fail_rate, 50)
+
+ case Enum.count(filtered) < fails_before_death do
+ true ->
+ Registry.register(:status, "unhealthy", state.url)
+ Registry.register(:status, unhealthy, state.url)
+
+ false ->
+ Registry.register(:status, "dead", state.url)
+ Registry.register(:status, dead, state.url)
+ end
+ else
+ Registry.register(:status, "healthy", state.url)
+ Registry.register(:status, healthy, state.url)
+ end
+
+ {:noreply, state}
+ end
+end
diff --git a/lib/farside/service.supervisor.ex b/lib/farside/service.supervisor.ex
new file mode 100644
index 0000000..85369e0
--- /dev/null
+++ b/lib/farside/service.supervisor.ex
@@ -0,0 +1,138 @@
+defmodule Farside.Service.Supervisor do
+ use DynamicSupervisor
+
+ alias __MODULE__, as: SUPERVISOR
+ alias Farside.Service, as: SERVER
+
+ @name :service_supervisor
+ @registry_name :service
+
+ def child_spec() do
+ %{
+ id: __MODULE__,
+ start: {__MODULE__, :start_link, []},
+ type: :supervisor
+ }
+ end
+
+ def start_link(init_arg) do
+ DynamicSupervisor.start_link(__MODULE__, init_arg, name: @name)
+ end
+
+ def start_link() do
+ DynamicSupervisor.start_link(__MODULE__, [], name: @name)
+ end
+
+ @impl true
+ def init(_) do
+ DynamicSupervisor.init(strategy: :one_for_one)
+ end
+
+ def start(data) do
+ child_spec = {SERVER, data}
+
+ {status, pid} = DynamicSupervisor.start_child(@name, child_spec)
+
+ if status == :ok do
+ send(pid, :load)
+ end
+ end
+
+ def stop(id) do
+ case Registry.lookup(@registry_name, id) do
+ [] ->
+ :ok
+
+ [{pid, _}] ->
+ Process.exit(pid, :shutdown)
+ :ok
+ end
+ end
+
+ def stop(id, registry_name) do
+ case Registry.lookup(registry_name, id) do
+ [] ->
+ :ok
+
+ [{pid, _}] ->
+ Process.exit(pid, :shutdown)
+ :ok
+ end
+ end
+
+ def update_children() do
+ list()
+ |> Enum.each(fn x ->
+ SERVER.via_tuple(x)
+ |> GenServer.cast(:update)
+ end)
+ end
+
+ def whereis(id) do
+ case Registry.lookup(@registry_name, id) do
+ [{pid, _}] -> pid
+ [] -> nil
+ end
+ end
+
+ def whereis(id, registry_name) do
+ case Registry.lookup(registry_name, id) do
+ [{pid, _}] -> pid
+ [] -> nil
+ end
+ end
+
+ def find_or_create(id) do
+ if process_exists?(id) do
+ {:ok, id}
+ else
+ id |> start
+ end
+ end
+
+ def exists?(id) do
+ case Registry.lookup(@registry_name, id) do
+ [] -> false
+ _ -> true
+ end
+ end
+
+ def exists?(id, registry_name) do
+ case Registry.lookup(registry_name, id) do
+ [] -> false
+ _ -> true
+ end
+ end
+
+ def list do
+ DynamicSupervisor.which_children(@name)
+ |> Enum.map(fn {_, account_proc_pid, _, _} ->
+ Registry.keys(@registry_name, account_proc_pid)
+ |> List.first()
+ end)
+ |> Enum.sort()
+ end
+
+ def list(registry_name) do
+ DynamicSupervisor.which_children(@name)
+ |> Enum.map(fn {_, account_proc_pid, _, _} ->
+ Registry.keys(registry_name, account_proc_pid)
+ |> List.first()
+ end)
+ |> Enum.sort()
+ end
+
+ def process_exists?(hash, registry_name) do
+ case Registry.lookup(registry_name, hash) do
+ [] -> false
+ _ -> true
+ end
+ end
+
+ def process_exists?(hash) do
+ case Registry.lookup(@registry_name, hash) do
+ [] -> false
+ _ -> true
+ end
+ end
+end
diff --git a/lib/farside/status.ex b/lib/farside/status.ex
deleted file mode 100644
index 02053f0..0000000
--- a/lib/farside/status.ex
+++ /dev/null
@@ -1,15 +0,0 @@
-defmodule Farside.Status do
- use Agent
-
- def start_link(initial_value) do
- Agent.start_link(fn -> initial_value end, name: __MODULE__)
- end
-
- def value do
- Agent.get(__MODULE__, & &1)
- end
-
- def value(new_value) do
- Agent.update(__MODULE__, fn _ -> new_value end)
- end
-end
diff --git a/lib/farside/sync.ex b/lib/farside/sync.ex
deleted file mode 100644
index ca8a3a4..0000000
--- a/lib/farside/sync.ex
+++ /dev/null
@@ -1,36 +0,0 @@
-defmodule Farside.Instance.Sync do
- @moduledoc """
- Module to sync (check/validate) the instance list every 300 secs, if a sync/check process isnt already running
- """
- use Task
-
- alias Farside.Status
-
- def child_spec(args) do
- %{
- id: __MODULE__,
- start: {__MODULE__, :start_link, [args]},
- type: :worker
- }
- end
-
- def start_link(_arg) do
- Task.start_link(&poll/0)
- end
-
- def poll() do
- receive do
- after
- 300_000 ->
- if(Status.value() == :wait) do
- sync()
- end
-
- poll()
- end
- end
-
- defp sync() do
- Farside.Instances.sync()
- end
-end
diff --git a/lib/farside/unhealthycheck.ex b/lib/farside/unhealthycheck.ex
new file mode 100644
index 0000000..e3e7b77
--- /dev/null
+++ b/lib/farside/unhealthycheck.ex
@@ -0,0 +1,33 @@
+defmodule Farside.Server.UnHealthyCheck do
+ @moduledoc """
+ Module to check/validate the instance list only for servers with empty instance list every 90 secs, if a sync/check process isnt already running
+ """
+ use Task
+
+ def child_spec(args) do
+ %{
+ id: __MODULE__,
+ start: {__MODULE__, :start_link, [args]},
+ type: :worker
+ }
+ end
+
+ def start_link(_arg) do
+ Task.start_link(&poll/0)
+ end
+
+ def poll() do
+ receive do
+ after
+ 120_000 ->
+ run()
+ poll()
+ end
+ end
+
+ def run() do
+ Registry.dispatch(:status, "unhealthy", fn entries ->
+ for {pid, _} <- entries, do: GenServer.cast(pid, :check)
+ end)
+ end
+end