From 816a512980db2135ddaf8f63d493d9c49f2c3b35 Mon Sep 17 00:00:00 2001 From: Eric Oestrich Date: Tue, 16 Jan 2024 16:07:19 -0500 Subject: [PATCH] Changes to how DeviceLinks are supervised - Partition supervisor for the DynamicSupervisor, with thousands of devices connected the single DynamicSupervisor could get overloaded, partition it to help reduce processing changing state quickly - Start a DeviceLink with the ID only, to help reduce state inside the supervisor, only store the device's ID in the supervisor state (since it's part of the child_spec that's stored in the supervisor state) - Remove the supervisor module to simplify, put the one function that matters in the DeviceLink module directly - Set to transient for restart since we do want the DeviceLink to terminate and get cleared out of the child_spec state --- lib/nerves_hub/application.ex | 3 +- lib/nerves_hub/devices/device_link.ex | 32 +++++++++++++++----- lib/nerves_hub/devices/supervisor.ex | 24 --------------- test/nerves_hub/devices/device_link_test.exs | 22 +++++++++----- 4 files changed, 40 insertions(+), 41 deletions(-) delete mode 100644 lib/nerves_hub/devices/supervisor.ex diff --git a/lib/nerves_hub/application.ex b/lib/nerves_hub/application.ex index deb7360e2..7c1a64b95 100644 --- a/lib/nerves_hub/application.ex +++ b/lib/nerves_hub/application.ex @@ -34,7 +34,8 @@ defmodule NervesHub.Application do {Task.Supervisor, name: NervesHub.TaskSupervisor}, {Oban, Application.fetch_env!(:nerves_hub, Oban)}, NervesHub.Tracker, - NervesHub.Devices.Supervisor + {PartitionSupervisor, + child_spec: DynamicSupervisor, name: NervesHub.Devices.Supervisors} ] ++ deployments_supervisor(deploy_env()) ++ endpoints(deploy_env()) diff --git a/lib/nerves_hub/devices/device_link.ex b/lib/nerves_hub/devices/device_link.ex index b135b6841..a325d91df 100644 --- a/lib/nerves_hub/devices/device_link.ex +++ b/lib/nerves_hub/devices/device_link.ex @@ -6,7 +6,7 @@ defmodule NervesHub.Devices.DeviceLink do e.g. websockets, MQTT, etc """ - use GenServer + use GenServer, restart: :transient alias NervesHub.Archives alias NervesHub.AuditLogs @@ -35,8 +35,8 @@ defmodule NervesHub.Devices.DeviceLink do end @spec start_link(Device.t()) :: GenServer.on_start() - def start_link(device) do - GenServer.start_link(__MODULE__, device, name: name(device)) + def start_link(device_id) do + GenServer.start_link(__MODULE__, device_id, name: name(device_id)) end @spec name(Device.t() | pos_integer()) :: @@ -95,7 +95,7 @@ defmodule NervesHub.Devices.DeviceLink do link = case whereis(device) do nil -> - {:ok, pid} = NervesHub.Devices.Supervisor.start_device(device) + {:ok, pid} = start_device(device) pid link -> @@ -118,6 +118,19 @@ defmodule NervesHub.Devices.DeviceLink do GenServer.call(link, {:connect, push_cb, params, monitor, :ctx}) end + defp start_device(device) do + case GenServer.whereis(name(device)) do + nil -> + DynamicSupervisor.start_child( + {:via, PartitionSupervisor, {NervesHub.Devices.Supervisors, self()}}, + {__MODULE__, device.id} + ) + + pid when is_pid(pid) -> + {:ok, pid} + end + end + @doc """ Mark device as disconnected @@ -137,12 +150,14 @@ defmodule NervesHub.Devices.DeviceLink do end @impl GenServer - def init(device) do - {:ok, %State{device: device}, {:continue, :boot}} + def init(device_id) do + {:ok, %State{}, {:continue, {:boot, device_id}}} end @impl GenServer - def handle_continue(:boot, %{device: device} = state) do + def handle_continue({:boot, device_id}, state) do + device = Devices.get_device(device_id) + ref_id = Base.encode32(:crypto.strong_rand_bytes(2), padding: false) deployment_channel = @@ -163,7 +178,8 @@ defmodule NervesHub.Devices.DeviceLink do updating: false }) - {:noreply, %{state | deployment_channel: deployment_channel, reference_id: ref_id}} + {:noreply, + %{state | device: device, deployment_channel: deployment_channel, reference_id: ref_id}} end @impl GenServer diff --git a/lib/nerves_hub/devices/supervisor.ex b/lib/nerves_hub/devices/supervisor.ex deleted file mode 100644 index 85a3edb72..000000000 --- a/lib/nerves_hub/devices/supervisor.ex +++ /dev/null @@ -1,24 +0,0 @@ -defmodule NervesHub.Devices.Supervisor do - use DynamicSupervisor - - alias NervesHub.Devices.DeviceLink - - def start_link(_) do - DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) - end - - def start_device(device) do - case GenServer.whereis(DeviceLink.name(device)) do - nil -> - DynamicSupervisor.start_child(__MODULE__, {DeviceLink, device}) - - pid when is_pid(pid) -> - {:ok, pid} - end - end - - @impl true - def init(_) do - DynamicSupervisor.init(strategy: :one_for_one) - end -end diff --git a/test/nerves_hub/devices/device_link_test.exs b/test/nerves_hub/devices/device_link_test.exs index 72a7af16f..4d5afd4b3 100644 --- a/test/nerves_hub/devices/device_link_test.exs +++ b/test/nerves_hub/devices/device_link_test.exs @@ -10,16 +10,22 @@ defmodule NervesHub.DeviceLinkTest do alias NervesHub.Tracker alias Phoenix.Socket.Broadcast - test "device without deployment subscribes deployment:none" do - state = %DeviceLink.State{device: %Device{id: 1, deployment_id: nil}} - assert {:noreply, updated} = DeviceLink.handle_continue(:boot, state) + test "device without deployment subscribes deployment:none", context do + %{device: %{id: id}} = create_device(context) + assert {:noreply, updated} = DeviceLink.handle_continue({:boot, id}, %DeviceLink.State{}) assert updated.deployment_channel == "deployment:none" end - test "device with deployment subscribes deployment:\#{id}" do - state = %DeviceLink.State{device: %Device{id: 1, deployment_id: 1}} - assert {:noreply, updated} = DeviceLink.handle_continue(:boot, state) - assert updated.deployment_channel == "deployment:1" + test "device with deployment subscribes deployment:\#{id}", context do + %{device: device, deployment: deployment} = create_device(context) + + device = + device + |> Ecto.Changeset.change(%{deployment_id: deployment.id}) + |> NervesHub.Repo.update!() + + assert {:noreply, updated} = DeviceLink.handle_continue({:boot, device.id}, %DeviceLink.State{}) + assert updated.deployment_channel == "deployment:#{deployment.id}" end describe "connect/4" do @@ -531,7 +537,7 @@ defmodule NervesHub.DeviceLinkTest do end defp start_device_link(context) do - link = start_supervised!({DeviceLink, context.device}, restart: :temporary) + link = start_supervised!({DeviceLink, context.device.id}, restart: :temporary) Mox.allow(NervesHub.UploadMock, self(), link) Map.put(context, :link, link) end