Structure the GTFS import as a batch job + reporting dashboard #2920

Merged: 23 commits, Jan 16, 2023
12 changes: 12 additions & 0 deletions apps/transport/lib/db/data_import_batch.ex
@@ -0,0 +1,12 @@
defmodule DB.DataImportBatch do
@moduledoc """
Table storing the summary of a data import consolidation.
"""
use Ecto.Schema
use TypedEctoSchema

typed_schema "data_import_batch" do
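# free-form map holding the summary of one batch run (written by Transport.Jobs.GTFSImportStopsJob)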
field(:summary, :map)
timestamps(type: :utc_datetime_usec)
end
end
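
For reference, a hedged sketch of one stored row as read back later (shape inferred from refresh/1 in the job below; IDs and messages are hypothetical, and keys come back as strings after the jsonb round-trip):

%DB.DataImportBatch{
  summary: %{
    "result" => [
      %{"resource_history_id" => 123, "status" => "ok", "data_import_id" => 456},
      %{"resource_history_id" => 124, "status" => "error", "error" => "...", "error_message" => "unknown", "error_struct" => "MatchError"}
    ]
  }
}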
65 changes: 65 additions & 0 deletions apps/transport/lib/jobs/gtfs_import_stops_job.ex
@@ -0,0 +1,65 @@
defmodule Transport.Jobs.GTFSImportStopsJob do
@moduledoc """
A job to import all stops from all active GTFS resources, and report on the outcome.
"""
use Oban.Worker, max_attempts: 1
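# max_attempts: 1 — per-item failures are caught and reported in the summary below, so the job itself is not retried.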
import Ecto.Query
require Logger

@impl Oban.Worker
def perform(%Oban.Job{} = job) do
# NOTE: at some point, deleting DataImport rows no longer referenced by active datasets
# would be a good idea, to avoid leaving obsolete data in the database.
result = {:ok, refresh_all()}
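# Notify listeners (e.g. the backoffice LiveView) that the batch is complete.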
Oban.Notifier.notify(Oban, :gossip, %{complete: job.id})
result
end

def refresh_all do
result =
active_datasets_resource_history_items()
|> refresh()

batch =
%DB.DataImportBatch{summary: %{result: result}}
|> DB.Repo.insert!()

%{result: result, data_import_batch_id: batch.id}
end

def active_datasets_resource_history_items do
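# For each GTFS resource of an active dataset, pick its latest resource history item.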
DB.Dataset.base_query()
|> DB.Resource.join_dataset_with_resource()
|> DB.ResourceHistory.join_resource_with_latest_resource_history()
|> where([resource: r], r.format == "GTFS")
|> select([resource_history: rh], rh)
|> DB.Repo.all()
end

def refresh(resource_history_items) do
resource_history_items
|> Enum.map(fn rh ->
Logger.info("Processing rh_id=#{rh.id}")

try do
data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(rh.id)
%{resource_history_id: rh.id, status: :ok, data_import_id: data_import_id}
rescue
Contributor Author: in case of an error, I build a precise map so we can display details in the backoffice and iterate on the imports.

error ->
%{
resource_history_id: rh.id,
status: :error,
error: error |> inspect(),
error_message: safe_call(fn -> Map.get(error, :message) end, "unknown"),
error_struct: safe_call(fn -> error.__struct__ |> inspect end, "unknown")
}
end
end)
end

def safe_call(cb, default) do
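# Invoke the callback, returning the default if it raises (error structs don't all expose the same fields).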
cb.()
rescue
_ -> default
end
end
@@ -0,0 +1,89 @@
defmodule TransportWeb.Backoffice.DataImportBatchReportLive do
use Phoenix.LiveView
use Phoenix.HTML
import TransportWeb.Backoffice.JobsLive, only: [ensure_admin_auth_or_redirect: 3]
import Ecto.Query
import TransportWeb.Router.Helpers

@impl true
def mount(_params, %{"current_user" => current_user} = _session, socket) do
{:ok,
ensure_admin_auth_or_redirect(socket, current_user, fn socket ->
socket
|> assign_data()
|> assign(job_running: false)
end)}
end

def assign_data(socket) do
record = DB.Repo.one(from(x in DB.DataImportBatch, order_by: [desc: x.id], limit: 1))

if record do
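# summary went through a jsonb column, so keys come back as strings here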
result = record.summary["result"]

socket
|> assign(
result: sort(result),
stats: compute_stats(result),
last_updated_at: (record.inserted_at |> DateTime.truncate(:second) |> to_string()) <> " UTC"
)
else
socket
|> assign(
result: [],
stats: nil,
last_updated_at: nil
)
end
end

@doc """
Provide a default sort that groups similar errors together.
"""
def sort(result) do
Enum.sort_by(result, fn item ->
[item["status"], item["error_struct"], item["error_message"]]
end)
end

@doc """
Compute a small stats summary for display.
"""
def compute_stats(result) do
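# produces e.g. "error : 3, ok : 120" (hypothetical counts)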
result
|> Enum.group_by(fn x -> x["status"] end)
|> Enum.map_join(", ", fn {k, v} -> "#{k} : #{Enum.count(v)}" end)
end

@impl true
def handle_event("refresh", _, socket) do
send(self(), :enqueue_job)
{:noreply, socket |> assign(:job_running, true)}
end

@impl true
def handle_info(:enqueue_job, socket) do
:ok = Oban.Notifier.listen([:gossip])

%{id: job_id} =
%{}
|> Transport.Jobs.GTFSImportStopsJob.new()
|> Oban.insert!()

socket =
receive do
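# block this LiveView process until the job broadcasts its completion on :gossip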
{:notification, :gossip, %{"complete" => ^job_id}} ->
socket
|> assign(:job_running, false)
|> assign_data()
end

Oban.Notifier.unlisten([:gossip])
{:noreply, socket}
end

# catch-all
def handle_info(_, socket) do
{:noreply, socket}
end
end
@@ -0,0 +1,34 @@
<section class="container pt-48 pb-48">
<h2>Rapport du dernier batch import GTFS</h2>
<div class="pb-24">
<%= if @last_updated_at do %>
<p>Mis à jour à <%= @last_updated_at %></p>
<% end %>
<button class="button" phx-click="refresh" disabled={@job_running}>
<%= if @job_running, do: "En cours...", else: "Rafraîchir" %>
</button>
<%= if @stats do %>
<p>Résumé : <%= @stats %></p>
<% end %>
</div>
<table class="table">
<thead>
<tr>
<th>Resource History Id</th>
<th>Status</th>
<th>Erreur</th>
<th>Erreur desc</th>
</tr>
</thead>
<%= for item <- @result do %>
<tr>
<td><%= item["resource_history_id"] %></td>
<td><%= item["status"] %></td>
<td><%= item["error_struct"] %></td>
<td><%= item["error_message"] %></td>
</tr>
<% end %>
</table>
</section>
<script defer type="text/javascript" src={static_path(@socket, "/js/app.js")}>
</script>
4 changes: 4 additions & 0 deletions apps/transport/lib/transport_web/router.ex
@@ -131,6 +131,10 @@ defmodule TransportWeb.Router do

get("/import_aoms", PageController, :import_all_aoms)

live_session :data_import_batch_report, root_layout: {TransportWeb.LayoutView, :app} do
live("/batch-report", DataImportBatchReportLive)
end

scope "/datasets" do
get("/new", PageController, :new)
get("/:id/edit", PageController, :edit)
@@ -27,6 +27,10 @@
<%= dgettext("backoffice", "Cache debug") %>
</a>

<a class="button" href={backoffice_live_path(@conn, TransportWeb.Backoffice.DataImportBatchReportLive)}>
<%= dgettext("backoffice", "GTFS data import") %>
</a>

<h2>Actions</h2>

<a class="button" href={backoffice_page_path(@conn, :new)}>
4 changes: 4 additions & 0 deletions apps/transport/priv/gettext/backoffice.pot
@@ -228,3 +228,7 @@ msgstr ""
#, elixir-autogen, elixir-format
msgid "Cache debug"
msgstr ""

#, elixir-autogen, elixir-format
msgid "GTFS data import"
msgstr ""
4 changes: 4 additions & 0 deletions apps/transport/priv/gettext/en/LC_MESSAGES/backoffice.po
@@ -228,3 +228,7 @@ msgstr ""
#, elixir-autogen, elixir-format
msgid "Cache debug"
msgstr ""

#, elixir-autogen, elixir-format
msgid "GTFS data import"
msgstr ""
4 changes: 4 additions & 0 deletions apps/transport/priv/gettext/fr/LC_MESSAGES/backoffice.po
@@ -228,3 +228,7 @@ msgstr "adresse e-mail"
#, elixir-autogen, elixir-format
msgid "Cache debug"
msgstr "Debug du cache"

#, elixir-autogen, elixir-format
msgid "GTFS data import"
msgstr ""
@@ -0,0 +1,10 @@
defmodule DB.Repo.Migrations.CreateDataImportBatch do
use Ecto.Migration

def change do
create table("data_import_batch") do
add :summary, :map, default: %{}
timestamps(type: :utc_datetime_usec)
end
end
end
24 changes: 24 additions & 0 deletions apps/transport/test/support/file_stream_utils.ex
@@ -0,0 +1,24 @@
defmodule Transport.Test.FileStreamUtils do
@moduledoc """
Shared helpers to mock unzip calls with Mox during tests.
"""
import ExUnit.Assertions
import Mox

def setup_get_file_stream_mox(zip_filename) do
# NOTE: common code from Transport.Unzip.S3 could eventually be reused here
Transport.Unzip.S3.Mock
|> expect(:get_file_stream, fn file_in_zip, zip_file, bucket ->
# from payload
assert zip_file == zip_filename
# from config
assert bucket == "transport-data-gouv-fr-resource-history-test"

# stub with a local file
path = "#{__DIR__}/../fixture/files/gtfs_import.zip"
zip_file = Unzip.LocalFile.open(path)
{:ok, unzip} = Unzip.new(zip_file)
Unzip.file_stream!(unzip, file_in_zip)
end)
end
end
@@ -0,0 +1,32 @@
defmodule Transport.Test.Transport.Jobs.GTFSImportJobTest do
use ExUnit.Case, async: true
use Oban.Testing, repo: DB.Repo
import DB.Factory
import Mox
import Transport.Test.FileStreamUtils

setup :verify_on_exit!

setup do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo)
end

test "import without error" do
%{id: dataset_id} = insert(:dataset, %{datagouv_id: Ecto.UUID.generate(), datagouv_title: "coucou"})
%{id: resource_id} = insert(:resource, dataset_id: dataset_id, format: "GTFS")

%{id: resource_history_id} =
insert(:resource_history, %{resource_id: resource_id, payload: %{"filename" => "some-file.zip"}})

setup_get_file_stream_mox("some-file.zip")

{:ok, result} = perform_job(Transport.Jobs.GTFSImportStopsJob, %{})

%{
data_import_batch_id: data_import_batch_id,
result: [%{resource_history_id: ^resource_history_id}]
} = result

assert DB.Repo.get(DB.DataImportBatch, data_import_batch_id)
end
end
29 changes: 6 additions & 23 deletions apps/transport/test/transport/jobs/gtfs_import_stops_test.exs
@@ -4,7 +4,7 @@ defmodule Transport.Test.Transport.Jobs.GTFSImportStopsTest do
import DB.Factory
import Mox
import Ecto.Query

import Transport.Test.FileStreamUtils
setup :verify_on_exit!

setup do
@@ -15,50 +15,33 @@ defmodule Transport.Test.Transport.Jobs.GTFSImportStopsTest do
DB.Repo.all(from(di in DB.DataImport, select: di.id, order_by: [asc: di.id]))
end

def setup_mox(zip_filename) do
# NOTE: it will be possible to reuse common code from Transport.Unzip.S3 in there
Transport.Unzip.S3.Mock
|> expect(:get_file_stream, fn file_in_zip, zip_file, bucket ->
# from payload
assert zip_file == zip_filename
# from config
assert bucket == "transport-data-gouv-fr-resource-history-test"

# stub with a local file
path = "#{__DIR__}/../../fixture/files/gtfs_import.zip"
zip_file = Unzip.LocalFile.open(path)
{:ok, unzip} = Unzip.new(zip_file)
Unzip.file_stream!(unzip, file_in_zip)
end)
end

test "import stops" do
%{id: dataset_id} = insert(:dataset, %{datagouv_id: "xxx", datagouv_title: "coucou"})
%{id: resource_id} = insert(:resource, dataset_id: dataset_id)

%{id: resource_history_id} =
insert(:resource_history, %{resource_id: resource_id, payload: %{"filename" => "some-file.zip"}})

setup_mox("some-file.zip")
setup_get_file_stream_mox("some-file.zip")
assert data_import_ids() == []
first_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(resource_history_id)
assert data_import_ids() == [first_data_import_id]

# subsequent import must remove the previous import for same resource_history_id
setup_mox("some-file.zip")
setup_get_file_stream_mox("some-file.zip")
second_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(resource_history_id)
assert data_import_ids() == [second_data_import_id]

# subsequent import for a new resource_history_id on same resource should also remove previous imports
%{id: new_resource_history_id} =
insert(:resource_history, %{resource_id: resource_id, payload: %{"filename" => "some-new-file.zip"}})

setup_mox("some-new-file.zip")
setup_get_file_stream_mox("some-new-file.zip")
third_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(new_resource_history_id)
assert data_import_ids() == [third_data_import_id]

# other resources should not be impacted by import
setup_mox("some-other-file.zip")
setup_get_file_stream_mox("some-other-file.zip")
%{id: other_dataset_id} = insert(:dataset, %{datagouv_id: "yyy"})
%{id: other_resource_id} = insert(:resource, dataset_id: other_dataset_id)

@@ -72,7 +55,7 @@ defmodule Transport.Test.Transport.Jobs.GTFSImportStopsTest do
%{id: new_resource_history_id} =
insert(:resource_history, %{resource_id: resource_id, payload: %{"filename" => "some-new-file.zip"}})

setup_mox("some-new-file.zip")
setup_get_file_stream_mox("some-new-file.zip")
fourth_data_import_id = Transport.GTFSImportStops.import_stops_and_remove_previous(new_resource_history_id)
assert data_import_ids() == [other_data_import_id, fourth_data_import_id]
end