From 7c2fa0c1fd848f8cb013a3cefaa7b7e892c24241 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20M=C3=A4nnchen?= Date: Thu, 22 Nov 2018 08:54:25 +0100 Subject: [PATCH] Solution: Faster startup for non-global (#376) (#383) --- CHANGELOG.md | 3 + .../cluster_task_supervisor_registry.ex | 53 ++++++- .../init_opts.ex | 5 +- .../start_opts.ex | 5 +- .../cluster_task_supervisor_registry/state.ex | 5 +- lib/quantum/supervisor.ex | 2 +- .../cluster_task_supervisor_registry_test.exs | 136 ++++++++++++------ 7 files changed, 157 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 206e102..dcc4e3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased +### Fixed +- Faster Startup duration for non-global + Diff for [unreleased] ## 2.3.3 - 2018-09-06 diff --git a/lib/quantum/cluster_task_supervisor_registry.ex b/lib/quantum/cluster_task_supervisor_registry.ex index f77fceb..db772e1 100644 --- a/lib/quantum/cluster_task_supervisor_registry.ex +++ b/lib/quantum/cluster_task_supervisor_registry.ex @@ -14,7 +14,7 @@ defmodule Quantum.ClusterTaskSupervisorRegistry do struct!( InitOpts, opts - |> Map.take([:task_supervisor_reference, :group_name]) + |> Map.take([:task_supervisor_reference, :group_name, :global]) |> Map.put_new(:group_name, Module.concat(name, Group)) ), name: name @@ -23,7 +23,11 @@ defmodule Quantum.ClusterTaskSupervisorRegistry do @doc false @impl true - def init(%InitOpts{task_supervisor_reference: task_supervisor, group_name: group_name}) do + def init(%InitOpts{ + task_supervisor_reference: task_supervisor, + group_name: group_name, + global: true + }) do task_supervisor_pid = GenServer.whereis(task_supervisor) monitor_ref = Process.monitor(task_supervisor_pid) @@ -41,16 +45,43 @@ defmodule Quantum.ClusterTaskSupervisorRegistry do %State{ group_name: group_name, task_supervisor_pid: task_supervisor_pid, - monitor_ref: monitor_ref + monitor_ref: monitor_ref, + global: true + }} + end + + def init(%InitOpts{ + task_supervisor_reference: task_supervisor, + group_name: group_name, + global: false + }) do + task_supervisor_pid = GenServer.whereis(task_supervisor) + + monitor_ref = Process.monitor(task_supervisor_pid) + + {:ok, + %State{ + group_name: group_name, + task_supervisor_pid: task_supervisor_pid, + monitor_ref: monitor_ref, + global: false }} end @doc false @impl true - def handle_call(:pids, _from, %State{group_name: group_name} = state) do + def handle_call(:pids, _from, %State{group_name: group_name, global: true} = state) do {:reply, Swarm.members(group_name), state} end + def handle_call( + :pids, + _from, + %State{task_supervisor_pid: task_supervisor_pid, global: false} = state + ) do + {:reply, [task_supervisor_pid], state} + end + @doc false @impl true def handle_info( @@ -58,13 +89,25 @@ defmodule Quantum.ClusterTaskSupervisorRegistry do %State{ group_name: group_name, task_supervisor_pid: task_supervisor_pid, - monitor_ref: monitor_ref + monitor_ref: monitor_ref, + global: true } = state ) do Swarm.leave(group_name, task_supervisor_pid) {:stop, :terminate, state} end + def handle_info( + {:DOWN, monitor_ref, :process, task_supervisor_pid, _reason}, + %State{ + task_supervisor_pid: task_supervisor_pid, + monitor_ref: monitor_ref, + global: false + } = state + ) do + {:stop, :terminate, state} + end + @doc false # Retrieve pids running the linked gen server def pids(server \\ __MODULE__) do diff --git a/lib/quantum/cluster_task_supervisor_registry/init_opts.ex b/lib/quantum/cluster_task_supervisor_registry/init_opts.ex index f87391b..a56c4e0 100644 --- a/lib/quantum/cluster_task_supervisor_registry/init_opts.ex +++ b/lib/quantum/cluster_task_supervisor_registry/init_opts.ex @@ -5,9 +5,10 @@ defmodule Quantum.ClusterTaskSupervisorRegistry.InitOpts do @type t :: %__MODULE__{ task_supervisor_reference: GenServer.server(), - group_name: atom() + group_name: atom(), + global: boolean() } - @enforce_keys [:task_supervisor_reference, :group_name] + @enforce_keys [:task_supervisor_reference, :group_name, :global] defstruct @enforce_keys end diff --git a/lib/quantum/cluster_task_supervisor_registry/start_opts.ex b/lib/quantum/cluster_task_supervisor_registry/start_opts.ex index ddefe67..a08084a 100644 --- a/lib/quantum/cluster_task_supervisor_registry/start_opts.ex +++ b/lib/quantum/cluster_task_supervisor_registry/start_opts.ex @@ -6,9 +6,10 @@ defmodule Quantum.ClusterTaskSupervisorRegistry.StartOpts do @type t :: %__MODULE__{ name: GenServer.server(), task_supervisor_reference: GenServer.server(), - group_name: atom() | nil + group_name: atom() | nil, + global: boolean() } - @enforce_keys [:name, :task_supervisor_reference] + @enforce_keys [:name, :task_supervisor_reference, :global] defstruct @enforce_keys ++ [:group_name] end diff --git a/lib/quantum/cluster_task_supervisor_registry/state.ex b/lib/quantum/cluster_task_supervisor_registry/state.ex index 4ac7744..a7dbadf 100644 --- a/lib/quantum/cluster_task_supervisor_registry/state.ex +++ b/lib/quantum/cluster_task_supervisor_registry/state.ex @@ -6,9 +6,10 @@ defmodule Quantum.ClusterTaskSupervisorRegistry.State do @type t :: %__MODULE__{ group_name: atom(), task_supervisor_pid: GenServer.server(), - monitor_ref: reference + monitor_ref: reference, + global: boolean() } - @enforce_keys [:group_name, :task_supervisor_pid, :monitor_ref] + @enforce_keys [:group_name, :task_supervisor_pid, :monitor_ref, :global] defstruct @enforce_keys end diff --git a/lib/quantum/supervisor.ex b/lib/quantum/supervisor.ex index c29b47a..80fa600 100644 --- a/lib/quantum/supervisor.ex +++ b/lib/quantum/supervisor.ex @@ -57,7 +57,7 @@ defmodule Quantum.Supervisor do struct!( ClusterTaskSupervisorRegistryStartOpts, opts - |> Map.take([:task_supervisor_reference]) + |> Map.take([:task_supervisor_reference, :global]) |> Map.put(:name, cluster_task_supervisor_registry_name) ) diff --git a/test/quantum/cluster_task_supervisor_registry_test.exs b/test/quantum/cluster_task_supervisor_registry_test.exs index 3682622..f9d5aaa 100644 --- a/test/quantum/cluster_task_supervisor_registry_test.exs +++ b/test/quantum/cluster_task_supervisor_registry_test.exs @@ -6,53 +6,109 @@ defmodule Quantum.ClusterTaskSupervisorRegistryTest do alias Quantum.ClusterTaskSupervisorRegistry alias Quantum.ClusterTaskSupervisorRegistry.StartOpts - test "should register name", %{test: test} do - {:ok, task_supervisor_pid} = start_supervised({Task.Supervisor, name: test}) - - {:ok, registry_pid} = - start_supervised( - {ClusterTaskSupervisorRegistry, - %StartOpts{ - name: Module.concat([__MODULE__, test, Registry]), - task_supervisor_reference: test, - group_name: Module.concat([__MODULE__, test, Group]) - }} - ) - - Process.sleep(5_000) - - registered_pids = ClusterTaskSupervisorRegistry.pids(registry_pid) - registered_nodes = ClusterTaskSupervisorRegistry.nodes(registry_pid) - - assert Enum.count(registered_pids) == 1 - assert Enum.member?(registered_pids, task_supervisor_pid) - assert Enum.count(registered_nodes) == 1 - assert Enum.member?(registered_nodes, Node.self()) + describe "global" do + test "should register name", %{test: test} do + {:ok, task_supervisor_pid} = start_supervised({Task.Supervisor, name: test}) + + {:ok, registry_pid} = + start_supervised( + {ClusterTaskSupervisorRegistry, + %StartOpts{ + name: Module.concat([__MODULE__, test, Registry]), + task_supervisor_reference: test, + group_name: Module.concat([__MODULE__, test, Group]), + global: true + }} + ) + + Process.sleep(5_000) + + registered_pids = ClusterTaskSupervisorRegistry.pids(registry_pid) + registered_nodes = ClusterTaskSupervisorRegistry.nodes(registry_pid) + + assert Enum.count(registered_pids) == 1 + assert Enum.member?(registered_pids, task_supervisor_pid) + assert Enum.count(registered_nodes) == 1 + assert Enum.member?(registered_nodes, Node.self()) + end + + test "should quit when task_supervisor quits", %{test: test} do + test_pid = self() + + spawn(fn -> + send(test_pid, Task.Supervisor.start_link(name: test)) + + send( + test_pid, + ClusterTaskSupervisorRegistry.start_link(%StartOpts{ + name: Module.concat([__MODULE__, test, Registry]), + task_supervisor_reference: test, + group_name: Module.concat([__MODULE__, test, Group]), + global: true + }) + ) + end) + + assert_receive {:ok, task_supervisor_pid}, 10_000 + assert_receive {:ok, registry_pid}, 10_000 + + ref = Process.monitor(registry_pid) + + Process.exit(task_supervisor_pid, :kill) + + assert_receive {:DOWN, ^ref, :process, ^registry_pid, :terminate} + end end - test "should quit when task_supervisor quits", %{test: test} do - test_pid = self() + describe "local" do + test "should register name", %{test: test} do + {:ok, task_supervisor_pid} = start_supervised({Task.Supervisor, name: test}) + + {:ok, registry_pid} = + start_supervised( + {ClusterTaskSupervisorRegistry, + %StartOpts{ + name: Module.concat([__MODULE__, test, Registry]), + task_supervisor_reference: test, + group_name: Module.concat([__MODULE__, test, Group]), + global: false + }} + ) + + registered_pids = ClusterTaskSupervisorRegistry.pids(registry_pid) + registered_nodes = ClusterTaskSupervisorRegistry.nodes(registry_pid) + + assert Enum.count(registered_pids) == 1 + assert Enum.member?(registered_pids, task_supervisor_pid) + assert Enum.count(registered_nodes) == 1 + assert Enum.member?(registered_nodes, Node.self()) + end + + test "should quit when task_supervisor quits", %{test: test} do + test_pid = self() - spawn(fn -> - send(test_pid, Task.Supervisor.start_link(name: test)) + spawn(fn -> + send(test_pid, Task.Supervisor.start_link(name: test)) - send( - test_pid, - ClusterTaskSupervisorRegistry.start_link(%StartOpts{ - name: Module.concat([__MODULE__, test, Registry]), - task_supervisor_reference: test, - group_name: Module.concat([__MODULE__, test, Group]) - }) - ) - end) + send( + test_pid, + ClusterTaskSupervisorRegistry.start_link(%StartOpts{ + name: Module.concat([__MODULE__, test, Registry]), + task_supervisor_reference: test, + group_name: Module.concat([__MODULE__, test, Group]), + global: false + }) + ) + end) - assert_receive {:ok, task_supervisor_pid}, 10_000 - assert_receive {:ok, registry_pid}, 10_000 + assert_receive {:ok, task_supervisor_pid}, 10_000 + assert_receive {:ok, registry_pid}, 10_000 - ref = Process.monitor(registry_pid) + ref = Process.monitor(registry_pid) - Process.exit(task_supervisor_pid, :kill) + Process.exit(task_supervisor_pid, :kill) - assert_receive {:DOWN, ^ref, :process, ^registry_pid, :terminate} + assert_receive {:DOWN, ^ref, :process, ^registry_pid, :terminate} + end end end