From ea32f578b94fd50655c752b8bc2df8458e77f1f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Tue, 10 Dec 2024 16:07:28 +0100 Subject: [PATCH 01/18] =?UTF-8?q?Registre=20d'arr=C3=AAts=20:=20premiers?= =?UTF-8?q?=20mod=C3=A8les?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See #4354. --- .../lib/registry/model/data_source.ex | 23 ++++++++ apps/transport/lib/registry/model/stop.ex | 57 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 apps/transport/lib/registry/model/data_source.ex create mode 100644 apps/transport/lib/registry/model/stop.ex diff --git a/apps/transport/lib/registry/model/data_source.ex b/apps/transport/lib/registry/model/data_source.ex new file mode 100644 index 0000000000..d985325ce9 --- /dev/null +++ b/apps/transport/lib/registry/model/data_source.ex @@ -0,0 +1,23 @@ +defmodule Transport.Registry.Model.DataSource do + @moduledoc """ + Common attributes describing a data source. + """ + + defstruct [ + :id, + :checksum, + :last_updated_at, + :validity_period + ] + + @type t :: %__MODULE__{ + id: data_source_id(), + checksum: binary(), + last_updated_at: DateTime.t(), + validity_period: date_time_range() + } + + @type data_source_id :: binary() + + @type date_time_range :: binary() +end diff --git a/apps/transport/lib/registry/model/stop.ex b/apps/transport/lib/registry/model/stop.ex new file mode 100644 index 0000000000..50c069818f --- /dev/null +++ b/apps/transport/lib/registry/model/stop.ex @@ -0,0 +1,57 @@ +defmodule Transport.Registry.Model.StopIdentifier do + @moduledoc """ + Representation of a Stop ID. + """ + + defstruct [ + :id, + :type + ] + + @type t :: %__MODULE__{ + id: binary(), + type: identifier_type() + } + + @type identifier_type :: :main | :private_code | :stop_code | :other +end + +defmodule Transport.Registry.Model.Stop do + @moduledoc """ + Common attributes describing a stop. + """ + alias Transport.Registry.Model.DataSource + alias Transport.Registry.Model.StopIdentifier + + defstruct [ + :main_id, + :display_name, + :data_source_id, + :data_source_format, + :parent_id, + :latitude, + :longitude, + projection: :utm_wgs84, + stop_type: :stop, + secondary_ids: [] + ] + + @type t :: %__MODULE__{ + main_id: StopIdentifier.t(), + display_name: binary(), + data_source_id: DataSource.data_source_id(), + data_source_format: data_source_format_type(), + parent_id: StopIdentifier.t() | nil, + latitude: float(), + longitude: float(), + projection: projection(), + stop_type: stop_type(), + secondary_ids: [StopIdentifier.t()] + } + + @type data_source_format_type :: :gtfs | :netex + + @type stop_type :: :stop | :quay | :other + + @type projection :: :utm_wgs84 | :lambert93_rgf93 +end From 7a4331237c7980ff246599fb9f8a6cbdf9800e27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Wed, 11 Dec 2024 18:22:29 +0100 Subject: [PATCH 02/18] Extracteur GTFS --- apps/transport/lib/registry/extractor.ex | 7 ++ apps/transport/lib/registry/gtfs.ex | 88 ++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 apps/transport/lib/registry/extractor.ex create mode 100644 apps/transport/lib/registry/gtfs.ex diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex new file mode 100644 index 0000000000..b26eba4930 --- /dev/null +++ b/apps/transport/lib/registry/extractor.ex @@ -0,0 +1,7 @@ +defmodule Transport.Registry.Extractor do + alias Transport.Registry.Model.Stop + + @type result(positive) :: {:ok, positive} | {:error, binary()} + + @callback extract_from_archive(path :: Path.t()) :: result([Stop.t()]) +end diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex new file mode 100644 index 0000000000..c7cedcc8d1 --- /dev/null +++ b/apps/transport/lib/registry/gtfs.ex @@ -0,0 +1,88 @@ +defmodule Transport.Registry.GTFS do + alias Transport.Registry.Model.DataSource + alias Transport.Registry.Model.Stop + alias Transport.Registry.Model.StopIdentifier + + @behaviour Transport.Registry.Extractor + @doc """ + Extract stops from GTFS ressource. + """ + def extract_from_archive(archive) do + archive + |> file_stream!() + |> to_stream_of_maps() + |> Stream.map(fn r -> + %Stop{ + main_id: %StopIdentifier{id: Map.fetch!(r, "stop_id"), type: :main}, + display_name: Map.fetch!(r, "stop_name"), + latitude: Map.fetch!(r, "stop_lat") |> convert_text_to_float(), + longitude: Map.fetch!(r, "stop_lon") |> convert_text_to_float(), + projection: :utm_wgs84, + stop_type: r |> csv_get_with_default!("location_type", "0", false) |> to_stop_type() + } + end) + + {:ok, []} + end + + @doc """ + Transform the stream outputed by Unzip to a stream of maps, each map + corresponding to a row from the CSV. + """ + def to_stream_of_maps(file_stream) do + file_stream + # transform the stream to a stream of binaries + |> Stream.map(fn c -> IO.iodata_to_binary(c) end) + # stream line by line + |> NimbleCSV.RFC4180.to_line_stream() + |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) + # transform the stream to a stream of maps %{column_name1: value1, ...} + |> Stream.transform([], fn r, acc -> + if acc == [] do + {%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)} + else + {[acc |> Enum.zip(r) |> Enum.into(%{})], acc} + end + end) + end + + @doc """ + Convert textual values to float. + + iex> convert_text_to_float("0") + 0.0 + iex> convert_text_to_float("0.0") + 0.0 + iex> convert_text_to_float("12.7") + 12.7 + iex> convert_text_to_float("-12.7") + -12.7 + iex> convert_text_to_float(" -48.7 ") + -48.7 + """ + def convert_text_to_float(input) do + input |> String.trim() |> Decimal.new() |> Decimal.to_float() + end + + defp to_stop_type("0"), do: :quay + defp to_stop_type("1"), do: :stop + defp to_stop_type(_), do: :other + + defp file_stream!(archive) do + zip_file = Unzip.LocalFile.open(archive) + + {:ok, unzip} = Unzip.new(zip_file) + + Unzip.file_stream!(unzip, "stops.txt") + end + + defp csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do + value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field) + + case value do + nil -> default_value + "" -> default_value + v -> v + end + end +end From 0f17fd1b10cb17f7bb3c166a698fc4c975f3f42b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Tue, 17 Dec 2024 17:36:04 +0100 Subject: [PATCH 03/18] Transport.HTTPClient.get --- apps/shared/lib/wrapper/wrapper_req.ex | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/apps/shared/lib/wrapper/wrapper_req.ex b/apps/shared/lib/wrapper/wrapper_req.ex index 17a7e9c6ca..b8fbe93ce5 100644 --- a/apps/shared/lib/wrapper/wrapper_req.ex +++ b/apps/shared/lib/wrapper/wrapper_req.ex @@ -35,6 +35,18 @@ defmodule Transport.HTTPClient do """ def get!(url, options) do + {req, options} = setup_cache(options) + + Transport.Req.impl().get!(req, options |> Keyword.merge(url: url)) + end + + def get(url, options) do + {req, options} = setup_cache(options) + + Transport.Req.impl().get(req, options |> Keyword.merge(url: url)) + end + + defp setup_cache(options) do options = Keyword.validate!(options, [ :custom_cache_dir, @@ -48,13 +60,10 @@ defmodule Transport.HTTPClient do {enable_cache, options} = options |> Keyword.pop!(:enable_cache) - req = - if enable_cache do - req |> Transport.Shared.ReqCustomCache.attach() - else - req - end - - Transport.Req.impl().get!(req, options |> Keyword.merge(url: url)) + if enable_cache do + {req |> Transport.Shared.ReqCustomCache.attach(), options} + else + {req, options} + end end end From d897569e80a7277d86b048c9bbb1748b4f57a3b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Thu, 12 Dec 2024 13:42:43 +0100 Subject: [PATCH 04/18] =?UTF-8?q?Registre=20d'arr=C3=AAt=20:=20premier=20s?= =?UTF-8?q?cript=20d'export?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Supporte que les GTFS. --- apps/transport/lib/registry/engine.ex | 101 ++++++++++++++++++++++ apps/transport/lib/registry/extractor.ex | 17 ++++ apps/transport/lib/registry/gtfs.ex | 95 +++++++++++++++----- apps/transport/lib/registry/model/stop.ex | 36 ++++++++ scripts/registre-arrets.exs | 1 + 5 files changed, 229 insertions(+), 21 deletions(-) create mode 100644 apps/transport/lib/registry/engine.ex create mode 100644 scripts/registre-arrets.exs diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex new file mode 100644 index 0000000000..17198d3e9c --- /dev/null +++ b/apps/transport/lib/registry/engine.ex @@ -0,0 +1,101 @@ +defmodule Transport.Registry.Engine do + @moduledoc """ + Stream eligible resources and run extractors to produce a raw registry at the end. + """ + + alias Transport.Registry.Extractor + alias Transport.Registry.GTFS + alias Transport.Registry.Model.Stop + + import Ecto.Query + + require Logger + + @spec execute(output_file :: Path.t(), list()) :: :ok + def execute(output_file, opts \\ []) do + limit = Keyword.get(opts, :limit, 1_000_000) + formats = Keyword.get(opts, :formats, ~w(GTFS NeTEx)) + + create_empty_csv_with_headers(output_file) + + enumerate_gtfs_resources(limit, formats) + |> Extractor.traverse(&prepare_extractor/1) + |> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000) + # one for Task.async_stream + |> Extractor.keep_results() + # one for download/1 + |> Extractor.keep_results() + |> Extractor.traverse(&extract_from_archive/1) + |> dump_to_csv(output_file) + end + + def create_empty_csv_with_headers(output_file) do + headers = NimbleCSV.RFC4180.dump_to_iodata([Stop.csv_headers()]) + File.write(output_file, headers) + end + + def enumerate_gtfs_resources(limit, formats) do + DB.Resource.base_query() + |> DB.ResourceHistory.join_resource_with_latest_resource_history() + |> where([resource: r], r.format in ^formats) + |> preload([resource_history: rh], resource_history: rh) + |> limit(^limit) + |> DB.Repo.all() + end + + def prepare_extractor(%DB.Resource{} = resource) do + case resource.format do + "GTFS" -> {:ok, {GTFS, resource.url}} + _ -> {:error, "Unsupported format"} + end + end + + def download({extractor, url}) do + Logger.debug("download #{extractor} #{url}") + tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat") + + error_result = fn msg -> + File.rm(tmp_path) + {:error, msg} + end + + http_result = + Transport.HTTPClient.get(url, + decode_body: false, + compressed: false, + into: File.stream!(tmp_path) + ) + + case http_result do + {:error, error} -> + error_result.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}") + + {:ok, %{status: status}} -> + cond do + status >= 200 && status < 300 -> + {:ok, {extractor, tmp_path}} + + status > 400 -> + error_result.("Error #{status} while downloading the resource from #{url}") + + true -> + error_result.("Unexpected HTTP error #{status} while downloading the resource from #{url}") + end + end + end + + @spec extract_from_archive({module(), Path.t()}) :: Extractor.result([Stop.t()]) + def extract_from_archive({extractor, file}) do + Logger.debug("extract_from_archive #{extractor} #{file}") + extractor.extract_from_archive(file) + end + + def dump_to_csv(enumerable, output_file) do + enumerable + |> Stream.concat() + |> Stream.map(&Stop.to_csv/1) + |> NimbleCSV.RFC4180.dump_to_stream() + |> Stream.into(File.stream!(output_file, [:append, :utf8])) + |> Stream.run() + end +end diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex index b26eba4930..a0f8a7c52d 100644 --- a/apps/transport/lib/registry/extractor.ex +++ b/apps/transport/lib/registry/extractor.ex @@ -1,7 +1,24 @@ defmodule Transport.Registry.Extractor do + @moduledoc """ + Interface and utilities for stops extractors. + """ + + require Logger + alias Transport.Registry.Model.Stop @type result(positive) :: {:ok, positive} | {:error, binary()} @callback extract_from_archive(path :: Path.t()) :: result([Stop.t()]) + + def keep_results(enumerable), do: Stream.flat_map(enumerable, &keep_result/1) + + defp keep_result({:ok, result}), do: [result] + defp keep_result(_), do: [] + + def traverse(enumerable, mapper) do + enumerable + |> Stream.map(mapper) + |> keep_results() + end end diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index c7cedcc8d1..9e3a3c000e 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -1,28 +1,34 @@ defmodule Transport.Registry.GTFS do - alias Transport.Registry.Model.DataSource + @moduledoc """ + Implementation of a stop extractor for GTFS resources. + """ + alias Transport.Registry.Model.Stop alias Transport.Registry.Model.StopIdentifier + require Logger + @behaviour Transport.Registry.Extractor @doc """ Extract stops from GTFS ressource. """ def extract_from_archive(archive) do - archive - |> file_stream!() - |> to_stream_of_maps() - |> Stream.map(fn r -> - %Stop{ - main_id: %StopIdentifier{id: Map.fetch!(r, "stop_id"), type: :main}, - display_name: Map.fetch!(r, "stop_name"), - latitude: Map.fetch!(r, "stop_lat") |> convert_text_to_float(), - longitude: Map.fetch!(r, "stop_lon") |> convert_text_to_float(), - projection: :utm_wgs84, - stop_type: r |> csv_get_with_default!("location_type", "0", false) |> to_stop_type() - } - end) + case file_stream(archive) do + {:error, error} -> + Logger.error(error) + {:error, error} + + {:ok, content} -> + Logger.debug("Valid Zip archive") - {:ok, []} + stops = + content + |> to_stream_of_maps() + |> Stream.flat_map(&handle_stop/1) + |> Enum.to_list() + + {:ok, stops} + end end @doc """ @@ -46,9 +52,35 @@ defmodule Transport.Registry.GTFS do end) end + defp handle_stop(record) do + latitude = fetch_position(record, "stop_lat") + longitude = fetch_position(record, "stop_lon") + + if latitude != nil && longitude != nil do + [ + %Stop{ + main_id: %StopIdentifier{id: Map.fetch!(record, "stop_id"), type: :main}, + display_name: Map.fetch!(record, "stop_name"), + latitude: latitude, + longitude: longitude, + projection: :utm_wgs84, + stop_type: record |> csv_get_with_default!("location_type", "0") |> to_stop_type() + } + ] + else + [] + end + end + + defp fetch_position(record, field) do + Map.fetch!(record, field) |> convert_text_to_float() + end + @doc """ Convert textual values to float. + iex> convert_text_to_float("") + nil iex> convert_text_to_float("0") 0.0 iex> convert_text_to_float("0.0") @@ -61,23 +93,44 @@ defmodule Transport.Registry.GTFS do -48.7 """ def convert_text_to_float(input) do - input |> String.trim() |> Decimal.new() |> Decimal.to_float() + if input |> String.trim() != "" do + input |> String.trim() |> Decimal.new() |> Decimal.to_float() + else + nil + end end defp to_stop_type("0"), do: :quay defp to_stop_type("1"), do: :stop defp to_stop_type(_), do: :other - defp file_stream!(archive) do + defp file_stream(archive) do zip_file = Unzip.LocalFile.open(archive) - {:ok, unzip} = Unzip.new(zip_file) + case Unzip.new(zip_file) do + {:ok, unzip} -> + if has_stops?(unzip) do + {:ok, Unzip.file_stream!(unzip, "stops.txt")} + else + {:error, "Missing stops.txt in #{archive}"} + end + + {:error, error} -> + {:error, "Error while unzipping archive #{archive}: #{error}"} + end + end + + defp has_stops?(unzip) do + Unzip.list_entries(unzip) + |> Enum.any?(&entry_of_name?("stops.txt", &1)) + end - Unzip.file_stream!(unzip, "stops.txt") + defp entry_of_name?(name, %Unzip.Entry{file_name: file_name}) do + file_name == name end - defp csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do - value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field) + defp csv_get_with_default!(map, field, default_value) do + value = Map.get(map, field) case value do nil -> default_value diff --git a/apps/transport/lib/registry/model/stop.ex b/apps/transport/lib/registry/model/stop.ex index 50c069818f..767283ab83 100644 --- a/apps/transport/lib/registry/model/stop.ex +++ b/apps/transport/lib/registry/model/stop.ex @@ -14,6 +14,10 @@ defmodule Transport.Registry.Model.StopIdentifier do } @type identifier_type :: :main | :private_code | :stop_code | :other + + def to_field(%__MODULE__{id: id, type: type}) do + "#{type}:#{id}" + end end defmodule Transport.Registry.Model.Stop do @@ -54,4 +58,36 @@ defmodule Transport.Registry.Model.Stop do @type stop_type :: :stop | :quay | :other @type projection :: :utm_wgs84 | :lambert93_rgf93 + + def csv_headers do + ~w( + main_id + display_name + data_source_id + data_source_format + parent_id + latitude + longitude + projection + stop_type + ) + end + + def to_csv(%__MODULE__{} = stop) do + [ + StopIdentifier.to_field(stop.main_id), + stop.display_name, + stop.data_source_id, + stop.data_source_format, + maybe(stop.parent_id, &StopIdentifier.to_field/1, ""), + stop.latitude, + stop.longitude, + stop.projection, + stop.stop_type + ] + end + + @spec maybe(value :: any() | nil, mapper :: (any() -> any()), defaultValue :: any()) :: any() | nil + def maybe(nil, _, defaultValue), do: defaultValue + def maybe(value, mapper, _), do: mapper.(value) end diff --git a/scripts/registre-arrets.exs b/scripts/registre-arrets.exs new file mode 100644 index 0000000000..d02e89cdce --- /dev/null +++ b/scripts/registre-arrets.exs @@ -0,0 +1 @@ +Transport.Registry.Engine.execute("./registre-arrets.csv") From 5c213182e8104f0c56aab6dd30c4ed000a14b155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Wed, 18 Dec 2024 18:40:42 +0100 Subject: [PATCH 05/18] Test de certains utilitaires --- apps/transport/lib/registry/extractor.ex | 2 ++ apps/transport/lib/registry/model/stop.ex | 8 ++++++ .../test/registry/extractor_test.exs | 28 +++++++++++++++++++ apps/transport/test/registry/gtfs_test.exs | 4 +++ apps/transport/test/registry/model_test.exs | 6 ++++ 5 files changed, 48 insertions(+) create mode 100644 apps/transport/test/registry/extractor_test.exs create mode 100644 apps/transport/test/registry/gtfs_test.exs create mode 100644 apps/transport/test/registry/model_test.exs diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex index a0f8a7c52d..34ebb9e14d 100644 --- a/apps/transport/lib/registry/extractor.ex +++ b/apps/transport/lib/registry/extractor.ex @@ -11,11 +11,13 @@ defmodule Transport.Registry.Extractor do @callback extract_from_archive(path :: Path.t()) :: result([Stop.t()]) + @spec keep_results(Stream.t(result(term()))) :: Stream.t(term()) def keep_results(enumerable), do: Stream.flat_map(enumerable, &keep_result/1) defp keep_result({:ok, result}), do: [result] defp keep_result(_), do: [] + @spec traverse(Stream.t(term()), (term() -> result(term()))) :: Stream.t(term()) def traverse(enumerable, mapper) do enumerable |> Stream.map(mapper) diff --git a/apps/transport/lib/registry/model/stop.ex b/apps/transport/lib/registry/model/stop.ex index 767283ab83..89a1076b17 100644 --- a/apps/transport/lib/registry/model/stop.ex +++ b/apps/transport/lib/registry/model/stop.ex @@ -15,6 +15,14 @@ defmodule Transport.Registry.Model.StopIdentifier do @type identifier_type :: :main | :private_code | :stop_code | :other + @doc """ + iex> to_field(%Transport.Registry.Model.StopIdentifier{id: "stop1", type: :main}) + "main:stop1" + iex> to_field(%Transport.Registry.Model.StopIdentifier{id: "FRPLY", type: :private_code}) + "private_code:FRPLY" + iex> to_field(%Transport.Registry.Model.StopIdentifier{id: "PARIS GDL", type: :other}) + "other:PARIS GDL" + """ def to_field(%__MODULE__{id: id, type: type}) do "#{type}:#{id}" end diff --git a/apps/transport/test/registry/extractor_test.exs b/apps/transport/test/registry/extractor_test.exs new file mode 100644 index 0000000000..462196cea3 --- /dev/null +++ b/apps/transport/test/registry/extractor_test.exs @@ -0,0 +1,28 @@ +defmodule Transport.Registry.ExtractorTest do + use ExUnit.Case, async: false + + require Integer + alias Transport.Registry.Extractor + + test "keep_results" do + assert [] == keep_results([]) + assert [] == keep_results([{:error, "Error message"}]) + assert [1, 3] == keep_results([{:ok, 1}, {:error, "Error message"}, {:ok, 3}]) + end + + test "traverse" do + assert [] == traverse([], &even_is_forbidden/1) + assert [1, 3, 5, 7, 9] == traverse(1..10, &even_is_forbidden/1) + end + + defp keep_results(enumerable) do + enumerable |> Extractor.keep_results() |> Enum.to_list() + end + + defp traverse(enumerable, mapper) do + enumerable |> Extractor.traverse(mapper) |> Enum.to_list() + end + + defp even_is_forbidden(i) when Integer.is_odd(i), do: {:ok, i} + defp even_is_forbidden(_), do: {:error, "Even is forbidden"} +end diff --git a/apps/transport/test/registry/gtfs_test.exs b/apps/transport/test/registry/gtfs_test.exs new file mode 100644 index 0000000000..8102b43347 --- /dev/null +++ b/apps/transport/test/registry/gtfs_test.exs @@ -0,0 +1,4 @@ +defmodule Transport.Registry.GTFSTest do + use ExUnit.Case, async: true + doctest Transport.Registry.GTFS, import: true +end diff --git a/apps/transport/test/registry/model_test.exs b/apps/transport/test/registry/model_test.exs new file mode 100644 index 0000000000..23ff7b266a --- /dev/null +++ b/apps/transport/test/registry/model_test.exs @@ -0,0 +1,6 @@ +defmodule Transport.Registry.ModelTest do + use ExUnit.Case, async: true + doctest Transport.Registry.Model.DataSource, import: true + doctest Transport.Registry.Model.Stop, import: true + doctest Transport.Registry.Model.StopIdentifier, import: true +end From 44b667628ecfc01aab3419c985e6e3b18db6b8e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Wed, 18 Dec 2024 19:02:16 +0100 Subject: [PATCH 06/18] More tests and refactoring Extract code shared with the `Transport.Jobs.GtfsToDB` job. --- apps/transport/lib/gtfs/utils.ex | 87 +++++++++++++++++++++++++ apps/transport/lib/jobs/gtfs_to_db.ex | 65 +++--------------- apps/transport/lib/registry/gtfs.ex | 69 ++------------------ apps/transport/test/gtfs/utils_test.exs | 4 ++ 4 files changed, 106 insertions(+), 119 deletions(-) create mode 100644 apps/transport/lib/gtfs/utils.ex create mode 100644 apps/transport/test/gtfs/utils_test.exs diff --git a/apps/transport/lib/gtfs/utils.ex b/apps/transport/lib/gtfs/utils.ex new file mode 100644 index 0000000000..27ddc49e7e --- /dev/null +++ b/apps/transport/lib/gtfs/utils.ex @@ -0,0 +1,87 @@ +defmodule Transport.GTFS.Utils do + @moduledoc """ + Some helpers for handling GTFS archives. + """ + + def fetch_position(record, field) do + Map.fetch!(record, field) |> convert_text_to_float() + end + + @doc """ + Convert textual values to float. + + iex> convert_text_to_float("") + nil + iex> convert_text_to_float("0") + 0.0 + iex> convert_text_to_float("0.0") + 0.0 + iex> convert_text_to_float("12.7") + 12.7 + iex> convert_text_to_float("-12.7") + -12.7 + iex> convert_text_to_float(" -48.7 ") + -48.7 + """ + def convert_text_to_float(input) do + if input |> String.trim() != "" do + input |> String.trim() |> Decimal.new() |> Decimal.to_float() + else + nil + end + end + + @doc """ + Variant of csv_get_with_default/3 that raises if a mandatory column is missing. + """ + def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do + value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field) + + case value do + nil -> default_value + "" -> default_value + v -> v + end + end + + @doc """ + iex> csv_get_with_default(%{}, "field", 0) + 0 + iex> csv_get_with_default(%{"other_field" => 1}, "field", 0) + 0 + iex> csv_get_with_default(%{"field" => 2, "other_field" => 1}, "field", 0) + 2 + iex> csv_get_with_default(%{"field" => "", "other_field" => 1}, "field", 0) + 0 + """ + def csv_get_with_default(map, field, default_value) do + value = Map.get(map, field) + + case value do + nil -> default_value + "" -> default_value + v -> v + end + end + + @doc """ + Transform the stream outputed by Unzip to a stream of maps, each map + corresponding to a row from the CSV. + """ + def to_stream_of_maps(file_stream) do + file_stream + # transform the stream to a stream of binaries + |> Stream.map(fn c -> IO.iodata_to_binary(c) end) + # stream line by line + |> NimbleCSV.RFC4180.to_line_stream() + |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) + # transform the stream to a stream of maps %{column_name1: value1, ...} + |> Stream.transform([], fn r, acc -> + if acc == [] do + {%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)} + else + {[acc |> Enum.zip(r) |> Enum.into(%{})], acc} + end + end) + end +end diff --git a/apps/transport/lib/jobs/gtfs_to_db.ex b/apps/transport/lib/jobs/gtfs_to_db.ex index 2b64b38c9f..bc05b4dba7 100644 --- a/apps/transport/lib/jobs/gtfs_to_db.ex +++ b/apps/transport/lib/jobs/gtfs_to_db.ex @@ -3,33 +3,7 @@ defmodule Transport.Jobs.GtfsToDB do Get the content of a GTFS ResourceHistory, store it in the DB """ - @doc """ - Convert textual values to float. - - iex> convert_text_to_float("0") - 0.0 - iex> convert_text_to_float("0.0") - 0.0 - iex> convert_text_to_float("12.7") - 12.7 - iex> convert_text_to_float("-12.7") - -12.7 - iex> convert_text_to_float(" -48.7 ") - -48.7 - """ - def convert_text_to_float(input) do - input |> String.trim() |> Decimal.new() |> Decimal.to_float() - end - - def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do - value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field) - - case value do - nil -> default_value - "" -> default_value - v -> v - end - end + alias Transport.GTFS.Utils def import_gtfs_from_resource_history(resource_history_id) do %{id: data_import_id} = %DB.DataImport{resource_history_id: resource_history_id} |> DB.Repo.insert!() @@ -61,16 +35,16 @@ defmodule Transport.Jobs.GtfsToDB do def stops_stream_insert(file_stream, data_import_id) do DB.Repo.transaction(fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() # the map is reshaped for Ecto's needs |> Stream.map(fn r -> %{ data_import_id: data_import_id, stop_id: r |> Map.fetch!("stop_id"), stop_name: r |> Map.fetch!("stop_name"), - stop_lat: r |> Map.fetch!("stop_lat") |> convert_text_to_float(), - stop_lon: r |> Map.fetch!("stop_lon") |> convert_text_to_float(), - location_type: r |> csv_get_with_default!("location_type", "0", false) |> String.to_integer() + stop_lat: r |> Utils.fetch_position("stop_lat"), + stop_lon: r |> Utils.fetch_position("stop_lon"), + location_type: r |> Utils.csv_get_with_default!("location_type", "0", false) |> String.to_integer() } end) |> Stream.chunk_every(1000) @@ -79,27 +53,6 @@ defmodule Transport.Jobs.GtfsToDB do end) end - @doc """ - Transform the stream outputed by Unzip to a stream of maps, each map - corresponding to a row from the CSV. - """ - def to_stream_of_maps(file_stream) do - file_stream - # transform the stream to a stream of binaries - |> Stream.map(fn c -> IO.iodata_to_binary(c) end) - # stream line by line - |> NimbleCSV.RFC4180.to_line_stream() - |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) - # transform the stream to a stream of maps %{column_name1: value1, ...} - |> Stream.transform([], fn r, acc -> - if acc == [] do - {%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)} - else - {[acc |> Enum.zip(r) |> Enum.into(%{})], acc} - end - end) - end - def fill_calendar_from_resource_history(resource_history_id, data_import_id) do file_stream = file_stream(resource_history_id, "calendar.txt") calendar_stream_insert(file_stream, data_import_id) @@ -108,7 +61,7 @@ defmodule Transport.Jobs.GtfsToDB do def calendar_stream_insert(file_stream, data_import_id) do DB.Repo.transaction(fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.map(fn r -> res = %{ data_import_id: data_import_id, @@ -155,7 +108,7 @@ defmodule Transport.Jobs.GtfsToDB do DB.Repo.transaction( fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.map(fn r -> %{ data_import_id: data_import_id, @@ -209,7 +162,7 @@ defmodule Transport.Jobs.GtfsToDB do DB.Repo.transaction( fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.map(fn r -> %{ data_import_id: data_import_id, @@ -235,7 +188,7 @@ defmodule Transport.Jobs.GtfsToDB do DB.Repo.transaction( fn -> file_stream - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.map(fn r -> %{ data_import_id: data_import_id, diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index 9e3a3c000e..25ecb7bbea 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -6,6 +6,8 @@ defmodule Transport.Registry.GTFS do alias Transport.Registry.Model.Stop alias Transport.Registry.Model.StopIdentifier + alias Transport.GTFS.Utils + require Logger @behaviour Transport.Registry.Extractor @@ -23,7 +25,7 @@ defmodule Transport.Registry.GTFS do stops = content - |> to_stream_of_maps() + |> Utils.to_stream_of_maps() |> Stream.flat_map(&handle_stop/1) |> Enum.to_list() @@ -31,30 +33,9 @@ defmodule Transport.Registry.GTFS do end end - @doc """ - Transform the stream outputed by Unzip to a stream of maps, each map - corresponding to a row from the CSV. - """ - def to_stream_of_maps(file_stream) do - file_stream - # transform the stream to a stream of binaries - |> Stream.map(fn c -> IO.iodata_to_binary(c) end) - # stream line by line - |> NimbleCSV.RFC4180.to_line_stream() - |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) - # transform the stream to a stream of maps %{column_name1: value1, ...} - |> Stream.transform([], fn r, acc -> - if acc == [] do - {%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)} - else - {[acc |> Enum.zip(r) |> Enum.into(%{})], acc} - end - end) - end - defp handle_stop(record) do - latitude = fetch_position(record, "stop_lat") - longitude = fetch_position(record, "stop_lon") + latitude = Utils.fetch_position(record, "stop_lat") + longitude = Utils.fetch_position(record, "stop_lon") if latitude != nil && longitude != nil do [ @@ -64,7 +45,7 @@ defmodule Transport.Registry.GTFS do latitude: latitude, longitude: longitude, projection: :utm_wgs84, - stop_type: record |> csv_get_with_default!("location_type", "0") |> to_stop_type() + stop_type: record |> Utils.csv_get_with_default("location_type", "0") |> to_stop_type() } ] else @@ -72,34 +53,6 @@ defmodule Transport.Registry.GTFS do end end - defp fetch_position(record, field) do - Map.fetch!(record, field) |> convert_text_to_float() - end - - @doc """ - Convert textual values to float. - - iex> convert_text_to_float("") - nil - iex> convert_text_to_float("0") - 0.0 - iex> convert_text_to_float("0.0") - 0.0 - iex> convert_text_to_float("12.7") - 12.7 - iex> convert_text_to_float("-12.7") - -12.7 - iex> convert_text_to_float(" -48.7 ") - -48.7 - """ - def convert_text_to_float(input) do - if input |> String.trim() != "" do - input |> String.trim() |> Decimal.new() |> Decimal.to_float() - else - nil - end - end - defp to_stop_type("0"), do: :quay defp to_stop_type("1"), do: :stop defp to_stop_type(_), do: :other @@ -128,14 +81,4 @@ defmodule Transport.Registry.GTFS do defp entry_of_name?(name, %Unzip.Entry{file_name: file_name}) do file_name == name end - - defp csv_get_with_default!(map, field, default_value) do - value = Map.get(map, field) - - case value do - nil -> default_value - "" -> default_value - v -> v - end - end end diff --git a/apps/transport/test/gtfs/utils_test.exs b/apps/transport/test/gtfs/utils_test.exs new file mode 100644 index 0000000000..1e232cf18c --- /dev/null +++ b/apps/transport/test/gtfs/utils_test.exs @@ -0,0 +1,4 @@ +defmodule Transport.GTFS.UtilsTest do + use ExUnit.Case, async: true + doctest Transport.GTFS.Utils, import: true +end From 79b6228bb28820bde8d156e6e45eb6ee18654c24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Thu, 19 Dec 2024 22:00:49 +0100 Subject: [PATCH 07/18] Meilleur nommage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lâchement inspiré de https://hackage.haskell.org/package/base-4.21.0.0/docs/Data-Maybe.html#v:catMaybes et https://hackage.haskell.org/package/base-4.21.0.0/docs/Data-Maybe.html#v:mapMaybe --- apps/transport/lib/registry/engine.ex | 8 +++---- apps/transport/lib/registry/extractor.ex | 14 ++++++------ .../test/registry/extractor_test.exs | 22 +++++++++---------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index 17198d3e9c..0ebbeec0fb 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -19,13 +19,13 @@ defmodule Transport.Registry.Engine do create_empty_csv_with_headers(output_file) enumerate_gtfs_resources(limit, formats) - |> Extractor.traverse(&prepare_extractor/1) + |> Extractor.map_result(&prepare_extractor/1) |> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000) # one for Task.async_stream - |> Extractor.keep_results() + |> Extractor.cat_results() # one for download/1 - |> Extractor.keep_results() - |> Extractor.traverse(&extract_from_archive/1) + |> Extractor.cat_results() + |> Extractor.map_result(&extract_from_archive/1) |> dump_to_csv(output_file) end diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex index 34ebb9e14d..19ab45308b 100644 --- a/apps/transport/lib/registry/extractor.ex +++ b/apps/transport/lib/registry/extractor.ex @@ -11,16 +11,16 @@ defmodule Transport.Registry.Extractor do @callback extract_from_archive(path :: Path.t()) :: result([Stop.t()]) - @spec keep_results(Stream.t(result(term()))) :: Stream.t(term()) - def keep_results(enumerable), do: Stream.flat_map(enumerable, &keep_result/1) + @spec cat_results(Stream.t(result(term()))) :: Stream.t(term()) + def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1) - defp keep_result({:ok, result}), do: [result] - defp keep_result(_), do: [] + defp keep_ok({:ok, result}), do: [result] + defp keep_ok(_), do: [] - @spec traverse(Stream.t(term()), (term() -> result(term()))) :: Stream.t(term()) - def traverse(enumerable, mapper) do + @spec map_result(Stream.t(term()), (term() -> result(term()))) :: Stream.t(term()) + def map_result(enumerable, mapper) do enumerable |> Stream.map(mapper) - |> keep_results() + |> cat_results() end end diff --git a/apps/transport/test/registry/extractor_test.exs b/apps/transport/test/registry/extractor_test.exs index 462196cea3..da60497735 100644 --- a/apps/transport/test/registry/extractor_test.exs +++ b/apps/transport/test/registry/extractor_test.exs @@ -4,23 +4,23 @@ defmodule Transport.Registry.ExtractorTest do require Integer alias Transport.Registry.Extractor - test "keep_results" do - assert [] == keep_results([]) - assert [] == keep_results([{:error, "Error message"}]) - assert [1, 3] == keep_results([{:ok, 1}, {:error, "Error message"}, {:ok, 3}]) + test "cat_results" do + assert [] == cat_results([]) + assert [] == cat_results([{:error, "Error message"}]) + assert [1, 3] == cat_results([{:ok, 1}, {:error, "Error message"}, {:ok, 3}]) end - test "traverse" do - assert [] == traverse([], &even_is_forbidden/1) - assert [1, 3, 5, 7, 9] == traverse(1..10, &even_is_forbidden/1) + test "map_result" do + assert [] == map_result([], &even_is_forbidden/1) + assert [1, 3, 5, 7, 9] == map_result(1..10, &even_is_forbidden/1) end - defp keep_results(enumerable) do - enumerable |> Extractor.keep_results() |> Enum.to_list() + defp cat_results(enumerable) do + enumerable |> Extractor.cat_results() |> Enum.to_list() end - defp traverse(enumerable, mapper) do - enumerable |> Extractor.traverse(mapper) |> Enum.to_list() + defp map_result(enumerable, mapper) do + enumerable |> Extractor.map_result(mapper) |> Enum.to_list() end defp even_is_forbidden(i) when Integer.is_odd(i), do: {:ok, i} From cb1a7d15762e0bac1b433e64d76b56f47908106b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Thu, 19 Dec 2024 22:13:34 +0100 Subject: [PATCH 08/18] Code dans le bon module --- apps/transport/lib/registry/engine.ex | 22 ++++++++--------- apps/transport/lib/registry/extractor.ex | 18 ++------------ apps/transport/lib/registry/gtfs.ex | 24 +++++++++---------- apps/transport/lib/registry/result.ex | 24 +++++++++++++++++++ .../{extractor_test.exs => result_test.exs} | 8 +++---- 5 files changed, 53 insertions(+), 43 deletions(-) create mode 100644 apps/transport/lib/registry/result.ex rename apps/transport/test/registry/{extractor_test.exs => result_test.exs} (75%) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index 0ebbeec0fb..ca70c753e1 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -3,9 +3,9 @@ defmodule Transport.Registry.Engine do Stream eligible resources and run extractors to produce a raw registry at the end. """ - alias Transport.Registry.Extractor alias Transport.Registry.GTFS alias Transport.Registry.Model.Stop + alias Transport.Registry.Result import Ecto.Query @@ -19,13 +19,13 @@ defmodule Transport.Registry.Engine do create_empty_csv_with_headers(output_file) enumerate_gtfs_resources(limit, formats) - |> Extractor.map_result(&prepare_extractor/1) + |> Result.map_result(&prepare_extractor/1) |> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000) # one for Task.async_stream - |> Extractor.cat_results() + |> Result.cat_results() # one for download/1 - |> Extractor.cat_results() - |> Extractor.map_result(&extract_from_archive/1) + |> Result.cat_results() + |> Result.map_result(&extract_from_archive/1) |> dump_to_csv(output_file) end @@ -54,9 +54,9 @@ defmodule Transport.Registry.Engine do Logger.debug("download #{extractor} #{url}") tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat") - error_result = fn msg -> + safe_error = fn msg -> File.rm(tmp_path) - {:error, msg} + Result.error(msg) end http_result = @@ -68,7 +68,7 @@ defmodule Transport.Registry.Engine do case http_result do {:error, error} -> - error_result.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}") + safe_error.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}") {:ok, %{status: status}} -> cond do @@ -76,15 +76,15 @@ defmodule Transport.Registry.Engine do {:ok, {extractor, tmp_path}} status > 400 -> - error_result.("Error #{status} while downloading the resource from #{url}") + safe_error.("Error #{status} while downloading the resource from #{url}") true -> - error_result.("Unexpected HTTP error #{status} while downloading the resource from #{url}") + safe_error.("Unexpected HTTP error #{status} while downloading the resource from #{url}") end end end - @spec extract_from_archive({module(), Path.t()}) :: Extractor.result([Stop.t()]) + @spec extract_from_archive({module(), Path.t()}) :: Result.t([Stop.t()]) def extract_from_archive({extractor, file}) do Logger.debug("extract_from_archive #{extractor} #{file}") extractor.extract_from_archive(file) diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex index 19ab45308b..0f5db7d9ec 100644 --- a/apps/transport/lib/registry/extractor.ex +++ b/apps/transport/lib/registry/extractor.ex @@ -6,21 +6,7 @@ defmodule Transport.Registry.Extractor do require Logger alias Transport.Registry.Model.Stop + alias Transport.Registry.Result - @type result(positive) :: {:ok, positive} | {:error, binary()} - - @callback extract_from_archive(path :: Path.t()) :: result([Stop.t()]) - - @spec cat_results(Stream.t(result(term()))) :: Stream.t(term()) - def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1) - - defp keep_ok({:ok, result}), do: [result] - defp keep_ok(_), do: [] - - @spec map_result(Stream.t(term()), (term() -> result(term()))) :: Stream.t(term()) - def map_result(enumerable, mapper) do - enumerable - |> Stream.map(mapper) - |> cat_results() - end + @callback extract_from_archive(path :: Path.t()) :: Result.t([Stop.t()]) end diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index 25ecb7bbea..80c875e0ee 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -5,6 +5,7 @@ defmodule Transport.Registry.GTFS do alias Transport.Registry.Model.Stop alias Transport.Registry.Model.StopIdentifier + alias Transport.Registry.Result alias Transport.GTFS.Utils @@ -18,18 +19,16 @@ defmodule Transport.Registry.GTFS do case file_stream(archive) do {:error, error} -> Logger.error(error) - {:error, error} + Result.error(error) {:ok, content} -> Logger.debug("Valid Zip archive") - stops = - content - |> Utils.to_stream_of_maps() - |> Stream.flat_map(&handle_stop/1) - |> Enum.to_list() - - {:ok, stops} + content + |> Utils.to_stream_of_maps() + |> Stream.flat_map(&handle_stop/1) + |> Enum.to_list() + |> Result.ok() end end @@ -63,18 +62,19 @@ defmodule Transport.Registry.GTFS do case Unzip.new(zip_file) do {:ok, unzip} -> if has_stops?(unzip) do - {:ok, Unzip.file_stream!(unzip, "stops.txt")} + unzip |> Unzip.file_stream!("stops.txt") |> Result.ok() else - {:error, "Missing stops.txt in #{archive}"} + Result.error("Missing stops.txt in #{archive}") end {:error, error} -> - {:error, "Error while unzipping archive #{archive}: #{error}"} + Result.error("Error while unzipping archive #{archive}: #{error}") end end defp has_stops?(unzip) do - Unzip.list_entries(unzip) + unzip + |> Unzip.list_entries() |> Enum.any?(&entry_of_name?("stops.txt", &1)) end diff --git a/apps/transport/lib/registry/result.ex b/apps/transport/lib/registry/result.ex new file mode 100644 index 0000000000..5821cbfa41 --- /dev/null +++ b/apps/transport/lib/registry/result.ex @@ -0,0 +1,24 @@ +defmodule Transport.Registry.Result do + @moduledoc """ + Type and utilities to represent results. + """ + + @type t(positive) :: {:ok, positive} | {:error, binary()} + + def ok(positive), do: {:ok, positive} + + def error(message), do: {:error, message} + + @spec cat_results(Stream.t(t(term()))) :: Stream.t(term()) + def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1) + + defp keep_ok({:ok, result}), do: [result] + defp keep_ok(_), do: [] + + @spec map_result(Stream.t(term()), (term() -> t(term()))) :: Stream.t(term()) + def map_result(enumerable, mapper) do + enumerable + |> Stream.map(mapper) + |> cat_results() + end +end diff --git a/apps/transport/test/registry/extractor_test.exs b/apps/transport/test/registry/result_test.exs similarity index 75% rename from apps/transport/test/registry/extractor_test.exs rename to apps/transport/test/registry/result_test.exs index da60497735..d48a8d587b 100644 --- a/apps/transport/test/registry/extractor_test.exs +++ b/apps/transport/test/registry/result_test.exs @@ -1,8 +1,8 @@ -defmodule Transport.Registry.ExtractorTest do +defmodule Transport.Registry.ResultTest do use ExUnit.Case, async: false require Integer - alias Transport.Registry.Extractor + alias Transport.Registry.Result test "cat_results" do assert [] == cat_results([]) @@ -16,11 +16,11 @@ defmodule Transport.Registry.ExtractorTest do end defp cat_results(enumerable) do - enumerable |> Extractor.cat_results() |> Enum.to_list() + enumerable |> Result.cat_results() |> Enum.to_list() end defp map_result(enumerable, mapper) do - enumerable |> Extractor.map_result(mapper) |> Enum.to_list() + enumerable |> Result.map_result(mapper) |> Enum.to_list() end defp even_is_forbidden(i) when Integer.is_odd(i), do: {:ok, i} From c3b26d930502cd8e1e9db4b742312b8eca360da5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Fri, 20 Dec 2024 11:15:38 +0100 Subject: [PATCH 09/18] NeTEx extractors: explicit raising --- .../lib/netex/netex_archive_parser.ex | 47 +++++++++++++++---- .../test/netex/netex_archive_parser_test.exs | 9 ++++ 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/apps/transport/lib/netex/netex_archive_parser.ex b/apps/transport/lib/netex/netex_archive_parser.ex index 40b2d95fe8..d95e038425 100644 --- a/apps/transport/lib/netex/netex_archive_parser.ex +++ b/apps/transport/lib/netex/netex_archive_parser.ex @@ -20,16 +20,16 @@ defmodule Transport.NeTEx do # Entry names ending with a slash `/` are directories. Skip them. # https://github.com/akash-akya/unzip/blob/689a1ca7a134ab2aeb79c8c4f8492d61fa3e09a0/lib/unzip.ex#L69 String.ends_with?(file_name, "/") -> - [] + {:ok, []} extension |> String.downcase() == ".zip" -> - raise "Insupported zip inside zip for file #{file_name}" + {:error, "Insupported zip inside zip for file #{file_name}"} extension |> String.downcase() != ".xml" -> - raise "Insupported file extension (#{extension}) for file #{file_name}" + {:error, "Insupported file extension (#{extension}) for file #{file_name}"} true -> - {:ok, state} = + parsing_result = unzip |> Unzip.file_stream!(file_name) |> Stream.map(&IO.iodata_to_binary(&1)) @@ -42,7 +42,21 @@ defmodule Transport.NeTEx do end }) - state.stop_places + case parsing_result do + {:ok, state} -> {:ok, state.stop_places} + {:error, exception} -> {:error, Exception.message(exception)} + {:halt, _state, _rest} -> {:error, "SAX parsing interrupted unexpectedly."} + end + end + end + + @doc """ + Like read_stop_places/2 but raises on errors. + """ + def read_stop_places!(%Unzip{} = unzip, file_name) do + case read_stop_places(unzip, file_name) do + {:ok, stop_places} -> stop_places + {:error, message} -> raise message end end @@ -53,8 +67,14 @@ defmodule Transport.NeTEx do zip_file = Unzip.LocalFile.open(zip_file_name) try do - {:ok, unzip} = Unzip.new(zip_file) - cb.(unzip) + case Unzip.new(zip_file) do + {:ok, unzip} -> + cb.(unzip) + + {:error, message} -> + Logger.error("Error while reading #{zip_file_name}: #{message}") + [] + end after Unzip.LocalFile.close(zip_file) end @@ -67,6 +87,17 @@ defmodule Transport.NeTEx do See tests for actual output. Will be refactored soonish. """ def read_all_stop_places(zip_file_name) do + read_all(zip_file_name, &read_stop_places/2) + end + + @doc """ + Like read_all_stop_places/1 but raises on error. + """ + def read_all_stop_places!(zip_file_name) do + read_all(zip_file_name, &read_stop_places!/2) + end + + defp read_all(zip_file_name, reader) do with_zip_file_handle(zip_file_name, fn unzip -> unzip |> Unzip.list_entries() @@ -75,7 +106,7 @@ defmodule Transport.NeTEx do { metadata.file_name, - read_stop_places(unzip, metadata.file_name) + reader.(unzip, metadata.file_name) } end) end) diff --git a/apps/transport/test/netex/netex_archive_parser_test.exs b/apps/transport/test/netex/netex_archive_parser_test.exs index a619424ad8..1c7c93d641 100644 --- a/apps/transport/test/netex/netex_archive_parser_test.exs +++ b/apps/transport/test/netex/netex_archive_parser_test.exs @@ -43,6 +43,15 @@ defmodule Transport.NeTEx.ArchiveParserTest do # given a zip netex archive containing 1 file, I want the output I expected [{"arrets.xml", data}] = Transport.NeTEx.read_all_stop_places(tmp_file) + assert data == + {:ok, + [ + %{id: "FR:HELLO:POYARTIN:001", latitude: 43.669, longitude: -0.919, name: "Poyartin"} + ]} + + # given a zip netex archive containing 1 file, I want the output I expected + [{"arrets.xml", data}] = Transport.NeTEx.read_all_stop_places!(tmp_file) + assert data == [ %{id: "FR:HELLO:POYARTIN:001", latitude: 43.669, longitude: -0.919, name: "Poyartin"} ] From d74db74d19ff3b2a377913544713dfbd2f8ee4b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Fri, 20 Dec 2024 13:29:03 +0100 Subject: [PATCH 10/18] Increased timeout --- apps/transport/lib/registry/engine.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index ca70c753e1..190d724d5e 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -20,7 +20,7 @@ defmodule Transport.Registry.Engine do enumerate_gtfs_resources(limit, formats) |> Result.map_result(&prepare_extractor/1) - |> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000) + |> Task.async_stream(&download/1, max_concurrency: 12, timeout: 30 * 60_000) # one for Task.async_stream |> Result.cat_results() # one for download/1 From 2d53a9a306126b15cd1902602cfc112964ee0901 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Fri, 20 Dec 2024 11:58:52 +0100 Subject: [PATCH 11/18] =?UTF-8?q?Registre=20d'arr=C3=AAts=20:=20extraire?= =?UTF-8?q?=20les=20arr=C3=AAts=20des=20NeTEx?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/transport/lib/registry/engine.ex | 2 + apps/transport/lib/registry/model/stop.ex | 2 + apps/transport/lib/registry/netex.ex | 58 +++++++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 apps/transport/lib/registry/netex.ex diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index 190d724d5e..a677c36c48 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -4,6 +4,7 @@ defmodule Transport.Registry.Engine do """ alias Transport.Registry.GTFS + alias Transport.Registry.NeTEx alias Transport.Registry.Model.Stop alias Transport.Registry.Result @@ -46,6 +47,7 @@ defmodule Transport.Registry.Engine do def prepare_extractor(%DB.Resource{} = resource) do case resource.format do "GTFS" -> {:ok, {GTFS, resource.url}} + "NeTEx" -> {:ok, {NeTEx, resource.url}} _ -> {:error, "Unsupported format"} end end diff --git a/apps/transport/lib/registry/model/stop.ex b/apps/transport/lib/registry/model/stop.ex index 89a1076b17..3e9c9b84d4 100644 --- a/apps/transport/lib/registry/model/stop.ex +++ b/apps/transport/lib/registry/model/stop.ex @@ -15,6 +15,8 @@ defmodule Transport.Registry.Model.StopIdentifier do @type identifier_type :: :main | :private_code | :stop_code | :other + def main(id), do: %__MODULE__{type: :main, id: id} + @doc """ iex> to_field(%Transport.Registry.Model.StopIdentifier{id: "stop1", type: :main}) "main:stop1" diff --git a/apps/transport/lib/registry/netex.ex b/apps/transport/lib/registry/netex.ex new file mode 100644 index 0000000000..248c4619f7 --- /dev/null +++ b/apps/transport/lib/registry/netex.ex @@ -0,0 +1,58 @@ +defmodule Transport.Registry.NeTEx do + @moduledoc """ + Implementation of a stop extractor for NeTEx resources. + """ + + alias Transport.Registry.Model.Stop + alias Transport.Registry.Model.StopIdentifier + alias Transport.Registry.Result + + require Logger + + @behaviour Transport.Registry.Extractor + @doc """ + Extract stops from a NeTEx archive. + """ + def extract_from_archive(archive) do + # FIXME: propagate some context + data_source_id = nil + + archive + |> Transport.NeTEx.read_all_stop_places() + |> Enum.flat_map(&process_stop_places(data_source_id, &1)) + |> Result.ok() + end + + defp process_stop_places(data_source_id, {_filename, {:ok, stop_places}}) do + stop_places |> Enum.map(&to_stop(data_source_id, &1)) |> Result.cat_results() + end + + defp process_stop_places(_data_source_id, {filename, {:error, message}}) do + Logger.error("Processing of #{filename}, error: #{message}") + [] + end + + defp to_stop(data_source_id, %{id: id, name: name, latitude: latitude, longitude: longitude}) do + %Stop{ + main_id: StopIdentifier.main(id), + display_name: name, + latitude: latitude, + longitude: longitude, + data_source_format: :netex, + data_source_id: data_source_id + } + |> Result.ok() + end + + defp to_stop(_data_source_id, incomplete_record) do + expected_keys = MapSet.new(~w(id name latitude longitude)) + keys = MapSet.new(Map.keys(incomplete_record)) + + missing_keys = MapSet.difference(expected_keys, keys) |> Enum.to_list() + + message = "Can't build stop, missing keys: #{inspect(missing_keys)}" + + Logger.error(message) + Result.error(message) + end +end From 87bcf13d45a0bcaaf991da1fe52b4a671eeb7cf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Fri, 20 Dec 2024 15:06:27 +0100 Subject: [PATCH 12/18] =?UTF-8?q?Identifiant=20de=20la=20source=20(tra?= =?UTF-8?q?=C3=A7abilit=C3=A9)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/transport/lib/registry/engine.ex | 21 ++++++++++++--------- apps/transport/lib/registry/extractor.ex | 4 +++- apps/transport/lib/registry/gtfs.ex | 10 ++++++---- apps/transport/lib/registry/netex.ex | 5 +---- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index a677c36c48..b7a8b446b0 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -5,6 +5,7 @@ defmodule Transport.Registry.Engine do alias Transport.Registry.GTFS alias Transport.Registry.NeTEx + alias Transport.Registry.Model.DataSource alias Transport.Registry.Model.Stop alias Transport.Registry.Result @@ -45,15 +46,17 @@ defmodule Transport.Registry.Engine do end def prepare_extractor(%DB.Resource{} = resource) do + data_source_id = "PAN:resource:#{resource.id}" + case resource.format do - "GTFS" -> {:ok, {GTFS, resource.url}} - "NeTEx" -> {:ok, {NeTEx, resource.url}} + "GTFS" -> {:ok, {GTFS, data_source_id, resource.url}} + "NeTEx" -> {:ok, {NeTEx, data_source_id, resource.url}} _ -> {:error, "Unsupported format"} end end - def download({extractor, url}) do - Logger.debug("download #{extractor} #{url}") + def download({extractor, data_source_id, url}) do + Logger.debug("download #{extractor} #{data_source_id} #{url}") tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat") safe_error = fn msg -> @@ -75,7 +78,7 @@ defmodule Transport.Registry.Engine do {:ok, %{status: status}} -> cond do status >= 200 && status < 300 -> - {:ok, {extractor, tmp_path}} + {:ok, {extractor, data_source_id, tmp_path}} status > 400 -> safe_error.("Error #{status} while downloading the resource from #{url}") @@ -86,10 +89,10 @@ defmodule Transport.Registry.Engine do end end - @spec extract_from_archive({module(), Path.t()}) :: Result.t([Stop.t()]) - def extract_from_archive({extractor, file}) do - Logger.debug("extract_from_archive #{extractor} #{file}") - extractor.extract_from_archive(file) + @spec extract_from_archive({module(), DataSource.data_source_id(), Path.t()}) :: Result.t([Stop.t()]) + def extract_from_archive({extractor, data_source_id, file}) do + Logger.debug("extract_from_archive #{extractor} #{data_source_id} #{file}") + extractor.extract_from_archive(data_source_id, file) end def dump_to_csv(enumerable, output_file) do diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex index 0f5db7d9ec..fa5f0da5a5 100644 --- a/apps/transport/lib/registry/extractor.ex +++ b/apps/transport/lib/registry/extractor.ex @@ -5,8 +5,10 @@ defmodule Transport.Registry.Extractor do require Logger + alias Transport.Registry.Model.DataSource alias Transport.Registry.Model.Stop alias Transport.Registry.Result - @callback extract_from_archive(path :: Path.t()) :: Result.t([Stop.t()]) + @callback extract_from_archive(data_source_id :: DataSource.data_source_id(), path :: Path.t()) :: + Result.t([Stop.t()]) end diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index 80c875e0ee..7f558651ff 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -15,7 +15,7 @@ defmodule Transport.Registry.GTFS do @doc """ Extract stops from GTFS ressource. """ - def extract_from_archive(archive) do + def extract_from_archive(data_source_id, archive) do case file_stream(archive) do {:error, error} -> Logger.error(error) @@ -26,13 +26,13 @@ defmodule Transport.Registry.GTFS do content |> Utils.to_stream_of_maps() - |> Stream.flat_map(&handle_stop/1) + |> Stream.flat_map(&handle_stop(data_source_id, &1)) |> Enum.to_list() |> Result.ok() end end - defp handle_stop(record) do + defp handle_stop(data_source_id, record) do latitude = Utils.fetch_position(record, "stop_lat") longitude = Utils.fetch_position(record, "stop_lon") @@ -44,7 +44,9 @@ defmodule Transport.Registry.GTFS do latitude: latitude, longitude: longitude, projection: :utm_wgs84, - stop_type: record |> Utils.csv_get_with_default("location_type", "0") |> to_stop_type() + stop_type: record |> Utils.csv_get_with_default("location_type", "0") |> to_stop_type(), + data_source_format: :gtfs, + data_source_id: data_source_id } ] else diff --git a/apps/transport/lib/registry/netex.ex b/apps/transport/lib/registry/netex.ex index 248c4619f7..aa8dcf321d 100644 --- a/apps/transport/lib/registry/netex.ex +++ b/apps/transport/lib/registry/netex.ex @@ -13,10 +13,7 @@ defmodule Transport.Registry.NeTEx do @doc """ Extract stops from a NeTEx archive. """ - def extract_from_archive(archive) do - # FIXME: propagate some context - data_source_id = nil - + def extract_from_archive(data_source_id, archive) do archive |> Transport.NeTEx.read_all_stop_places() |> Enum.flat_map(&process_stop_places(data_source_id, &1)) From 604202c1ce509f3bf9c674819731c59afd01b893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Fri, 20 Dec 2024 15:36:36 +0100 Subject: [PATCH 13/18] Linting --- apps/transport/lib/registry/engine.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index b7a8b446b0..c4cf4a3e77 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -4,9 +4,9 @@ defmodule Transport.Registry.Engine do """ alias Transport.Registry.GTFS - alias Transport.Registry.NeTEx alias Transport.Registry.Model.DataSource alias Transport.Registry.Model.Stop + alias Transport.Registry.NeTEx alias Transport.Registry.Result import Ecto.Query From aea3e94679c9e8838c57f96ed955ccdbff09a529 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Mon, 13 Jan 2025 17:38:37 +0100 Subject: [PATCH 14/18] Some typespecs and docs --- apps/transport/lib/registry/engine.ex | 7 ++++++- apps/transport/lib/registry/result.ex | 9 +++++++++ apps/transport/test/registry/result_test.exs | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index c4cf4a3e77..59e92b870f 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -13,7 +13,12 @@ defmodule Transport.Registry.Engine do require Logger - @spec execute(output_file :: Path.t(), list()) :: :ok + @type option :: {:limit, integer()} | {:formats, [String.t()]} + + @doc """ + execute("/tmp/registre-arrets.csv", formats: ~w(GTFS NeTEx), limit: 100) + """ + @spec execute(output_file :: Path.t(), opts :: [option]) :: :ok def execute(output_file, opts \\ []) do limit = Keyword.get(opts, :limit, 1_000_000) formats = Keyword.get(opts, :formats, ~w(GTFS NeTEx)) diff --git a/apps/transport/lib/registry/result.ex b/apps/transport/lib/registry/result.ex index 5821cbfa41..e87f61e274 100644 --- a/apps/transport/lib/registry/result.ex +++ b/apps/transport/lib/registry/result.ex @@ -2,6 +2,7 @@ defmodule Transport.Registry.Result do @moduledoc """ Type and utilities to represent results. """ + require Integer @type t(positive) :: {:ok, positive} | {:error, binary()} @@ -9,12 +10,20 @@ defmodule Transport.Registry.Result do def error(message), do: {:error, message} + @doc """ + iex> [{:ok, "valid"}, {:error, "invalid"}, {:ok, "relevant"}] |> cat_results() + ["valid", "relevant"] + """ @spec cat_results(Stream.t(t(term()))) :: Stream.t(term()) def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1) defp keep_ok({:ok, result}), do: [result] defp keep_ok(_), do: [] + @doc """ + iex> 1..10 |> map_result(fn v -> if Integer.is_odd(v) do {:ok, v} else {:error, "Even Steven"} end end) + [1, 3, 5, 7, 9] + """ @spec map_result(Stream.t(term()), (term() -> t(term()))) :: Stream.t(term()) def map_result(enumerable, mapper) do enumerable diff --git a/apps/transport/test/registry/result_test.exs b/apps/transport/test/registry/result_test.exs index d48a8d587b..7c500d3186 100644 --- a/apps/transport/test/registry/result_test.exs +++ b/apps/transport/test/registry/result_test.exs @@ -3,6 +3,7 @@ defmodule Transport.Registry.ResultTest do require Integer alias Transport.Registry.Result + doctest Result test "cat_results" do assert [] == cat_results([]) From e070530907b29e917d319c51c20df57674495fbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Tue, 14 Jan 2025 18:30:48 +0100 Subject: [PATCH 15/18] Resist to unescaped characters in gtfs --- apps/transport/lib/registry/gtfs.ex | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index 7f558651ff..b3f16f0859 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -24,11 +24,18 @@ defmodule Transport.Registry.GTFS do {:ok, content} -> Logger.debug("Valid Zip archive") - content - |> Utils.to_stream_of_maps() - |> Stream.flat_map(&handle_stop(data_source_id, &1)) - |> Enum.to_list() - |> Result.ok() + try do + content + |> Utils.to_stream_of_maps() + |> Stream.flat_map(&handle_stop(data_source_id, &1)) + |> Enum.to_list() + |> Result.ok() + rescue + e in NimbleCSV.ParseError -> + e + |> Exception.message() + |> Result.error() + end end end From 287f6790be0721551557cb1f485ef5467060a72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Wed, 15 Jan 2025 17:15:19 +0100 Subject: [PATCH 16/18] =?UTF-8?q?Resource=20id=20de=20data.gouv=20plut?= =?UTF-8?q?=C3=B4t=20que=20PAN?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/transport/lib/registry/engine.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index 59e92b870f..bd0f72af12 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -51,7 +51,7 @@ defmodule Transport.Registry.Engine do end def prepare_extractor(%DB.Resource{} = resource) do - data_source_id = "PAN:resource:#{resource.id}" + data_source_id = "datagouv:resource:#{resource.datagouv_id}" case resource.format do "GTFS" -> {:ok, {GTFS, data_source_id, resource.url}} From 52dbc1cd4ae3dae7c8ebe1ff95f4d5f712eb5b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Thu, 16 Jan 2025 13:55:52 +0100 Subject: [PATCH 17/18] Close early open files --- apps/transport/lib/registry/gtfs.ex | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index b3f16f0859..302aa61ef4 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -21,7 +21,7 @@ defmodule Transport.Registry.GTFS do Logger.error(error) Result.error(error) - {:ok, content} -> + {:ok, {content, zip_file}} -> Logger.debug("Valid Zip archive") try do @@ -30,6 +30,8 @@ defmodule Transport.Registry.GTFS do |> Stream.flat_map(&handle_stop(data_source_id, &1)) |> Enum.to_list() |> Result.ok() + after + Unzip.LocalFile.close(zip_file) rescue e in NimbleCSV.ParseError -> e @@ -71,7 +73,8 @@ defmodule Transport.Registry.GTFS do case Unzip.new(zip_file) do {:ok, unzip} -> if has_stops?(unzip) do - unzip |> Unzip.file_stream!("stops.txt") |> Result.ok() + content = unzip |> Unzip.file_stream!("stops.txt") + Result.ok({content, zip_file}) else Result.error("Missing stops.txt in #{archive}") end From 89dad4a3ceff76be2137083a530bcd57f9b1d00d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Menou?= Date: Mon, 20 Jan 2025 14:04:50 +0100 Subject: [PATCH 18/18] Close files on error --- apps/transport/lib/registry/gtfs.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index 302aa61ef4..57351e8ce6 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -74,8 +74,10 @@ defmodule Transport.Registry.GTFS do {:ok, unzip} -> if has_stops?(unzip) do content = unzip |> Unzip.file_stream!("stops.txt") + # The zip_file is kept open for now as it's consumed later. Result.ok({content, zip_file}) else + Unzip.LocalFile.close(zip_file) Result.error("Missing stops.txt in #{archive}") end