diff --git a/lib/quantum_storage_ets.ex b/lib/quantum_storage_ets.ex index 671c478..dfc036f 100644 --- a/lib/quantum_storage_ets.ex +++ b/lib/quantum_storage_ets.ex @@ -9,251 +9,109 @@ defmodule QuantumStoragePersistentEts do alias __MODULE__.State - @server __MODULE__ - @behaviour Quantum.Storage @doc false def start_link(opts), - do: GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, @server)) + do: GenServer.start_link(__MODULE__, opts, opts) @doc false @impl GenServer - def init(opts), do: {:ok, %State{schedulers: %{}, name: Keyword.get(opts, :name, @server)}} + def init(opts) do + table_name = + opts + |> Keyword.fetch!(:name) + |> Module.concat(Table) + + path = + Application.app_dir( + :quantum_storage_persistent_ets, + "priv/tables/#{table_name}.tab" + ) + + File.mkdir_p!(Path.dirname(path)) + + table = + PersistentEts.new(table_name, path, [ + :named_table, + :ordered_set, + :protected + ]) - @doc false - @impl Quantum.Storage - def jobs(server \\ @server, scheduler_module), - do: GenServer.call(server, {:jobs, scheduler_module}) + {:ok, %State{table: table}} + end @doc false @impl Quantum.Storage - def add_job(server \\ @server, scheduler_module, job), - do: GenServer.call(server, {:add_job, scheduler_module, job}) + def jobs(storage_pid), do: GenServer.call(storage_pid, :jobs) @doc false @impl Quantum.Storage - def delete_job(server \\ @server, scheduler_module, job_name), - do: GenServer.call(server, {:delete_job, scheduler_module, job_name}) + def add_job(storage_pid, job), do: GenServer.call(storage_pid, {:add_job, job}) @doc false @impl Quantum.Storage - def update_job_state(server \\ @server, scheduler_module, job_name, state), - do: GenServer.call(server, {:update_job_state, scheduler_module, job_name, state}) + def delete_job(storage_pid, job_name), do: GenServer.call(storage_pid, {:delete_job, job_name}) @doc false @impl Quantum.Storage - def last_execution_date(server \\ @server, scheduler_module), - do: GenServer.call(server, {:last_execution_date, scheduler_module}) + def update_job_state(storage_pid, job_name, state), + do: GenServer.call(storage_pid, {:update_job_state, job_name, state}) @doc false @impl Quantum.Storage - def update_last_execution_date(server \\ @server, scheduler_module, last_execution_date), - do: - GenServer.call(server, {:update_last_execution_date, scheduler_module, last_execution_date}) + def last_execution_date(storage_pid), do: GenServer.call(storage_pid, :last_execution_date) @doc false @impl Quantum.Storage - def purge(server \\ @server, scheduler_module), - do: GenServer.call(server, {:purge, scheduler_module}) + def update_last_execution_date(storage_pid, last_execution_date), + do: GenServer.call(storage_pid, {:update_last_execution_date, last_execution_date}) @doc false - def purge_all(server \\ @server), do: GenServer.call(server, :purge_all) + @impl Quantum.Storage + def purge(storage_pid), do: GenServer.call(storage_pid, :purge) @doc false @impl GenServer - def handle_call( - {:add_job, scheduler_module, job}, - _from, - %State{schedulers: schedulers, name: name} = state - ) do - { - :reply, - do_add_job(name, scheduler_module, job), - %{ - state - | schedulers: - schedulers - |> Map.put_new_lazy(scheduler_module, fn -> - create_scheduler_module_atom(name, scheduler_module) - end) - } - } - end - - def handle_call( - {:jobs, scheduler_module}, - _from, - %State{schedulers: schedulers, name: name} = state - ) do - { - :reply, - do_get_jobs(name, scheduler_module), - %{ - state - | schedulers: - schedulers - |> Map.put_new_lazy(scheduler_module, fn -> - create_scheduler_module_atom(name, scheduler_module) - end) - } - } + def handle_call({:add_job, job}, _from, %State{table: table} = state) do + {:reply, do_add_job(table, job), state} end - def handle_call( - {:delete_job, scheduler_module, job}, - _from, - %State{schedulers: schedulers, name: name} = state - ) do - { - :reply, - do_delete_job(name, scheduler_module, job), - %{ - state - | schedulers: - schedulers - |> Map.put_new_lazy(scheduler_module, fn -> - create_scheduler_module_atom(name, scheduler_module) - end) - } - } + def handle_call(:jobs, _from, %State{table: table} = state) do + {:reply, do_get_jobs(table), state} end - def handle_call( - {:update_job_state, scheduler_module, job_name, job_state}, - _from, - %State{schedulers: schedulers, name: name} = state - ) do - { - :reply, - do_update_job_state(name, scheduler_module, job_name, job_state), - %{ - state - | schedulers: - schedulers - |> Map.put_new_lazy(scheduler_module, fn -> - create_scheduler_module_atom(name, scheduler_module) - end) - } - } + def handle_call({:delete_job, job}, _from, %State{table: table} = state) do + {:reply, do_delete_job(table, job), state} end - def handle_call( - {:last_execution_date, scheduler_module}, - _from, - %State{schedulers: schedulers, name: name} = state - ) do - { - :reply, - do_get_last_execution_date(name, scheduler_module), - %{ - state - | schedulers: - schedulers - |> Map.put_new_lazy(scheduler_module, fn -> - create_scheduler_module_atom(name, scheduler_module) - end) - } - } + def handle_call({:update_job_state, job_name, job_state}, _from, %State{table: table} = state) do + {:reply, do_update_job_state(table, job_name, job_state), state} end - def handle_call( - {:update_last_execution_date, scheduler_module, last_execution_date}, - _from, - %State{schedulers: schedulers, name: name} = state - ) do - { - :reply, - do_update_last_execution_date(name, scheduler_module, last_execution_date), - %{ - state - | schedulers: - schedulers - |> Map.put_new_lazy(scheduler_module, fn -> - create_scheduler_module_atom(name, scheduler_module) - end) - } - } + def handle_call(:last_execution_date, _from, %State{table: table} = state) do + {:reply, do_get_last_execution_date(table), state} end def handle_call( - {:purge, scheduler_module}, + {:update_last_execution_date, last_execution_date}, _from, - %State{schedulers: schedulers, name: name} = state + %State{table: table} = state ) do - { - :reply, - do_purge(name, scheduler_module), - %{ - state - | schedulers: - schedulers - |> Map.put_new_lazy(scheduler_module, fn -> - create_scheduler_module_atom(name, scheduler_module) - end) - } - } - end - - def handle_call(:purge_all, _from, %State{schedulers: schedulers, name: name} = state) do - schedulers |> Map.values() |> Enum.each(fn scheduler -> :ok = do_purge(name, scheduler) end) - {:reply, :ok, state} + {:reply, do_update_last_execution_date(table, last_execution_date), state} end - defp create_scheduler_module_atom(storage_name, scheduler_module) do - Module.concat(storage_name, scheduler_module) + def handle_call(:purge, _from, %State{table: table} = state) do + {:reply, do_purge(table), state} end defp job_key(job_name) do {:job, job_name} end - defp get_ets_by_scheduler(storage_name, scheduler_module) do - scheduler_module_atom = create_scheduler_module_atom(storage_name, scheduler_module) - - if ets_exist?(scheduler_module_atom) do - scheduler_module_atom - else - path = - Application.app_dir( - :quantum_storage_persistent_ets, - "priv/tables/#{scheduler_module_atom}.tab" - ) - - File.mkdir_p!(Path.dirname(path)) - - PersistentEts.new(scheduler_module_atom, path, [ - :named_table, - :ordered_set - ]) - end - end - - defp ets_exist?(ets_name) do - Logger.debug(fn -> - "[#{inspect(Node.self())}][#{__MODULE__}] Determining whether ETS table with name [#{ - inspect(ets_name) - }] exists" - end) - - result = - case :ets.info(ets_name) do - :undefined -> false - _ -> true - end - - Logger.debug(fn -> - "[#{inspect(Node.self())}][#{__MODULE__}] ETS table with name [#{inspect(ets_name)}] #{ - if result, do: ~S|exists|, else: ~S|does not exist| - }" - end) - - result - end - - defp do_add_job(storage_name, scheduler_module, job) do - table = get_ets_by_scheduler(storage_name, scheduler_module) + defp do_add_job(table, job) do :ets.insert(table, entry = {job_key(job.name), job}) + :ets.insert(table, {:init_jobs}) Logger.debug(fn -> "[#{inspect(Node.self())}][#{__MODULE__}] inserting [#{inspect(entry)}] into Persistent ETS table [#{ @@ -264,31 +122,27 @@ defmodule QuantumStoragePersistentEts do :ok end - defp do_get_jobs(storage_name, scheduler_module) do - storage_name - |> create_scheduler_module_atom(scheduler_module) - |> ets_exist?() - |> if do - storage_name - |> get_ets_by_scheduler(scheduler_module) - |> :ets.match({{:job, :_}, :"$1"}) - |> List.flatten() - else - :not_applicable + defp do_get_jobs(table) do + table + |> :ets.lookup(:init_jobs) + |> case do + [{:init_jobs}] -> + table + |> :ets.match({{:job, :_}, :"$1"}) + |> List.flatten() + + [] -> + :not_applicable end end - defp do_delete_job(storage_name, scheduler_module, job_name) do - storage_name - |> get_ets_by_scheduler(scheduler_module) - |> :ets.delete(job_key(job_name)) + defp do_delete_job(table, job_name) do + :ets.delete(table, job_key(job_name)) :ok end - defp do_update_job_state(storage_name, scheduler_module, job_name, state) do - table = get_ets_by_scheduler(storage_name, scheduler_module) - + defp do_update_job_state(table, job_name, state) do table |> :ets.lookup(job_key(job_name)) |> Enum.map(&{elem(&1, 0), %{elem(&1, 1) | state: state}}) @@ -297,9 +151,8 @@ defmodule QuantumStoragePersistentEts do :ok end - defp do_get_last_execution_date(storage_name, scheduler_module) do - storage_name - |> get_ets_by_scheduler(scheduler_module) + defp do_get_last_execution_date(table) do + table |> :ets.lookup(:last_execution_date) |> case do [] -> :unknown @@ -307,14 +160,12 @@ defmodule QuantumStoragePersistentEts do end end - defp do_update_last_execution_date(storage_name, scheduler_module, last_execution_date) do - table = get_ets_by_scheduler(storage_name, scheduler_module) + defp do_update_last_execution_date(table, last_execution_date) do :ets.insert(table, {:last_execution_date, last_execution_date}) :ok end - defp do_purge(storage_name, scheduler_module) do - table = get_ets_by_scheduler(storage_name, scheduler_module) + defp do_purge(table) do :ets.delete_all_objects(table) :ok end diff --git a/lib/quantum_storage_ets/application.ex b/lib/quantum_storage_ets/application.ex deleted file mode 100644 index ea24435..0000000 --- a/lib/quantum_storage_ets/application.ex +++ /dev/null @@ -1,12 +0,0 @@ -defmodule QuantumStoragePersistentEts.Application do - @moduledoc false - - use Application - - def start(_type, _args), - do: - Supervisor.start_link([QuantumStoragePersistentEts], - strategy: :one_for_one, - name: QuantumStoragePersistentEts.Supervisor - ) -end diff --git a/lib/quantum_storage_ets/state.ex b/lib/quantum_storage_ets/state.ex index 0aac57f..c50b83b 100644 --- a/lib/quantum_storage_ets/state.ex +++ b/lib/quantum_storage_ets/state.ex @@ -1,8 +1,8 @@ defmodule QuantumStoragePersistentEts.State do @moduledoc false - @type t :: %__MODULE__{schedulers: map} + @type t :: %__MODULE__{table: :ets.tid()} - @enforce_keys [:schedulers, :name] - defstruct schedulers: %{}, name: QuantumStoragePersistentEts + @enforce_keys [:table] + defstruct @enforce_keys end diff --git a/mix.exs b/mix.exs index ce2018a..1f40fab 100644 --- a/mix.exs +++ b/mix.exs @@ -36,8 +36,7 @@ defmodule QuantumStoragePersistentEts.MixProject do # Run "mix help compile.app" to learn about applications. def application do [ - extra_applications: [:logger], - mod: {QuantumStoragePersistentEts.Application, []} + extra_applications: [:logger] ] end @@ -74,7 +73,7 @@ defmodule QuantumStoragePersistentEts.MixProject do defp deps do [ {:persistent_ets, "~> 0.1"}, - {:quantum, "~> 3.0-rc"}, + {:quantum, "~> 3.0.0-rc.2"}, {:ex_doc, "~> 0.13", only: [:dev, :docs], runtime: false}, {:excoveralls, "~> 0.5", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.0-rc", only: [:dev, :test], runtime: false}, diff --git a/test/quantum_storage_ets_test.exs b/test/quantum_storage_ets_test.exs index 9a8bbf4..f3060f5 100644 --- a/test/quantum_storage_ets_test.exs +++ b/test/quantum_storage_ets_test.exs @@ -5,6 +5,8 @@ defmodule QuantumStoragePersistentEtsTest do doctest QuantumStoragePersistentEts defmodule Scheduler do + @moduledoc false + use Quantum, otp_app: :quantum_storage_persistent_ets end @@ -12,98 +14,76 @@ defmodule QuantumStoragePersistentEtsTest do storage = start_supervised!({QuantumStoragePersistentEts, name: Module.concat(__MODULE__, test)}) - assert :ok = QuantumStoragePersistentEts.purge(storage, A) - assert :ok = QuantumStoragePersistentEts.purge(storage, B) + assert :ok = QuantumStoragePersistentEts.purge(storage) {:ok, storage: storage} end describe "purge/1" do test "purges correct module", %{storage: storage} do - assert :ok = QuantumStoragePersistentEts.add_job(storage, A, Scheduler.new_job()) - assert :ok = QuantumStoragePersistentEts.purge(storage, A) - assert [] = QuantumStoragePersistentEts.jobs(storage, A) - end - - test "does not purge incorrect module", %{storage: storage} do - job = Scheduler.new_job() - assert :ok = QuantumStoragePersistentEts.add_job(storage, A, job) - assert :ok = QuantumStoragePersistentEts.purge(storage, B) - assert [^job] = QuantumStoragePersistentEts.jobs(storage, A) + assert :ok = QuantumStoragePersistentEts.add_job(storage, Scheduler.new_job()) + assert :ok = QuantumStoragePersistentEts.purge(storage) + assert :not_applicable = QuantumStoragePersistentEts.jobs(storage) end end describe "add_job/2" do test "adds job", %{storage: storage} do job = Scheduler.new_job() - assert :ok = QuantumStoragePersistentEts.add_job(storage, A, job) - assert [^job] = QuantumStoragePersistentEts.jobs(storage, A) + assert :ok = QuantumStoragePersistentEts.add_job(storage, job) + assert [^job] = QuantumStoragePersistentEts.jobs(storage) end end describe "delete_job/2" do test "deletes job", %{storage: storage} do job = Scheduler.new_job() - assert :ok = QuantumStoragePersistentEts.add_job(storage, A, job) - assert :ok = QuantumStoragePersistentEts.delete_job(storage, A, job.name) - assert [] = QuantumStoragePersistentEts.jobs(storage, A) + assert :ok = QuantumStoragePersistentEts.add_job(storage, job) + assert :ok = QuantumStoragePersistentEts.delete_job(storage, job.name) + assert [] = QuantumStoragePersistentEts.jobs(storage) end test "does not fail when deleting unknown job", %{storage: storage} do job = Scheduler.new_job() - assert :ok = QuantumStoragePersistentEts.add_job(storage, A, job) - - assert :ok = QuantumStoragePersistentEts.delete_job(storage, A, make_ref()) - end + assert :ok = QuantumStoragePersistentEts.add_job(storage, job) - test "does not fail when deleting job from unknown scheduler", %{storage: storage} do - assert :ok = QuantumStoragePersistentEts.delete_job(storage, A, make_ref()) + assert :ok = QuantumStoragePersistentEts.delete_job(storage, make_ref()) end end describe "update_job_state/2" do test "updates job", %{storage: storage} do job = Scheduler.new_job() - assert :ok = QuantumStoragePersistentEts.add_job(storage, A, job) - assert :ok = QuantumStoragePersistentEts.update_job_state(storage, A, job.name, :inactive) - assert [%{state: :inactive}] = QuantumStoragePersistentEts.jobs(storage, A) + assert :ok = QuantumStoragePersistentEts.add_job(storage, job) + assert :ok = QuantumStoragePersistentEts.update_job_state(storage, job.name, :inactive) + assert [%{state: :inactive}] = QuantumStoragePersistentEts.jobs(storage) end test "does not fail when updating unknown job", %{storage: storage} do job = Scheduler.new_job() - assert :ok = QuantumStoragePersistentEts.add_job(storage, A, job) + assert :ok = QuantumStoragePersistentEts.add_job(storage, job) - assert :ok = QuantumStoragePersistentEts.update_job_state(storage, A, make_ref(), :inactive) - end - - test "does not fail when updating job from unknown scheduler", %{storage: storage} do - assert :ok = QuantumStoragePersistentEts.delete_job(storage, A, make_ref()) + assert :ok = QuantumStoragePersistentEts.update_job_state(storage, make_ref(), :inactive) end end describe "update_last_execution_date/2" do test "sets time on scheduler", %{storage: storage} do date = NaiveDateTime.utc_now() - assert :ok = QuantumStoragePersistentEts.update_last_execution_date(storage, A, date) - assert ^date = QuantumStoragePersistentEts.last_execution_date(storage, A) - end - - test "sets time only on right scheduler", %{storage: storage} do - date = NaiveDateTime.utc_now() - assert :ok = QuantumStoragePersistentEts.update_last_execution_date(storage, A, date) - assert :unknown = QuantumStoragePersistentEts.last_execution_date(storage, B) + assert :ok = QuantumStoragePersistentEts.update_last_execution_date(storage, date) + assert ^date = QuantumStoragePersistentEts.last_execution_date(storage) end end describe "last_execution_date/1" do test "gets time", %{storage: storage} do date = NaiveDateTime.utc_now() - assert :ok = QuantumStoragePersistentEts.update_last_execution_date(storage, A, date) - assert ^date = QuantumStoragePersistentEts.last_execution_date(storage, A) + assert :ok = QuantumStoragePersistentEts.update_last_execution_date(storage, date) + assert ^date = QuantumStoragePersistentEts.last_execution_date(storage) end test "get unknown otherwise", %{storage: storage} do - assert :unknown = QuantumStoragePersistentEts.last_execution_date(storage, A) + assert :unknown = QuantumStoragePersistentEts.last_execution_date(storage) end end end