diff --git a/README.md b/README.md index b87e8a5..b8bb525 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,6 @@ Farside's routing is very minimal, with only the following routes: - `/` - The app home page, displaying all live instances for every service -- `/ping` - - A passthrough "ping" to redis to ensure both app and redis are working - `/:service/*glob` - The main endpoint for redirecting a user to a working instance of a particular service with the specified path @@ -108,12 +106,9 @@ that their mission to centralize the entire web behind their service ultimately goes against what Farside is trying to solve. Use at your own discretion. ## Development -- Install [redis](https://redis.io) - Install [elixir](https://elixir-lang.org/install.html) - (on Debian systems) Install [erlang-dev](https://https://packages.debian.org/sid/erlang-dev) -- Start redis: `redis-server` - Install dependencies: `mix deps.get` -- Initialize redis contents: `mix run -e Farside.Instances.sync` - Run Farside: `mix run --no-halt` - Uses localhost:4001 @@ -123,5 +118,4 @@ goes against what Farside is trying to solve. Use at your own discretion. | -- | -- | | FARSIDE_TEST | If enabled, bypasses the instance availability check and adds all instances to the pool. | | FARSIDE_PORT | The port to run Farside on (default: `4001`) | -| FARSIDE_REDIS_PORT | The Redis server port to use (default: `6379`, same as the default for Redis) | | FARSIDE_SERVICES_JSON | The "services" JSON file to use for selecting instances (default: `services.json`) | diff --git a/config/config.exs b/config/config.exs index 11f61d9..eaabb0d 100644 --- a/config/config.exs +++ b/config/config.exs @@ -3,8 +3,6 @@ import Config config :farside, update_file: ".update-results", service_prefix: "service-", - fallback_suffix: "-fallback", - previous_suffix: "-previous", index: "index.eex", route: "route.eex", headers: [ diff --git a/lib/farside.ex b/lib/farside.ex index 6e6e7e6..072535d 100644 --- a/lib/farside.ex +++ b/lib/farside.ex @@ -1,7 +1,5 @@ defmodule Farside do @service_prefix Application.fetch_env!(:farside, :service_prefix) - @fallback_suffix Application.fetch_env!(:farside, :fallback_suffix) - @previous_suffix Application.fetch_env!(:farside, :previous_suffix) # Define relation between available services and their parent service. # This enables Farside to redirect with links such as: @@ -34,16 +32,12 @@ defmodule Farside do @quora_regex => ["querte"] } - def get_services_map do - {:ok, service_list} = Redix.command(:redix, ["KEYS", "#{@service_prefix}*"]) + alias Farside.LastUpdated - # Match service name to list of available instances - Enum.reduce(service_list, %{}, fn service, acc -> - {:ok, instance_list} = - Redix.command( - :redix, - ["LRANGE", service, "0", "-1"] - ) + def get_services_map do + Farside.Server.Supervisor.list() + |> Enum.reduce(%{}, fn service, acc -> + {_, data} = :ets.lookup(String.to_atom(service), :data) |> List.first() Map.put( acc, @@ -52,98 +46,32 @@ defmodule Farside do @service_prefix, "" ), - instance_list + data.instances ) end) end - def get_service(service) do - # Check if service has an entry in Redis, otherwise try to - # match against available parent services - service_name = cond do - !check_service(service) -> - Enum.find_value( - @parent_services, - fn {k, v} -> - String.match?(service, k) && Enum.random(v) - end) - true -> - service - end - - service_name - end - - def check_service(service) do - # Checks to see if a specific service has instances available - # in redis - {:ok, instances} = - Redix.command( - :redix, - [ - "LRANGE", - "#{@service_prefix}#{service}", - "0", - "-1" - ] - ) - - Enum.count(instances) > 0 - end - - def last_instance(service) do - # Fetches the last selected instance for a particular service - {:ok, previous} = - Redix.command( - :redix, - ["GET", "#{service}#{@previous_suffix}"] - ) - previous - end - - def pick_instance(service) do - {:ok, instances} = - Redix.command( - :redix, - [ - "LRANGE", - "#{@service_prefix}#{service}", - "0", - "-1" - ] - ) - - # Either pick a random available instance, - # or fall back to the default one - instance = - if Enum.count(instances) > 0 do - if Enum.count(instances) == 1 do - # If there's only one instance, just return that one... - List.first(instances) - else - # ...otherwise pick a random one from the list, ensuring - # that the same instance is never picked twice in a row. - instance = - Enum.filter(instances, &(&1 != last_instance(service))) - |> Enum.random() - - Redix.command( - :redix, - ["SET", "#{service}#{@previous_suffix}", instance] - ) - - instance + def get_service(service \\ "libreddit/r/popular") do + service_name = + Enum.find_value( + @parent_services, + fn {k, v} -> + String.match?(service, k) && Enum.random(v) end - else - {:ok, result} = - Redix.command( - :redix, - ["GET", "#{service}#{@fallback_suffix}"] - ) + ) - result - end - instance + data = :ets.lookup(String.to_atom(service_name), :data) + + {_, service} = List.first(data) + + case Enum.count(service.instances) > 0 do + true -> Enum.random(service.instances) + false -> service.fallback + end + end + + def get_last_updated do + LastUpdated.value() end def amend_instance(instance, service, path) do @@ -153,24 +81,15 @@ defmodule Farside do # so a "/u" is appended if the requested path doesn't explicitly include # "/p" for a post or an empty path for the home page. if String.length(path) > 0 and - !String.starts_with?(path, "p/") and - !String.starts_with?(path, "u/") do + !String.starts_with?(path, "p/") and + !String.starts_with?(path, "u/") do "#{instance}/u" else instance end + true -> instance end end - - def get_last_updated do - {:ok, last_updated} = - Redix.command( - :redix, - ["GET", "last_updated"] - ) - - last_updated - end end diff --git a/lib/farside/application.ex b/lib/farside/application.ex index 0ab77ea..068fdab 100644 --- a/lib/farside/application.ex +++ b/lib/farside/application.ex @@ -1,32 +1,105 @@ defmodule Farside.Application do - #@farside_port Application.fetch_env!(:farside, :port) - #@redis_conn Application.fetch_env!(:farside, :redis_conn) + # @farside_port Application.fetch_env!(:farside, :port) + # @redis_conn Application.fetch_env!(:farside, :redis_conn) @moduledoc false use Application + require Logger + + alias Farside.LastUpdated + alias Farside.Sync + @impl true def start(_type, _args) do - redis_conn = Application.fetch_env!(:farside, :redis_conn) - farside_port = Application.fetch_env!(:farside, :port) - IO.puts "Runing on http://localhost:#{farside_port}" - IO.puts "Redis conn: #{redis_conn}" + port = Application.fetch_env!(:farside, :port) - children = [ - Plug.Cowboy.child_spec( - scheme: :http, - plug: Farside.Router, - options: [ - port: String.to_integer(farside_port) - ] - ), - {PlugAttack.Storage.Ets, name: Farside.Throttle.Storage, clean_period: 60_000}, - {Redix, {redis_conn, [name: :redix]}}, - Farside.Scheduler, - Farside.Server - ] + Logger.info("Running on http://localhost:#{port}") + + maybe_loaded_children = + case is_nil(System.get_env("FARSIDE_TEST")) do + true -> + [{Sync, []}] + + false -> + Logger.info("Skipping sync job setup...") + [] + end + + children = + [ + Plug.Cowboy.child_spec( + scheme: :http, + plug: Farside.Router, + options: [ + port: String.to_integer(port) + ] + ), + {LastUpdated, DateTime.utc_now()}, + {PlugAttack.Storage.Ets, name: Farside.Throttle.Storage, clean_period: 60_000}, + {DynamicSupervisor, strategy: :one_for_one, name: :server_supervisor}, + {Registry, keys: :unique, name: :servers} + ] ++ maybe_loaded_children opts = [strategy: :one_for_one, name: Farside.Supervisor] + Supervisor.start_link(children, opts) + |> load() + end + + def load(response) do + services_json = Application.fetch_env!(:farside, :services_json) + queries = Application.fetch_env!(:farside, :queries) + + {:ok, file} = File.read(services_json) + {:ok, json} = Jason.decode(file) + + for service_json <- json do + service_atom = + for {key, val} <- service_json, into: %{} do + {String.to_existing_atom(key), val} + end + + service = struct(%Service{}, service_atom) + + request_urls = + Enum.map(service.instances, fn x -> + x <> + EEx.eval_string( + service.test_url, + query: Enum.random(queries) + ) + end) + + tasks = + for request_url <- request_urls do + Task.async(fn -> + reply = Farside.Http.request(request_url, service.type) + {request_url, reply} + end) + end + + tasks_with_results = Task.yield_many(tasks, 5000) + + instances = + Enum.map(tasks_with_results, fn {task, res} -> + # Shut down the tasks that did not reply nor exit + res || Task.shutdown(task, :brutal_kill) + end) + |> Enum.reject(fn x -> x == nil end) + |> Enum.map(fn {_, value} -> value end) + |> Enum.filter(fn {instance_url, value} -> + value == :good + end) + |> Enum.map(fn {url, _} -> url end) + + service = %{service | instances: instances} + + Farside.Instance.Supervisor.start(service) + end + + LastUpdated.value(DateTime.utc_now()) + + response end end diff --git a/lib/farside/http.ex b/lib/farside/http.ex new file mode 100644 index 0000000..cb35200 --- /dev/null +++ b/lib/farside/http.ex @@ -0,0 +1,47 @@ +defmodule Farside.Http do + require Logger + + @headers Application.fetch_env!(:farside, :headers) + + def request(url) do + cond do + System.get_env("FARSIDE_TEST") -> + :good + + true -> + HTTPoison.get(url, @headers) + |> then(&elem(&1, 1)) + |> Map.get(:status_code) + |> case do + n when n < 400 -> + Logger.info("Response: [#{n}]") + :good + + n -> + Logger.error("Response: [#{n}]") + :bad + end + end + end + + def request(url, type) do + cond do + System.get_env("FARSIDE_TEST") -> + :good + + true -> + HTTPoison.get(url, @headers) + |> then(&elem(&1, 1)) + |> Map.get(:status_code) + |> case do + n when n < 400 -> + Logger.info("Type: #{type}, Response: [#{n}], Url: #{url}") + :good + + n -> + Logger.error("Type: #{type}, Response: [#{n}], Url: #{url}") + :bad + end + end + end +end diff --git a/lib/farside/instance.ex b/lib/farside/instance.ex new file mode 100644 index 0000000..ed91341 --- /dev/null +++ b/lib/farside/instance.ex @@ -0,0 +1,124 @@ +defmodule Farside.Instance do + use GenServer + + require Logger + + @registry_name :servers + + def child_spec(args) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [args]}, + type: :worker + } + end + + def init(init_arg) do + ref = + :ets.new(String.to_atom(init_arg.type), [ + :set, + :named_table, + :public, + read_concurrency: true, + write_concurrency: true + ]) + + :ets.insert(ref, {:data, init_arg}) + + {:ok, %{type: init_arg.type, ref: ref}} + end + + def start_link(arg) do + name = via_tuple(arg.type) + GenServer.start_link(__MODULE__, arg, name: name) + end + + def shutdown() do + GenServer.call(__MODULE__, :shutdown) + end + + def handle_call( + :shutdown, + _from, + state + ) do + {:stop, {:ok, "Normal Shutdown"}, state} + end + + def handle_cast( + :shutdown, + state + ) do + {:stop, :normal, state} + end + + def handle_cast( + :update, + state + ) do + service = :ets.lookup(String.to_atom(state.type), :data) + + {_, service} = List.first(service) + + queries = Application.fetch_env!(:farside, :queries) + + request_urls = + Enum.map(service.instances, fn x -> + x <> + EEx.eval_string( + service.test_url, + query: Enum.random(queries) + ) + end) + + tasks = + for request_url <- request_urls do + Task.async(fn -> + reply = Farside.Http.request(request_url, service.type) + {request_url, reply} + end) + end + + tasks_with_results = Task.yield_many(tasks, 5000) + + instances = + Enum.map(tasks_with_results, fn {task, res} -> + # Shut down the tasks that did not reply nor exit + res || Task.shutdown(task, :brutal_kill) + end) + |> Enum.reject(fn x -> x == nil end) + |> Enum.map(fn {_, value} -> value end) + |> Enum.filter(fn {instance_url, value} -> + value == :good + end) + |> Enum.map(fn {url, _} -> url end) + + values = %{service | instances: instances} + + :ets.delete_all_objects(String.to_atom(state.type)) + + :ets.insert(state.ref, {:data, values}) + + update_file = Application.fetch_env!(:farside, :update_file) + + File.write(update_file, values.fallback) + + {:noreply, state} + end + + @doc false + def via_tuple(data, registry \\ @registry_name) do + {:via, Registry, {registry, data}} + end + + @impl true + def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do + :ets.delete(names) + {:noreply, {names, refs}} + end + + @impl true + def handle_info(_msg, state) do + {:noreply, state} + end +end diff --git a/lib/farside/instance.supervisor.ex b/lib/farside/instance.supervisor.ex new file mode 100644 index 0000000..f890af1 --- /dev/null +++ b/lib/farside/instance.supervisor.ex @@ -0,0 +1,93 @@ +defmodule Farside.Instance.Supervisor do + use DynamicSupervisor + + alias __MODULE__, as: SUPERVISOR + alias Farside.Instance, as: SERVER + + @name :server_supervisor + @registry_name :servers + + def child_spec() do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, []}, + type: :supervisor + } + end + + def start_link(init_arg) do + DynamicSupervisor.start_link(__MODULE__, init_arg, name: @name) + end + + def start_link() do + DynamicSupervisor.start_link(__MODULE__, [], name: @name) + end + + @impl true + def init(_) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + def start(opts \\ %{}) do + child_spec = {SERVER, opts} + + DynamicSupervisor.start_child(@name, child_spec) + end + + def stop(id) do + case Registry.lookup(@registry_name, id) do + [] -> + :ok + + [{pid, _}] -> + Process.exit(pid, :shutdown) + :ok + end + end + + def update_children() do + list() + |> Enum.each(fn x -> + SERVER.via_tuple(x) + |> GenServer.cast(:update) + end) + end + + def whereis(id) do + case Registry.lookup(@registry_name, id) do + [{pid, _}] -> pid + [] -> nil + end + end + + def find_or_create(id) do + if process_exists?(id) do + {:ok, id} + else + id |> start + end + end + + def exists?(id) do + case Registry.lookup(@registry_name, id) do + [] -> false + _ -> true + end + end + + def list do + DynamicSupervisor.which_children(@name) + |> Enum.map(fn {_, account_proc_pid, _, _} -> + Registry.keys(@registry_name, account_proc_pid) + |> List.first() + end) + |> Enum.sort() + end + + def process_exists?(hash) do + case Registry.lookup(@registry_name, hash) do + [] -> false + _ -> true + end + end +end diff --git a/lib/farside/instances.ex b/lib/farside/instances.ex index f37f306..87e0f74 100644 --- a/lib/farside/instances.ex +++ b/lib/farside/instances.ex @@ -1,115 +1,17 @@ defmodule Farside.Instances do - @fallback_suffix Application.fetch_env!(:farside, :fallback_suffix) - @update_file Application.fetch_env!(:farside, :update_file) - @service_prefix Application.fetch_env!(:farside, :service_prefix) - @headers Application.fetch_env!(:farside, :headers) - @queries Application.fetch_env!(:farside, :queries) - @debug_header "======== " - @debug_spacer " " + alias Farside.LastUpdated def sync() do - File.rename(@update_file, "#{@update_file}-prev") - update() + update_file = Application.fetch_env!(:farside, :update_file) - # Add UTC time of last update - Redix.command(:redix, [ - "SET", - "last_updated", - Calendar.strftime(DateTime.utc_now(), "%c") - ]) - end + File.rm("#{update_file}-prev") - def request(url) do - cond do - System.get_env("FARSIDE_TEST") -> - :good + File.rename(update_file, "#{update_file}-prev") - true -> - HTTPoison.get(url, @headers) - |> then(&elem(&1, 1)) - |> Map.get(:status_code) - |> case do - n when n < 400 -> - IO.puts("#{@debug_spacer}✓ [#{n}]") - :good + File.write(update_file, "") - n -> - IO.puts("#{@debug_spacer}x [#{(n && n) || "error"}]") - :bad - end - end - end + LastUpdated.value(DateTime.utc_now()) - def update() do - services_json = Application.fetch_env!(:farside, :services_json) - {:ok, file} = File.read(services_json) - {:ok, json} = Jason.decode(file) - - # Loop through all instances and check each for availability - for service_json <- json do - service_atom = for {key, val} <- service_json, into: %{} do - {String.to_existing_atom(key), val} - end - - service = struct(%Service{}, service_atom) - - IO.puts("#{@debug_header}#{service.type}") - - result = - Enum.filter(service.instances, fn instance_url -> - request_url = - instance_url <> - EEx.eval_string( - service.test_url, - query: Enum.random(@queries) - ) - - IO.puts("#{@debug_spacer}#{request_url}") - - request(request_url) == :good - end) - - add_to_redis(service, result) - log_results(service.type, result) - end - end - - def add_to_redis(service, instances) do - # Remove previous list of instances - Redix.command(:redix, [ - "DEL", - "#{@service_prefix}#{service.type}" - ]) - - # Update with new list of available instances - Redix.command( - :redix, - [ - "LPUSH", - "#{@service_prefix}#{service.type}" - ] ++ instances - ) - - # Set fallback to one of the available instances, - # or the default instance if all are "down" - if Enum.count(instances) > 0 do - Redix.command(:redix, [ - "SET", - "#{service.type}#{@fallback_suffix}", - Enum.random(instances) - ]) - else - Redix.command(:redix, [ - "SET", - "#{service.type}#{@fallback_suffix}", - service.fallback - ]) - end - end - - def log_results(service_name, results) do - {:ok, file} = File.open(@update_file, [:append, {:delayed_write, 100, 20}]) - IO.write(file, "#{service_name}: #{inspect(results)}\n") - File.close(file) + Farside.Instance.Supervisor.update_children() end end diff --git a/lib/farside/last_updated.ex b/lib/farside/last_updated.ex new file mode 100644 index 0000000..6cf83a9 --- /dev/null +++ b/lib/farside/last_updated.ex @@ -0,0 +1,15 @@ +defmodule Farside.LastUpdated do + use Agent + + def start_link(initial_value) do + Agent.start_link(fn -> initial_value end, name: __MODULE__) + end + + def value do + Agent.get(__MODULE__, & &1) + end + + def value(new_value) do + Agent.update(__MODULE__, fn _ -> new_value end) + end +end diff --git a/lib/farside/router.ex b/lib/farside/router.ex index ed667e4..65b6e53 100644 --- a/lib/farside/router.ex +++ b/lib/farside/router.ex @@ -29,12 +29,6 @@ defmodule Farside.Router do send_resp(conn, 200, resp) end - get "/ping" do - # Useful for app healthcheck - {:ok, resp} = Redix.command(:redix, ["PING"]) - send_resp(conn, 200, resp) - end - get "/_/:service/*glob" do r_path = String.slice(conn.request_path, 2..-1) @@ -48,37 +42,48 @@ defmodule Farside.Router do end get "/:service/*glob" do - service_name = cond do - service =~ "http" -> - List.first(glob) - true -> - service - end + service_name = + cond do + service =~ "http" -> + List.first(glob) - path = cond do - service_name != service -> - Enum.join(Enum.slice(glob, 1..-1), "/") - true -> - Enum.join(glob, "/") - end + true -> + service + end - instance = cond do - conn.assigns[:throttle] != nil -> - Farside.get_service(service_name) - |> Farside.last_instance - |> Farside.amend_instance(service_name, path) - true -> - Farside.get_service(service_name) - |> Farside.pick_instance - |> Farside.amend_instance(service_name, path) - end + IO.inspect(service_name, label: "service_name") - # Redirect to the available instance - conn - |> Plug.Conn.resp(:found, "") - |> Plug.Conn.put_resp_header( - "location", - "#{instance}/#{path}#{get_query_params(conn)}" - ) + path = + cond do + service_name != service -> + Enum.join(Enum.slice(glob, 1..-1), "/") + + true -> + Enum.join(glob, "/") + end + + case service_name do + "favicon.ico" -> + conn |> Plug.Conn.resp(:not_found, "") + + _ -> + instance = + cond do + conn.assigns[:throttle] != nil -> + Farside.get_service(service_name) + + true -> + Farside.get_service(service_name) + end + |> Farside.amend_instance(service_name, path) + + # Redirect to the available instance + conn + |> Plug.Conn.resp(:found, "") + |> Plug.Conn.put_resp_header( + "location", + "#{instance}/#{path}#{get_query_params(conn)}" + ) + end end end diff --git a/lib/farside/scheduler.ex b/lib/farside/scheduler.ex deleted file mode 100644 index 4707624..0000000 --- a/lib/farside/scheduler.ex +++ /dev/null @@ -1,3 +0,0 @@ -defmodule Farside.Scheduler do - use Quantum, otp_app: :farside -end diff --git a/lib/farside/server.ex b/lib/farside/server.ex deleted file mode 100644 index 61aff40..0000000 --- a/lib/farside/server.ex +++ /dev/null @@ -1,22 +0,0 @@ -defmodule Farside.Server do - use GenServer - import Crontab.CronExpression - - def init(init_arg) do - {:ok, init_arg} - end - - def start_link(arg) do - if System.get_env("FARSIDE_TEST") do - IO.puts("Skipping sync job setup...") - else - Farside.Scheduler.new_job() - |> Quantum.Job.set_name(:sync) - |> Quantum.Job.set_schedule(~e[*/5 * * * *]) - |> Quantum.Job.set_task(fn -> Farside.Instances.sync() end) - |> Farside.Scheduler.add_job() - end - - GenServer.start_link(__MODULE__, arg) - end -end diff --git a/lib/farside/sync.ex b/lib/farside/sync.ex new file mode 100644 index 0000000..b1d26c6 --- /dev/null +++ b/lib/farside/sync.ex @@ -0,0 +1,28 @@ +defmodule Farside.Sync do + use Task + + def child_spec(args) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [args]}, + type: :worker + } + end + + def start_link(_arg) do + Task.start_link(&poll/0) + end + + def poll() do + receive do + after + 300_000 -> + sync() + poll() + end + end + + defp sync() do + Farside.Instances.sync() + end +end diff --git a/mix.exs b/mix.exs index 80cc121..fbd15b9 100644 --- a/mix.exs +++ b/mix.exs @@ -25,9 +25,7 @@ defmodule Farside.MixProject do {:httpoison, "~> 1.8"}, {:jason, "~> 1.1"}, {:plug_attack, "~> 0.4.2"}, - {:plug_cowboy, "~> 2.0"}, - {:quantum, "~> 3.0"}, - {:redix, "~> 1.1"} + {:plug_cowboy, "~> 2.0"} ] end end diff --git a/test/farside_test.exs b/test/farside_test.exs index d47396e..d2807bf 100644 --- a/test/farside_test.exs +++ b/test/farside_test.exs @@ -1,5 +1,4 @@ defmodule FarsideTest do - use ExUnit.Case use Plug.Test