Skip to content

Commit

Permalink
Cleanly close the device connection during channel tests (#1902)
Browse files Browse the repository at this point in the history
Postgres was complaining. This makes sure it doesn't happen anymore.
  • Loading branch information
joshk authored Feb 13, 2025
1 parent 10680c4 commit d49be06
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 64 deletions.
5 changes: 5 additions & 0 deletions lib/nerves_hub/audit_logs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ defmodule NervesHub.AuditLogs do

{:ok, count}
end

# used in some tests
def with_description(desc) do
where(AuditLog, [a], like(a.description, ^desc))
end
end
128 changes: 78 additions & 50 deletions test/nerves_hub_web/channels/device_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ defmodule NervesHubWeb.DeviceChannelTest do
use NervesHubWeb.ChannelCase
use DefaultMocks

import Ecto.Query
import TrackerHelper

alias NervesHub.AuditLogs.AuditLog
alias NervesHub.AuditLogs
alias NervesHub.Devices
alias NervesHub.Fixtures
alias NervesHub.Repo
Expand All @@ -21,16 +20,19 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _, socket} =
{:ok, _, device_channel} =
subscribe_and_join(socket, DeviceChannel, "device", %{"device_api_version" => "2.2.0"})

assert_push("extensions:get", _)

{:ok, _, _socket} =
{:ok, _, _extensions_channel} =
subscribe_and_join(socket, ExtensionsChannel, "extensions", %{
"geo" => "1.0.0",
"health" => "1.0.0"
})

assert_online_and_available(device)
close_cleanly(device_channel)
end

test "presence connection information" do
Expand All @@ -43,9 +45,10 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _, _socket} = subscribe_and_join(socket, DeviceChannel, "device")
{:ok, _, device_channel} = subscribe_and_join(socket, DeviceChannel, "device")

assert_connection_change()
assert_online_and_available(device)
close_cleanly(device_channel)
end

test "fwup_public_keys requested on connect" do
Expand All @@ -66,9 +69,12 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, %{}, _socket} = subscribe_and_join(socket, DeviceChannel, "device", params)
{:ok, %{}, device_channel} = subscribe_and_join(socket, DeviceChannel, "device", params)

assert_push("fwup_public_keys", %{keys: [_]})

assert_online_and_available(device)
close_cleanly(device_channel)
end

test "archive_public_keys requested on connect" do
Expand All @@ -89,25 +95,33 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, %{}, _socket} = subscribe_and_join(socket, DeviceChannel, "device", params)
{:ok, %{}, device_channel} = subscribe_and_join(socket, DeviceChannel, "device", params)

assert_push("archive_public_keys", %{keys: [_]})

assert_online_and_available(device)
close_cleanly(device_channel)
end

test "if archive is sent on connect an audit log is not created" do
%{certificate: certificate, params: params, archive_uuid: archive_uuid} =
%{device: device, certificate: certificate, params: params, archive_uuid: archive_uuid} =
archive_setup()

{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, %{}, _socket} = subscribe_and_join(socket, DeviceChannel, "device", params)
audit_log_count_before =
Repo.aggregate(AuditLogs.with_description("Archive update triggered%"), :count)

audit_log_count_before = Repo.aggregate(AuditLog, :count)
{:ok, %{}, device_channel} = subscribe_and_join(socket, DeviceChannel, "device", params)

assert_push("archive", %{uuid: ^archive_uuid})

assert audit_log_count_before == Repo.aggregate(AuditLog, :count)
assert audit_log_count_before ==
Repo.aggregate(AuditLogs.with_description("Archive update triggered%"), :count)

assert_online_and_available(device)
close_cleanly(device_channel)
end

test "if archive is sent when an archive updates an audit log is created" do
Expand All @@ -116,21 +130,20 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, %{}, socket} = subscribe_and_join(socket, DeviceChannel, "device", params)
{:ok, %{}, device_channel} = subscribe_and_join(socket, DeviceChannel, "device", params)

Phoenix.PubSub.broadcast(
NervesHub.PubSub,
"device:#{device.id}",
%Phoenix.Socket.Broadcast{event: "archives/updated"}
)

_ = :sys.get_state(socket.channel_pid)
_ = :sys.get_state(device_channel.channel_pid)

assert Repo.exists?(
from(al in AuditLog,
where: like(al.description, "Archive update triggered for%")
)
)
assert Repo.exists?(AuditLogs.with_description("Archive update triggered for%"))

assert_online_and_available(device)
close_cleanly(device_channel)
end

test "if archive is sent when a device updates an audit log is created" do
Expand All @@ -139,21 +152,20 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, %{}, socket} = subscribe_and_join(socket, DeviceChannel, "device", params)
{:ok, %{}, device_channel} = subscribe_and_join(socket, DeviceChannel, "device", params)

Phoenix.PubSub.broadcast(
NervesHub.PubSub,
"device:#{device.id}",
%Phoenix.Socket.Broadcast{event: "devices/updated"}
)

_ = :sys.get_state(socket.channel_pid)
_ = :sys.get_state(device_channel.channel_pid)

assert Repo.exists?(AuditLogs.with_description("Archive update triggered for%"))

assert Repo.exists?(
from(al in AuditLog,
where: like(al.description, "Archive update triggered for%")
)
)
assert_online_and_available(device)
close_cleanly(device_channel)
end

test "the first fwup_progress marks an update as happening" do
Expand All @@ -164,15 +176,19 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _join_reply, socket} =
{:ok, _join_reply, device_channel} =
subscribe_and_join(socket, DeviceChannel, "device")

push(socket, "fwup_progress", %{"value" => 10})
assert_online_and_available(device)

push(device_channel, "fwup_progress", %{"value" => 10})

# Since fwup_progress doesn't reply, we need to use sys to grab the socket
# _after_ the handle_in has run
socket = :sys.get_state(socket.channel_pid)
assert socket.assigns.update_started?
state = :sys.get_state(device_channel.channel_pid)
assert state.assigns.update_started?

close_cleanly(device_channel)
end

test "set connection types for the device" do
Expand All @@ -183,17 +199,19 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _join_reply, socket} =
{:ok, _join_reply, device_channel} =
subscribe_and_join(socket, DeviceChannel, "device")

push(socket, "connection_types", %{"values" => ["ethernet", "wifi"]})
push(device_channel, "connection_types", %{"values" => ["ethernet", "wifi"]})

# we need to let the channel process all messages before we can
# check the state of the device's connection types
_socket = :sys.get_state(socket.channel_pid)
_socket = :sys.get_state(device_channel.channel_pid)

device = NervesHub.Repo.reload(device) |> NervesHub.Repo.preload(:latest_connection)
assert device.latest_connection.metadata["connection_types"] == ["ethernet", "wifi"]

close_cleanly(device_channel)
end

test "deployment information is updated when the deployment is cleared" do
Expand All @@ -206,20 +224,24 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _join_reply, socket} =
{:ok, _join_reply, device_channel} =
subscribe_and_join(socket, DeviceChannel, "device")

refute is_nil(socket.assigns.device.deployment_id)
refute is_nil(socket.assigns.deployment_channel)
assert_online_and_available(device)

refute is_nil(device_channel.assigns.device.deployment_id)
refute is_nil(device_channel.assigns.deployment_channel)

Devices.clear_deployment(device)

# we need to let the channel process all messages before we can
# check the state of the device's connection types
socket = :sys.get_state(socket.channel_pid)
state = :sys.get_state(device_channel.channel_pid)

assert is_nil(state.assigns.device.deployment_id)
assert is_nil(state.assigns.deployment_channel)

assert is_nil(socket.assigns.device.deployment_id)
assert is_nil(socket.assigns.deployment_channel)
close_cleanly(device_channel)
end

test "deployment information is updated when the device joins a new deployment" do
Expand All @@ -232,11 +254,11 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _join_reply, socket} =
{:ok, _join_reply, device_channel} =
subscribe_and_join(socket, DeviceChannel, "device")

assert socket.assigns.device.deployment_id == deployment.id
refute is_nil(socket.assigns.deployment_channel)
assert device_channel.assigns.device.deployment_id == deployment.id
refute is_nil(device_channel.assigns.deployment_channel)

device = NervesHub.Repo.preload(device, :org)

Expand All @@ -247,10 +269,12 @@ defmodule NervesHubWeb.DeviceChannelTest do

# we need to let the channel process all messages before we can
# check the state of the device's connection types
socket = :sys.get_state(socket.channel_pid)
state = :sys.get_state(device_channel.channel_pid)

assert socket.assigns.device.deployment_id == new_deployment.id
refute is_nil(socket.assigns.deployment_channel)
assert state.assigns.device.deployment_id == new_deployment.id
refute is_nil(state.assigns.deployment_channel)

close_cleanly(device_channel)
end

describe "unhandled messages are caught" do
Expand All @@ -264,12 +288,14 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _join_reply, socket} =
{:ok, _join_reply, device_channel} =
subscribe_and_join(socket, DeviceChannel, "device")

send(socket.channel_pid, {"do_you_like_dem_apples", %{"apples" => 5}})
assert_online_and_available(device)

send(device_channel.channel_pid, {"do_you_like_dem_apples", %{"apples" => 5}})

assert_connection_change()
close_cleanly(device_channel)
end

test "handle_in" do
Expand All @@ -280,11 +306,13 @@ defmodule NervesHubWeb.DeviceChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _join_reply, socket} =
{:ok, _join_reply, device_channel} =
subscribe_and_join(socket, DeviceChannel, "device")

ref = push(socket, "do_you_like_dem_apples", %{"apples" => 5})
ref = push(device_channel, "do_you_like_dem_apples", %{"apples" => 5})
refute_reply(ref, %{})

close_cleanly(device_channel)
end
end

Expand Down
17 changes: 10 additions & 7 deletions test/nerves_hub_web/channels/extensions_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ defmodule NervesHubWeb.ExtensionsChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _, _} =
{:ok, _, device_channel} =
subscribe_and_join_with_default_device_api_version(socket, DeviceChannel, "device")

assert_push("extensions:get", _extensions)

assert_online_and_available(device)
close_cleanly(device_channel)
end

test "joining extensions channel works when the device has connected for the first time" do
Expand Down Expand Up @@ -84,12 +87,12 @@ defmodule NervesHubWeb.ExtensionsChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _, socket} =
{:ok, _, _device_channel} =
subscribe_and_join_with_default_device_api_version(socket, DeviceChannel, "device")

assert_push("extensions:get", _extensions)

assert {:ok, attach_list, _} =
assert {:ok, attach_list, _extensions_channel} =
subscribe_and_join_with_default_device_api_version(
socket,
ExtensionsChannel,
Expand All @@ -112,12 +115,12 @@ defmodule NervesHubWeb.ExtensionsChannelTest do
{:ok, socket} =
connect(DeviceSocket, %{}, connect_info: %{peer_data: %{ssl_cert: certificate.der}})

{:ok, _, socket} =
{:ok, _, _device_channel} =
subscribe_and_join_with_default_device_api_version(socket, DeviceChannel, "device")

assert_push("extensions:get", _extensions)

assert {:ok, ["health"], _} =
assert {:ok, ["health"], _extensions_channel} =
subscribe_and_join_with_default_device_api_version(
socket,
ExtensionsChannel,
Expand Down Expand Up @@ -157,12 +160,12 @@ defmodule NervesHubWeb.ExtensionsChannelTest do
"nerves_fw_platform" => "test_host"
}

{:ok, _, socket} =
{:ok, _, _device_channel} =
subscribe_and_join_with_default_device_api_version(socket, DeviceChannel, "device", params)

assert_push("extensions:get", _extensions)

assert {:ok, attach_list, _} =
assert {:ok, attach_list, _extensions_channel} =
subscribe_and_join_with_default_device_api_version(
socket,
ExtensionsChannel,
Expand Down
7 changes: 0 additions & 7 deletions test/nerves_hub_web/channels/websocket_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule NervesHubWeb.WebsocketTest do
use NervesHubWeb.ChannelCase

use AssertEventually, timeout: 500, interval: 50

import TrackerHelper

alias NervesHub.AuditLogs
Expand Down Expand Up @@ -1185,9 +1183,4 @@ defmodule NervesHubWeb.WebsocketTest do
eventually assert_connection_change()
eventually(assert(Repo.all(where(DeviceConnection, status: :connected)) == []))
end

def assert_online_and_available(device) do
eventually assert [{_, %{}}] =
Registry.match(NervesHub.Devices.Registry, device.id, :_)
end
end
Loading

0 comments on commit d49be06

Please sign in to comment.