Skip to content

Commit

Permalink
Merge pull request #11 from cheerfulstoic/add-subscribe-to-associations
Browse files Browse the repository at this point in the history
Allow subscribing to association columns
  • Loading branch information
cheerfulstoic authored Jul 29, 2024
2 parents 7dd70c9 + f426959 commit 25b264f
Show file tree
Hide file tree
Showing 4 changed files with 394 additions and 75 deletions.
9 changes: 3 additions & 6 deletions lib/ecto_watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ defmodule EctoWatch do

def check_update_args(update_type, id) do
case {update_type, id} do
{:inserted, nil} ->
:ok

{:inserted, _} ->
{:error, "Cannot subscribe to id for inserted records"}
:ok

{:updated, _} ->
:ok
Expand All @@ -33,8 +30,8 @@ defmodule EctoWatch do
:ok

{other, _} ->
{:error,
"Unexpected update_type: #{inspect(other)}. Expected :inserted, :updated, or :deleted"}
raise ArgumentError,
"Unexpected update_type: #{inspect(other)}. Expected :inserted, :updated, or :deleted"
end
end

Expand Down
143 changes: 97 additions & 46 deletions lib/ecto_watch/watcher_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ defmodule EctoWatch.WatcherServer do

use GenServer

def pub_sub_subscription_details(schema_mod_or_label, update_type, identifier) do
def pub_sub_subscription_details(schema_mod_or_label, update_type, identifier_value) do
name = unique_label(schema_mod_or_label, update_type)

if Process.whereis(name) do
{:ok,
GenServer.call(
name,
{:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier}
)}
GenServer.call(
name,
{:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier_value}
)
else
{:error, "No watcher found for #{inspect(schema_mod_or_label)} / #{inspect(update_type)}"}
end
Expand All @@ -25,20 +24,50 @@ defmodule EctoWatch.WatcherServer do
end

def handle_call(
{:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier},
{:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier_value},
_from,
state
) do
unique_label = unique_label(schema_mod_or_label, update_type)

channel_name =
if identifier do
"#{unique_label}:#{identifier}"
else
"#{unique_label}"
[primary_key] = state.schema_mod.__schema__(:primary_key)

{column, value} =
case identifier_value do
{key, value} ->
{key, value}

nil ->
{nil, nil}

identifier_value ->
{primary_key, identifier_value}
end

result =
cond do
update_type == :inserted && column == primary_key ->
{:error, "Cannot subscribe to primary_key for inserted records"}

column && column != primary_key &&
not MapSet.member?(state.association_owner_keys, column) ->
{:error, "Column #{column} is not an association column/"}

column && column != primary_key && column not in state.extra_columns ->
{:error, "Column #{column} is not in the list of extra columns"}

true ->
channel_name =
if column && value do
"#{unique_label}|#{column}|#{value}"
else
"#{unique_label}"
end

{:ok, {state.pub_sub_mod, channel_name}}
end

{:reply, {state.pub_sub_mod, channel_name}, state}
{:reply, result, state}
end

def init({repo_mod, pub_sub_mod, watcher_options}) do
Expand Down Expand Up @@ -124,10 +153,19 @@ defmodule EctoWatch.WatcherServer do
pub_sub_mod: pub_sub_mod,
unique_label: unique_label,
schema_mod: watcher_options.schema_mod,
association_owner_keys: association_owner_keys(watcher_options.schema_mod),
extra_columns: watcher_options.opts[:extra_columns] || [],
schema_mod_or_label: watcher_options.opts[:label] || watcher_options.schema_mod
}}
end

defp association_owner_keys(schema_mod) do
schema_mod.__schema__(:associations)
|> Enum.map(&schema_mod.__schema__(:association, &1))
|> Enum.map(& &1.owner_key)
|> MapSet.new()
end

def handle_info({:notification, _pid, _ref, channel_name, payload}, state) do
if channel_name != state.unique_label do
raise "Expected to receive message from #{state.unique_label}, but received from #{channel_name}"
Expand All @@ -137,44 +175,57 @@ defmodule EctoWatch.WatcherServer do

extra = Map.new(extra, fn {k, v} -> {String.to_existing_atom(k), v} end)

case type do
"inserted" ->
Phoenix.PubSub.broadcast(
state.pub_sub_mod,
state.unique_label,
{:inserted, state.schema_mod_or_label, identifier, extra}
)

"updated" ->
Phoenix.PubSub.broadcast(
state.pub_sub_mod,
"#{state.unique_label}:#{identifier}",
{:updated, state.schema_mod_or_label, identifier, extra}
)

Phoenix.PubSub.broadcast(
state.pub_sub_mod,
state.unique_label,
{:updated, state.schema_mod_or_label, identifier, extra}
)

"deleted" ->
Phoenix.PubSub.broadcast(
state.pub_sub_mod,
"#{state.unique_label}:#{identifier}",
{:deleted, state.schema_mod_or_label, identifier, extra}
)

Phoenix.PubSub.broadcast(
state.pub_sub_mod,
state.unique_label,
{:deleted, state.schema_mod_or_label, identifier, extra}
)
type = String.to_existing_atom(type)

message = {type, state.schema_mod_or_label, identifier, extra}

[primary_key] = state.schema_mod.__schema__(:primary_key)

for topic <-
topics(
type,
state.unique_label,
primary_key,
identifier,
extra,
state.association_owner_keys
) do
Phoenix.PubSub.broadcast(state.pub_sub_mod, topic, message)
end

{:noreply, state}
end

# This code is a mess 😅
# will refactor when the primary key is moved into the map
# see: https://github.com/cheerfulstoic/ecto_watch/discussions/8
def topics(:inserted, unique_label, primary_key, identifier, extra, association_owner_keys) do

Check warning on line 202 in lib/ecto_watch/watcher_server.ex

View workflow job for this annotation

GitHub Actions / Build and test

variable "identifier" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 202 in lib/ecto_watch/watcher_server.ex

View workflow job for this annotation

GitHub Actions / Build and test

variable "primary_key" is unused (if the variable is not meant to be used, prefix it with an underscore)
subscription_columns =
Enum.filter(extra, fn {k, v} -> MapSet.member?(association_owner_keys, k) end)

Check warning on line 204 in lib/ecto_watch/watcher_server.ex

View workflow job for this annotation

GitHub Actions / Build and test

variable "v" is unused (if the variable is not meant to be used, prefix it with an underscore)

[unique_label | Enum.map(subscription_columns, fn {k, v} -> "#{unique_label}|#{k}|#{v}" end)]
end

def topics(:updated, unique_label, primary_key, identifier, extra, association_owner_keys) do
subscription_columns =
[
{primary_key, identifier}
| Enum.filter(extra, fn {k, v} -> MapSet.member?(association_owner_keys, k) end)

Check warning on line 213 in lib/ecto_watch/watcher_server.ex

View workflow job for this annotation

GitHub Actions / Build and test

variable "v" is unused (if the variable is not meant to be used, prefix it with an underscore)
]

[unique_label | Enum.map(subscription_columns, fn {k, v} -> "#{unique_label}|#{k}|#{v}" end)]
end

def topics(:deleted, unique_label, primary_key, identifier, extra, association_owner_keys) do
subscription_columns =
[
{primary_key, identifier}
| Enum.filter(extra, fn {k, v} -> MapSet.member?(association_owner_keys, k) end)

Check warning on line 223 in lib/ecto_watch/watcher_server.ex

View workflow job for this annotation

GitHub Actions / Build and test

variable "v" is unused (if the variable is not meant to be used, prefix it with an underscore)
]

[unique_label | Enum.map(subscription_columns, fn {k, v} -> "#{unique_label}|#{k}|#{v}" end)]
end

def name(%WatcherOptions{} = watcher_options) do
unique_label(watcher_options)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule EctoWatch.MixProject do
def project do
[
app: :ecto_watch,
version: "0.5.4",
version: "0.6.0-rc1",
elixir: "~> 1.10",
description:
"EctoWatch allows you to easily get Phoenix.PubSub notifications directly from postgresql.",
Expand Down
Loading

0 comments on commit 25b264f

Please sign in to comment.