Skip to content

Commit

Permalink
Merge branch 'main' into alt-device-auth-jk-twist
Browse files Browse the repository at this point in the history
  • Loading branch information
joshk authored Dec 18, 2023
2 parents b8060e1 + dde177c commit 86f2dc3
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 462 deletions.
5 changes: 0 additions & 5 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ config :nerves_hub, NervesHubWeb.Endpoint,
render_errors: [view: NervesHubWeb.ErrorView, accepts: ~w(html json)],
pubsub_server: NervesHub.PubSub

config :opentelemetry,
span_processor: :batch,
traces_exporter: :otlp,
resource: %{service: %{name: "nerves_hub"}}

config :swoosh, :api_client, Swoosh.ApiClient.Finch

# Environment specific config
Expand Down
8 changes: 0 additions & 8 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,4 @@ config :nerves_hub, NervesHubWeb.Endpoint,

config :nerves_hub, NervesHubWeb.DeviceSocketSharedSecretAuth, enabled: true

# OTel
config :opentelemetry, tracer: :otel_tracer_noop, traces_exporter: :none

config :opentelemetry, :processors,
otel_batch_processor: %{
exporter: {:otel_exporter_tab, []}
}

config :sentry, environment_name: :test
57 changes: 24 additions & 33 deletions lib/nerves_hub/deployments.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule NervesHub.Deployments do
import Ecto.Query

require Logger
require OpenTelemetry.Tracer, as: Tracer

alias NervesHub.AuditLogs
alias NervesHub.Deployments.Deployment
Expand Down Expand Up @@ -305,41 +304,33 @@ defmodule NervesHub.Deployments do
Do nothing if a deployment is already set
"""
def set_deployment(%{deployment_id: nil} = device) do
Tracer.with_span "Deployments.set_deployment" do
Tracer.set_attribute("nerves_hub.deployment.status", "setting")

case alternate_deployments(device, [true]) do
[] ->
Logger.debug("No matching deployments for #{device.identifier}")

%{device | deployment: nil}

[deployment] ->
device
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_change(:deployment_id, deployment.id)
|> Repo.update!()
|> Repo.preload([:deployment])

[deployment | _] ->
Logger.debug(
"More than one deployment matches for #{device.identifier}, setting to the first"
)

device
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_change(:deployment_id, deployment.id)
|> Repo.update!()
|> Repo.preload([:deployment])
end
case alternate_deployments(device, [true]) do
[] ->
Logger.debug("No matching deployments for #{device.identifier}")

%{device | deployment: nil}

[deployment] ->
device
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_change(:deployment_id, deployment.id)
|> Repo.update!()
|> Repo.preload([:deployment])

[deployment | _] ->
Logger.debug(
"More than one deployment matches for #{device.identifier}, setting to the first"
)

device
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_change(:deployment_id, deployment.id)
|> Repo.update!()
|> Repo.preload([:deployment])
end
end

def set_deployment(device) do
Tracer.with_span "Deployments.set_deployment" do
Tracer.set_attribute("nerves_hub.deployment.status", "existed")

Repo.preload(device, [:deployment])
end
Repo.preload(device, [:deployment])
end
end
112 changes: 50 additions & 62 deletions lib/nerves_hub/deployments/orchestrator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ defmodule NervesHub.Deployments.Orchestrator do
use GenServer

require Logger
require OpenTelemetry.Tracer, as: Tracer

alias NervesHub.Devices
alias NervesHub.Devices.Device
Expand Down Expand Up @@ -50,68 +49,57 @@ defmodule NervesHub.Deployments.Orchestrator do
was successful, and the process is repeated.
"""
def trigger_update(deployment) do
Tracer.with_span "Deploments.Orchestrator.trigger_update" do
:telemetry.execute([:nerves_hub, :deployment, :trigger_update], %{count: 1})

match_conditions = [
{:and, {:==, {:map_get, :deployment_id, :"$1"}, deployment.id},
{:==, {:map_get, :updating, :"$1"}, false},
{:==, {:map_get, :updates_enabled, :"$1"}, true},
{:"/=", {:map_get, :firmware_uuid, :"$1"}, deployment.firmware.uuid}}
]

match_return = %{
device_id: {:element, 1, :"$_"},
pid: {:element, 1, {:element, 2, :"$_"}},
firmware_uuid: {:map_get, :firmware_uuid, {:element, 2, {:element, 2, :"$_"}}}
}

devices =
Registry.select(NervesHub.Devices, [
{{:_, :_, :"$1"}, match_conditions, [match_return]}
])

# Get a rough count of devices to update
count = deployment.concurrent_updates - Devices.count_inflight_updates_for(deployment)
# Just in case inflight goes higher than concurrent, limit it to 0
count = max(count, 0)

Tracer.set_attributes(%{
"nerves_hub.deployments.devices_to_update" => count,
"nerves_hub.deployments.devices_online" => Enum.count(devices)
})

# use a reduce to bounce out early?
# limit the number of devices to 5 minutes / 500ms?

devices
|> Enum.take(count)
|> Enum.each(fn %{device_id: device_id, pid: pid} ->
Tracer.with_span "Deployments.Orchestrator.each_device" do
Tracer.set_attribute("nerves_hub.devices.id", device_id)

:telemetry.execute([:nerves_hub, :deployment, :trigger_update, :device], %{count: 1})

device = %Device{id: device_id}

# Check again because other nodes are processing at the same time
if Devices.count_inflight_updates_for(deployment) < deployment.concurrent_updates do
case Devices.told_to_update(device, deployment) do
{:ok, inflight_update} ->
send(pid, {"deployments/update", inflight_update})

:error ->
Logger.error(
"An inflight update could not be created or found for the device #{device.identifier} (#{device.id})"
)
end
end

# Slow the update a bit to allow for concurrent nodes
Process.sleep(500)
:telemetry.execute([:nerves_hub, :deployment, :trigger_update], %{count: 1})

match_conditions = [
{:and, {:==, {:map_get, :deployment_id, :"$1"}, deployment.id},
{:==, {:map_get, :updating, :"$1"}, false},
{:==, {:map_get, :updates_enabled, :"$1"}, true},
{:"/=", {:map_get, :firmware_uuid, :"$1"}, deployment.firmware.uuid}}
]

match_return = %{
device_id: {:element, 1, :"$_"},
pid: {:element, 1, {:element, 2, :"$_"}},
firmware_uuid: {:map_get, :firmware_uuid, {:element, 2, {:element, 2, :"$_"}}}
}

devices =
Registry.select(NervesHub.Devices, [
{{:_, :_, :"$1"}, match_conditions, [match_return]}
])

# Get a rough count of devices to update
count = deployment.concurrent_updates - Devices.count_inflight_updates_for(deployment)
# Just in case inflight goes higher than concurrent, limit it to 0
count = max(count, 0)

# use a reduce to bounce out early?
# limit the number of devices to 5 minutes / 500ms?

devices
|> Enum.take(count)
|> Enum.each(fn %{device_id: device_id, pid: pid} ->
:telemetry.execute([:nerves_hub, :deployment, :trigger_update, :device], %{count: 1})

device = %Device{id: device_id}

# Check again because other nodes are processing at the same time
if Devices.count_inflight_updates_for(deployment) < deployment.concurrent_updates do
case Devices.told_to_update(device, deployment) do
{:ok, inflight_update} ->
send(pid, {"deployments/update", inflight_update})

:error ->
Logger.error(
"An inflight update could not be created or found for the device #{device.identifier} (#{device.id})"
)
end
end)
end
end

# Slow the update a bit to allow for concurrent nodes
Process.sleep(500)
end)
end

def init(deployment) do
Expand Down
34 changes: 10 additions & 24 deletions lib/nerves_hub/devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ defmodule NervesHub.Devices do
alias NervesHub.Repo
alias NervesHub.TaskSupervisor, as: Tasks

require OpenTelemetry.Tracer, as: Tracer

@min_fwup_delta_updatable_version ">=1.6.0"

def get_device!(device_id) do
Expand Down Expand Up @@ -623,33 +621,21 @@ defmodule NervesHub.Devices do
This may clear the deployment from the device if the version or tags are different.
"""
def verify_deployment(%{deployment_id: nil} = device) do
Tracer.with_span "Devices.verify_deployment" do
Tracer.set_attribute("nerves_hub.device.deployment_id", nil)

device
end
device
end

def verify_deployment(device) do
Tracer.with_span "Devices.verify_deployment" do
Tracer.set_attribute("nerves_hub.device.deployment_id", device.deployment_id)

device = Repo.preload(device, [:deployment])
device = Repo.preload(device, [:deployment])

case matches_deployment?(device, device.deployment) do
true ->
Tracer.set_attribute("nerves_hub.device.match_deployment", true)

device

false ->
Tracer.set_attribute("nerves_hub.device.match_deployment", false)
case matches_deployment?(device, device.deployment) do
true ->
device

device
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_change(:deployment_id, nil)
|> Repo.update!()
end
false ->
device
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_change(:deployment_id, nil)
|> Repo.update!()
end
end

Expand Down
Loading

0 comments on commit 86f2dc3

Please sign in to comment.