Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: BREAKING: Change sent message to be a three-element tuple... #12

Merged
merged 3 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,21 @@ You can also subscribe to individual records:
Once subscribed, messages can be handled like so (LiveView example given here but `handle_info` callbacks can be used elsewhere as well):

```elixir
def handle_info({:inserted, MyApp.Accounts.User, id, _}, socket) do
def handle_info({:inserted, MyApp.Accounts.User, %{id: id}}, socket) do
user = Accounts.get_user(id)
socket = stream_insert(socket, :users, user)

{:noreply, socket}
end

def handle_info({:updated, MyApp.Accounts.User, id, _}, socket) do
def handle_info({:updated, MyApp.Accounts.User, %{id: id}}, socket) do
user = Accounts.get_user(id)
socket = stream_insert(socket, :users, user)

{:noreply, socket}
end

def handle_info({:deleted, MyApp.Accounts.User, id, _}, socket) do
def handle_info({:deleted, MyApp.Accounts.User, %{id: id}}, socket) do
user = Accounts.get_user(id)
socket = stream_delete(socket, :users, user)

Expand Down Expand Up @@ -101,7 +101,7 @@ You can also setup the database to trigger only on specific column changes on `:
EctoWatch.subscribe(:user_contact_info, :updated, package.id)

# handling messages
def handle_info({:updated, :user_contact_info, id, _}, socket) do
def handle_info({:updated, :user_contact_info, %{id: id}}, socket) do
```

A label is required for two reasons:
Expand All @@ -128,7 +128,7 @@ You can also use labels in general without tracking specific columns:
EctoWatch.subscribe(:user_update, :updated, package.id)

# handling messages
def handle_info({:updated, :user_update, id, _}, socket) do
def handle_info({:updated, :user_update, %{id: id}}, socket) do
```

## Getting additional values
Expand Down Expand Up @@ -160,7 +160,7 @@ If you would like to get more than just the `id` from the record, you can use th
EctoWatch.subscribe(MyApp.Posts.Comment, :deleted)

# handling messages
def handle_info({:updated, MyApp.Posts.Comment, id, %{post_id: post_id}}, socket) do
def handle_info({:updated, MyApp.Posts.Comment, %{id: id, post_id: post_id}}, socket) do
```

## Example use-cases
Expand Down Expand Up @@ -208,6 +208,7 @@ Disabling the triggers can lock the table in a transaction and so should be used
* allow specifying a condition for when the trigger should fire
* Creating a batch-processing GenServer to reduce queries to the database.
* Make watchers more generic (?). Don't need dependency on PubSub, but could make it an adapter or something
* Allow for local broadcasting of Phoenix.PubSub messages

## Installation

Expand Down
52 changes: 20 additions & 32 deletions lib/ecto_watch/watcher_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ defmodule EctoWatch.WatcherServer do
update_type == :inserted && column == primary_key ->
{:error, "Cannot subscribe to primary_key for inserted records"}

column && column != primary_key &&
not MapSet.member?(state.association_owner_keys, column) ->
column && not MapSet.member?(state.identifier_keys, column) ->
{:error, "Column #{column} is not an association column/"}

column && column != primary_key && column not in state.extra_columns ->
Expand Down Expand Up @@ -98,14 +97,15 @@ defmodule EctoWatch.WatcherServer do
"DELETE"
end

extra_columns_sql =
(watcher_options.opts[:extra_columns] || [])
|> Enum.map_join(",", &"'#{&1}',row.#{&1}")

# TODO: Raise an "unsupported" error if primary key is more than one column
# Or maybe multiple columns could be supported?
[primary_key] = watcher_options.schema_mod.__schema__(:primary_key)

extra_columns = watcher_options.opts[:extra_columns] || []
all_columns = [primary_key | extra_columns]

columns_sql = Enum.map_join(all_columns, ",", &"'#{&1}',row.#{&1}")

Ecto.Adapters.SQL.query!(
repo_mod,
"""
Expand All @@ -116,7 +116,7 @@ defmodule EctoWatch.WatcherServer do
payload TEXT;
BEGIN
row := COALESCE(NEW, OLD);
payload := jsonb_build_object('type','#{watcher_options.update_type}','identifier',row.#{primary_key},'extra',json_build_object(#{extra_columns_sql}));
payload := jsonb_build_object('type','#{watcher_options.update_type}','values',json_build_object(#{columns_sql}));
PERFORM pg_notify('#{unique_label}', payload);

RETURN NEW;
Expand Down Expand Up @@ -153,7 +153,8 @@ defmodule EctoWatch.WatcherServer do
pub_sub_mod: pub_sub_mod,
unique_label: unique_label,
schema_mod: watcher_options.schema_mod,
association_owner_keys: association_owner_keys(watcher_options.schema_mod),
identifier_keys:
MapSet.put(association_owner_keys(watcher_options.schema_mod), primary_key),
extra_columns: watcher_options.opts[:extra_columns] || [],
schema_mod_or_label: watcher_options.opts[:label] || watcher_options.schema_mod
}}
Expand All @@ -171,57 +172,44 @@ defmodule EctoWatch.WatcherServer do
raise "Expected to receive message from #{state.unique_label}, but received from #{channel_name}"
end

%{"type" => type, "identifier" => identifier, "extra" => extra} = Jason.decode!(payload)
%{"type" => type, "values" => values} = Jason.decode!(payload)

extra = Map.new(extra, fn {k, v} -> {String.to_existing_atom(k), v} end)
values = Map.new(values, fn {k, v} -> {String.to_existing_atom(k), v} end)

type = String.to_existing_atom(type)

message = {type, state.schema_mod_or_label, identifier, extra}

[primary_key] = state.schema_mod.__schema__(:primary_key)
message = {type, state.schema_mod_or_label, values}

for topic <-
topics(
type,
state.unique_label,
primary_key,
identifier,
extra,
state.association_owner_keys
values,
state.identifier_keys
) do
Phoenix.PubSub.broadcast(state.pub_sub_mod, topic, message)
end

{:noreply, state}
end

# This code is a mess 😅
# will refactor when the primary key is moved into the map
# see: https://github.com/cheerfulstoic/ecto_watch/discussions/8
def topics(:inserted, unique_label, primary_key, identifier, extra, association_owner_keys) do
def topics(:inserted, unique_label, values, identifier_keys) do
subscription_columns =
Enum.filter(extra, fn {k, v} -> MapSet.member?(association_owner_keys, k) end)
Enum.filter(values, fn {k, _} -> MapSet.member?(identifier_keys, k) end)

[unique_label | Enum.map(subscription_columns, fn {k, v} -> "#{unique_label}|#{k}|#{v}" end)]
end

def topics(:updated, unique_label, primary_key, identifier, extra, association_owner_keys) do
def topics(:updated, unique_label, values, identifier_keys) do
subscription_columns =
[
{primary_key, identifier}
| Enum.filter(extra, fn {k, v} -> MapSet.member?(association_owner_keys, k) end)
]
Enum.filter(values, fn {k, _} -> MapSet.member?(identifier_keys, k) end)

[unique_label | Enum.map(subscription_columns, fn {k, v} -> "#{unique_label}|#{k}|#{v}" end)]
end

def topics(:deleted, unique_label, primary_key, identifier, extra, association_owner_keys) do
def topics(:deleted, unique_label, values, identifier_keys) do
subscription_columns =
[
{primary_key, identifier}
| Enum.filter(extra, fn {k, v} -> MapSet.member?(association_owner_keys, k) end)
]
Enum.filter(values, fn {k, _} -> MapSet.member?(identifier_keys, k) end)

[unique_label | Enum.map(subscription_columns, fn {k, v} -> "#{unique_label}|#{k}|#{v}" end)]
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule EctoWatch.MixProject do
def project do
[
app: :ecto_watch,
version: "0.6.0-rc1",
version: "0.6.0-rc2",
elixir: "~> 1.10",
description:
"EctoWatch allows you to easily get Phoenix.PubSub notifications directly from postgresql.",
Expand Down
Loading