Skip to content

Commit

Permalink
Process based Storage Interface
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen committed Feb 28, 2020
1 parent 1764418 commit 092dd6d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 278 deletions.
285 changes: 68 additions & 217 deletions lib/quantum_storage_ets.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,251 +9,109 @@ defmodule QuantumStoragePersistentEts do

alias __MODULE__.State

@server __MODULE__

@behaviour Quantum.Storage

@doc false
def start_link(opts),
do: GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, @server))
do: GenServer.start_link(__MODULE__, opts, opts)

@doc false
@impl GenServer
def init(opts), do: {:ok, %State{schedulers: %{}, name: Keyword.get(opts, :name, @server)}}
def init(opts) do
table_name =
opts
|> Keyword.fetch!(:name)
|> Module.concat(Table)

path =
Application.app_dir(
:quantum_storage_persistent_ets,
"priv/tables/#{table_name}.tab"
)

File.mkdir_p!(Path.dirname(path))

table =
PersistentEts.new(table_name, path, [
:named_table,
:ordered_set,
:protected
])

@doc false
@impl Quantum.Storage
def jobs(server \\ @server, scheduler_module),
do: GenServer.call(server, {:jobs, scheduler_module})
{:ok, %State{table: table}}
end

@doc false
@impl Quantum.Storage
def add_job(server \\ @server, scheduler_module, job),
do: GenServer.call(server, {:add_job, scheduler_module, job})
def jobs(storage_pid), do: GenServer.call(storage_pid, :jobs)

@doc false
@impl Quantum.Storage
def delete_job(server \\ @server, scheduler_module, job_name),
do: GenServer.call(server, {:delete_job, scheduler_module, job_name})
def add_job(storage_pid, job), do: GenServer.call(storage_pid, {:add_job, job})

@doc false
@impl Quantum.Storage
def update_job_state(server \\ @server, scheduler_module, job_name, state),
do: GenServer.call(server, {:update_job_state, scheduler_module, job_name, state})
def delete_job(storage_pid, job_name), do: GenServer.call(storage_pid, {:delete_job, job_name})

@doc false
@impl Quantum.Storage
def last_execution_date(server \\ @server, scheduler_module),
do: GenServer.call(server, {:last_execution_date, scheduler_module})
def update_job_state(storage_pid, job_name, state),
do: GenServer.call(storage_pid, {:update_job_state, job_name, state})

@doc false
@impl Quantum.Storage
def update_last_execution_date(server \\ @server, scheduler_module, last_execution_date),
do:
GenServer.call(server, {:update_last_execution_date, scheduler_module, last_execution_date})
def last_execution_date(storage_pid), do: GenServer.call(storage_pid, :last_execution_date)

@doc false
@impl Quantum.Storage
def purge(server \\ @server, scheduler_module),
do: GenServer.call(server, {:purge, scheduler_module})
def update_last_execution_date(storage_pid, last_execution_date),
do: GenServer.call(storage_pid, {:update_last_execution_date, last_execution_date})

@doc false
def purge_all(server \\ @server), do: GenServer.call(server, :purge_all)
@impl Quantum.Storage
def purge(storage_pid), do: GenServer.call(storage_pid, :purge)

@doc false
@impl GenServer
def handle_call(
{:add_job, scheduler_module, job},
_from,
%State{schedulers: schedulers, name: name} = state
) do
{
:reply,
do_add_job(name, scheduler_module, job),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(name, scheduler_module)
end)
}
}
end

def handle_call(
{:jobs, scheduler_module},
_from,
%State{schedulers: schedulers, name: name} = state
) do
{
:reply,
do_get_jobs(name, scheduler_module),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(name, scheduler_module)
end)
}
}
def handle_call({:add_job, job}, _from, %State{table: table} = state) do
{:reply, do_add_job(table, job), state}
end

def handle_call(
{:delete_job, scheduler_module, job},
_from,
%State{schedulers: schedulers, name: name} = state
) do
{
:reply,
do_delete_job(name, scheduler_module, job),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(name, scheduler_module)
end)
}
}
def handle_call(:jobs, _from, %State{table: table} = state) do
{:reply, do_get_jobs(table), state}
end

def handle_call(
{:update_job_state, scheduler_module, job_name, job_state},
_from,
%State{schedulers: schedulers, name: name} = state
) do
{
:reply,
do_update_job_state(name, scheduler_module, job_name, job_state),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(name, scheduler_module)
end)
}
}
def handle_call({:delete_job, job}, _from, %State{table: table} = state) do
{:reply, do_delete_job(table, job), state}
end

def handle_call(
{:last_execution_date, scheduler_module},
_from,
%State{schedulers: schedulers, name: name} = state
) do
{
:reply,
do_get_last_execution_date(name, scheduler_module),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(name, scheduler_module)
end)
}
}
def handle_call({:update_job_state, job_name, job_state}, _from, %State{table: table} = state) do
{:reply, do_update_job_state(table, job_name, job_state), state}
end

def handle_call(
{:update_last_execution_date, scheduler_module, last_execution_date},
_from,
%State{schedulers: schedulers, name: name} = state
) do
{
:reply,
do_update_last_execution_date(name, scheduler_module, last_execution_date),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(name, scheduler_module)
end)
}
}
def handle_call(:last_execution_date, _from, %State{table: table} = state) do
{:reply, do_get_last_execution_date(table), state}
end

def handle_call(
{:purge, scheduler_module},
{:update_last_execution_date, last_execution_date},
_from,
%State{schedulers: schedulers, name: name} = state
%State{table: table} = state
) do
{
:reply,
do_purge(name, scheduler_module),
%{
state
| schedulers:
schedulers
|> Map.put_new_lazy(scheduler_module, fn ->
create_scheduler_module_atom(name, scheduler_module)
end)
}
}
end

def handle_call(:purge_all, _from, %State{schedulers: schedulers, name: name} = state) do
schedulers |> Map.values() |> Enum.each(fn scheduler -> :ok = do_purge(name, scheduler) end)
{:reply, :ok, state}
{:reply, do_update_last_execution_date(table, last_execution_date), state}
end

defp create_scheduler_module_atom(storage_name, scheduler_module) do
Module.concat(storage_name, scheduler_module)
def handle_call(:purge, _from, %State{table: table} = state) do
{:reply, do_purge(table), state}
end

defp job_key(job_name) do
{:job, job_name}
end

defp get_ets_by_scheduler(storage_name, scheduler_module) do
scheduler_module_atom = create_scheduler_module_atom(storage_name, scheduler_module)

if ets_exist?(scheduler_module_atom) do
scheduler_module_atom
else
path =
Application.app_dir(
:quantum_storage_persistent_ets,
"priv/tables/#{scheduler_module_atom}.tab"
)

File.mkdir_p!(Path.dirname(path))

PersistentEts.new(scheduler_module_atom, path, [
:named_table,
:ordered_set
])
end
end

defp ets_exist?(ets_name) do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Determining whether ETS table with name [#{
inspect(ets_name)
}] exists"
end)

result =
case :ets.info(ets_name) do
:undefined -> false
_ -> true
end

Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] ETS table with name [#{inspect(ets_name)}] #{
if result, do: ~S|exists|, else: ~S|does not exist|
}"
end)

result
end

defp do_add_job(storage_name, scheduler_module, job) do
table = get_ets_by_scheduler(storage_name, scheduler_module)
defp do_add_job(table, job) do
:ets.insert(table, entry = {job_key(job.name), job})
:ets.insert(table, {:init_jobs})

Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] inserting [#{inspect(entry)}] into Persistent ETS table [#{
Expand All @@ -264,31 +122,27 @@ defmodule QuantumStoragePersistentEts do
:ok
end

defp do_get_jobs(storage_name, scheduler_module) do
storage_name
|> create_scheduler_module_atom(scheduler_module)
|> ets_exist?()
|> if do
storage_name
|> get_ets_by_scheduler(scheduler_module)
|> :ets.match({{:job, :_}, :"$1"})
|> List.flatten()
else
:not_applicable
defp do_get_jobs(table) do
table
|> :ets.lookup(:init_jobs)
|> case do
[{:init_jobs}] ->
table
|> :ets.match({{:job, :_}, :"$1"})
|> List.flatten()

[] ->
:not_applicable
end
end

defp do_delete_job(storage_name, scheduler_module, job_name) do
storage_name
|> get_ets_by_scheduler(scheduler_module)
|> :ets.delete(job_key(job_name))
defp do_delete_job(table, job_name) do
:ets.delete(table, job_key(job_name))

:ok
end

defp do_update_job_state(storage_name, scheduler_module, job_name, state) do
table = get_ets_by_scheduler(storage_name, scheduler_module)

defp do_update_job_state(table, job_name, state) do
table
|> :ets.lookup(job_key(job_name))
|> Enum.map(&{elem(&1, 0), %{elem(&1, 1) | state: state}})
Expand All @@ -297,24 +151,21 @@ defmodule QuantumStoragePersistentEts do
:ok
end

defp do_get_last_execution_date(storage_name, scheduler_module) do
storage_name
|> get_ets_by_scheduler(scheduler_module)
defp do_get_last_execution_date(table) do
table
|> :ets.lookup(:last_execution_date)
|> case do
[] -> :unknown
[{:last_execution_date, date} | _t] -> date
end
end

defp do_update_last_execution_date(storage_name, scheduler_module, last_execution_date) do
table = get_ets_by_scheduler(storage_name, scheduler_module)
defp do_update_last_execution_date(table, last_execution_date) do
:ets.insert(table, {:last_execution_date, last_execution_date})
:ok
end

defp do_purge(storage_name, scheduler_module) do
table = get_ets_by_scheduler(storage_name, scheduler_module)
defp do_purge(table) do
:ets.delete_all_objects(table)
:ok
end
Expand Down
12 changes: 0 additions & 12 deletions lib/quantum_storage_ets/application.ex

This file was deleted.

Loading

0 comments on commit 092dd6d

Please sign in to comment.