From c243c788fd2af7192123ca4488f2c8e6214aef9e Mon Sep 17 00:00:00 2001 From: Edoardo Abati <29585319+EdAbati@users.noreply.github.com> Date: Mon, 11 Nov 2024 18:32:26 +0100 Subject: [PATCH 1/6] parametrize tests --- tpch/tests/queries_test.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/tpch/tests/queries_test.py b/tpch/tests/queries_test.py index c228fd52b..b0c24bd72 100644 --- a/tpch/tests/queries_test.py +++ b/tpch/tests/queries_test.py @@ -4,20 +4,22 @@ import sys from pathlib import Path +import pytest -def test_execute_scripts() -> None: - root = Path(__file__).resolve().parent.parent - # directory containing all the queries - execute_dir = root / "execute" +ROOT_PATH = Path(__file__).resolve().parent.parent +# directory containing all the script to execute queries +EXECUTE_DIR = ROOT_PATH / "execute" - for script_path in execute_dir.glob("q[1-9]*.py"): - print(f"executing query {script_path.stem}") # noqa: T201 - result = subprocess.run( # noqa: S603 - [sys.executable, "-m", f"execute.{script_path.stem}"], - capture_output=True, - text=True, - check=False, - ) - assert ( - result.returncode == 0 - ), f"Script {script_path} failed with error: {result.stderr}" + +@pytest.mark.parametrize("script_path", EXECUTE_DIR.glob("q[1-9]*.py")) +def test_execute_scripts(script_path: Path) -> None: + print(f"executing query {script_path.stem}") # noqa: T201 + result = subprocess.run( # noqa: S603 + [sys.executable, "-m", f"execute.{script_path.stem}"], + capture_output=True, + text=True, + check=False, + ) + assert ( + result.returncode == 0 + ), f"Script {script_path} failed with error: {result.stderr}" From 9d465042aea850b9a09c044de5b4838ac74d6a4c Mon Sep 17 00:00:00 2001 From: Edoardo Abati <29585319+EdAbati@users.noreply.github.com> Date: Mon, 11 Nov 2024 20:19:58 +0100 Subject: [PATCH 2/6] one script for execute --- tpch/README.md | 2 +- tpch/execute/__init__.py | 32 ----------- tpch/execute/main.py | 112 +++++++++++++++++++++++++++++++++++++ tpch/execute/q1.py | 11 ---- tpch/execute/q10.py | 25 --------- tpch/execute/q11.py | 24 -------- tpch/execute/q12.py | 23 -------- tpch/execute/q13.py | 23 -------- tpch/execute/q14.py | 23 -------- tpch/execute/q15.py | 23 -------- tpch/execute/q16.py | 24 -------- tpch/execute/q17.py | 23 -------- tpch/execute/q18.py | 24 -------- tpch/execute/q19.py | 19 ------- tpch/execute/q2.py | 55 ------------------ tpch/execute/q20.py | 22 -------- tpch/execute/q21.py | 21 ------- tpch/execute/q22.py | 23 -------- tpch/execute/q3.py | 24 -------- tpch/execute/q4.py | 23 -------- tpch/execute/q5.py | 43 -------------- tpch/execute/q6.py | 22 -------- tpch/execute/q7.py | 30 ---------- tpch/execute/q8.py | 69 ----------------------- tpch/execute/q9.py | 39 ------------- tpch/tests/queries_test.py | 14 ++--- 26 files changed, 120 insertions(+), 653 deletions(-) create mode 100644 tpch/execute/main.py delete mode 100644 tpch/execute/q1.py delete mode 100644 tpch/execute/q10.py delete mode 100644 tpch/execute/q11.py delete mode 100644 tpch/execute/q12.py delete mode 100644 tpch/execute/q13.py delete mode 100644 tpch/execute/q14.py delete mode 100644 tpch/execute/q15.py delete mode 100644 tpch/execute/q16.py delete mode 100644 tpch/execute/q17.py delete mode 100644 tpch/execute/q18.py delete mode 100644 tpch/execute/q19.py delete mode 100644 tpch/execute/q2.py delete mode 100644 tpch/execute/q20.py delete mode 100644 tpch/execute/q21.py delete mode 100644 tpch/execute/q22.py delete mode 100644 tpch/execute/q3.py delete mode 100644 tpch/execute/q4.py delete mode 100644 tpch/execute/q5.py delete mode 100644 tpch/execute/q6.py delete mode 100644 tpch/execute/q7.py delete mode 100644 tpch/execute/q8.py delete mode 100644 tpch/execute/q9.py diff --git a/tpch/README.md b/tpch/README.md index 3ae09b723..65d6cd4c0 100644 --- a/tpch/README.md +++ b/tpch/README.md @@ -11,7 +11,7 @@ Run `python generate_data.py` from this folder. ## Run queries -To run Q1, you can run `python -m execute.q1` from this folder. +To run Q1, you can run `python -m execute.main q1` from this folder. Please add query definitions in `queries`, and scripts to execute them in `execute` (see `queries/q1.py` and `execute/q1.py` for examples). diff --git a/tpch/execute/__init__.py b/tpch/execute/__init__.py index ecbf1db53..e69de29bb 100644 --- a/tpch/execute/__init__.py +++ b/tpch/execute/__init__.py @@ -1,32 +0,0 @@ -from __future__ import annotations - -from pathlib import Path - -import dask.dataframe as dd -import pandas as pd -import polars as pl -import pyarrow.parquet as pq - -pd.options.mode.copy_on_write = True -pd.options.future.infer_string = True - -lineitem = Path("data") / "lineitem.parquet" -region = Path("data") / "region.parquet" -nation = Path("data") / "nation.parquet" -supplier = Path("data") / "supplier.parquet" -part = Path("data") / "part.parquet" -partsupp = Path("data") / "partsupp.parquet" -orders = Path("data") / "orders.parquet" -customer = Path("data") / "customer.parquet" -line_item = Path("data") / "lineitem.parquet" - -IO_FUNCS = { - "pandas": lambda x: pd.read_parquet(x, engine="pyarrow"), - "pandas[pyarrow]": lambda x: pd.read_parquet( - x, engine="pyarrow", dtype_backend="pyarrow" - ), - "polars[eager]": lambda x: pl.read_parquet(x), - "polars[lazy]": lambda x: pl.scan_parquet(x), - "pyarrow": lambda x: pq.read_table(x), - "dask": lambda x: dd.read_parquet(x, engine="pyarrow", dtype_backend="pyarrow"), -} diff --git a/tpch/execute/main.py b/tpch/execute/main.py new file mode 100644 index 000000000..64d753393 --- /dev/null +++ b/tpch/execute/main.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +import argparse +from importlib import import_module +from pathlib import Path + +import pandas as pd +import polars as pl +import pyarrow.parquet as pq + +pd.options.mode.copy_on_write = True +pd.options.future.infer_string = True + +# Data paths +DATA_DIR = Path("data") +LINEITEM_PATH = DATA_DIR / "lineitem.parquet" +REGION_PATH = DATA_DIR / "region.parquet" +NATION_PATH = DATA_DIR / "nation.parquet" +SUPPLIER_PATH = DATA_DIR / "supplier.parquet" +PART_PATH = DATA_DIR / "part.parquet" +PARTSUPP_PATH = DATA_DIR / "partsupp.parquet" +ORDERS_PATH = DATA_DIR / "orders.parquet" +CUSTOMER_PATH = DATA_DIR / "customer.parquet" + +# Read functions for each backend +READ_FUNCS = { + # "pandas": lambda x: pd.read_parquet(x, engine="pyarrow"), # noqa: ERA001 + "pandas[pyarrow]": lambda x: pd.read_parquet( + x, engine="pyarrow", dtype_backend="pyarrow" + ), + # "polars[eager]": lambda x: pl.read_parquet(x), + "polars[lazy]": lambda x: pl.scan_parquet(x), + "pyarrow": lambda x: pq.read_table(x), + # "dask": lambda x: dd.read_parquet(x, engine="pyarrow", dtype_backend="pyarrow"), # noqa: ERA001 +} + +# Collect functions for the lazy backends +COLLECT_FUNCS = { + "polars[lazy]": lambda x: x.collect(), + # "dask": lambda x: x.compute(), # noqa: ERA001 +} + +QUERY_DATA = { + "q1": (LINEITEM_PATH,), + "q2": (REGION_PATH, NATION_PATH, SUPPLIER_PATH, PART_PATH, PARTSUPP_PATH), + "q3": (CUSTOMER_PATH, LINEITEM_PATH, ORDERS_PATH), + "q4": (LINEITEM_PATH, ORDERS_PATH), + "q5": ( + REGION_PATH, + NATION_PATH, + CUSTOMER_PATH, + LINEITEM_PATH, + ORDERS_PATH, + SUPPLIER_PATH, + ), + "q6": (LINEITEM_PATH,), + "q7": (NATION_PATH, CUSTOMER_PATH, LINEITEM_PATH, ORDERS_PATH, SUPPLIER_PATH), + "q8": ( + PART_PATH, + SUPPLIER_PATH, + LINEITEM_PATH, + ORDERS_PATH, + CUSTOMER_PATH, + NATION_PATH, + REGION_PATH, + ), + "q9": ( + PART_PATH, + PARTSUPP_PATH, + NATION_PATH, + LINEITEM_PATH, + ORDERS_PATH, + SUPPLIER_PATH, + ), + "q10": (CUSTOMER_PATH, NATION_PATH, LINEITEM_PATH, ORDERS_PATH), + "q11": (NATION_PATH, PARTSUPP_PATH, SUPPLIER_PATH), + "q12": (LINEITEM_PATH, ORDERS_PATH), + "q13": (CUSTOMER_PATH, ORDERS_PATH), + "q14": (LINEITEM_PATH, PART_PATH), + "q15": (LINEITEM_PATH, SUPPLIER_PATH), + "q16": (PART_PATH, PARTSUPP_PATH, SUPPLIER_PATH), + "q17": (LINEITEM_PATH, PART_PATH), + "q18": (CUSTOMER_PATH, LINEITEM_PATH, ORDERS_PATH), + "q19": (LINEITEM_PATH, PART_PATH), + "q20": (PART_PATH, PARTSUPP_PATH, NATION_PATH, LINEITEM_PATH, SUPPLIER_PATH), + "q21": (LINEITEM_PATH, NATION_PATH, ORDERS_PATH, SUPPLIER_PATH), + "q22": (CUSTOMER_PATH, ORDERS_PATH), +} + + +def execute_query(query_id: str) -> None: + query_module = import_module(f"tpch.queries.{query_id}") + data_paths = QUERY_DATA[query_id] + + for backend, read_func in READ_FUNCS.items(): + print(f"\nRunning {query_id} with {backend=}") + result = query_module.query(*(read_func(path) for path in data_paths)) + if collect_func := COLLECT_FUNCS.get(backend): + result = collect_func(result) + print(result) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Execute a TPCH query by number.") + parser.add_argument("query", type=str, help="The query to execute, e.g. 'q1'.") + args = parser.parse_args() + + execute_query(query_id=args.query) + + +if __name__ == "__main__": + main() diff --git a/tpch/execute/q1.py b/tpch/execute/q1.py deleted file mode 100644 index d0ebce584..000000000 --- a/tpch/execute/q1.py +++ /dev/null @@ -1,11 +0,0 @@ -from __future__ import annotations - -from queries import q1 - -from . import IO_FUNCS -from . import lineitem - -print(q1.query(IO_FUNCS["pandas[pyarrow]"](lineitem))) -print(q1.query(IO_FUNCS["polars[lazy]"](lineitem)).collect()) -print(q1.query(IO_FUNCS["pyarrow"](lineitem))) -print(q1.query(IO_FUNCS["dask"](lineitem)).compute()) diff --git a/tpch/execute/q10.py b/tpch/execute/q10.py deleted file mode 100644 index 1f610932c..000000000 --- a/tpch/execute/q10.py +++ /dev/null @@ -1,25 +0,0 @@ -from __future__ import annotations - -from queries import q10 - -from . import IO_FUNCS -from . import customer -from . import lineitem -from . import nation -from . import orders - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q10.query(fn(customer), fn(nation), fn(lineitem), fn(orders))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q10.query(fn(customer), fn(nation), fn(lineitem), fn(orders)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q10.query(fn(customer), fn(nation), fn(lineitem), fn(orders))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q10.query(fn(customer), fn(nation), fn(lineitem), fn(orders)).compute()) diff --git a/tpch/execute/q11.py b/tpch/execute/q11.py deleted file mode 100644 index 0dd8a243c..000000000 --- a/tpch/execute/q11.py +++ /dev/null @@ -1,24 +0,0 @@ -from __future__ import annotations - -from queries import q11 - -from . import IO_FUNCS -from . import nation -from . import partsupp -from . import supplier - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q11.query(fn(nation), fn(partsupp), fn(supplier))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q11.query(fn(nation), fn(partsupp), fn(supplier)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q11.query(fn(nation), fn(partsupp), fn(supplier))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q11.query(fn(nation), fn(partsupp), fn(supplier)).compute()) diff --git a/tpch/execute/q12.py b/tpch/execute/q12.py deleted file mode 100644 index f684e22ad..000000000 --- a/tpch/execute/q12.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -from queries import q12 - -from . import IO_FUNCS -from . import line_item -from . import orders - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q12.query(fn(line_item), fn(orders))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q12.query(fn(line_item), fn(orders)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q12.query(fn(line_item), fn(orders))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q12.query(fn(line_item), fn(orders)).compute()) diff --git a/tpch/execute/q13.py b/tpch/execute/q13.py deleted file mode 100644 index 7b03a2f2f..000000000 --- a/tpch/execute/q13.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -from queries import q13 - -from . import IO_FUNCS -from . import customer -from . import orders - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q13.query(fn(customer), fn(orders))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q13.query(fn(customer), fn(orders)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q13.query(fn(customer), fn(orders))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q13.query(fn(customer), fn(orders)).compute()) diff --git a/tpch/execute/q14.py b/tpch/execute/q14.py deleted file mode 100644 index a82330136..000000000 --- a/tpch/execute/q14.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -from queries import q14 - -from . import IO_FUNCS -from . import line_item -from . import part - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q14.query(fn(line_item), fn(part))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q14.query(fn(line_item), fn(part)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q14.query(fn(line_item), fn(part))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q14.query(fn(line_item), fn(part)).compute()) diff --git a/tpch/execute/q15.py b/tpch/execute/q15.py deleted file mode 100644 index 40b4432b1..000000000 --- a/tpch/execute/q15.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -from queries import q15 - -from . import IO_FUNCS -from . import lineitem -from . import supplier - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q15.query(fn(lineitem), fn(supplier))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q15.query(fn(lineitem), fn(supplier)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q15.query(fn(lineitem), fn(supplier))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q15.query(fn(lineitem), fn(supplier)).compute()) diff --git a/tpch/execute/q16.py b/tpch/execute/q16.py deleted file mode 100644 index ef30f935c..000000000 --- a/tpch/execute/q16.py +++ /dev/null @@ -1,24 +0,0 @@ -from __future__ import annotations - -from queries import q16 - -from . import IO_FUNCS -from . import part -from . import partsupp -from . import supplier - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q16.query(fn(part), fn(partsupp), fn(supplier))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q16.query(fn(part), fn(partsupp), fn(supplier)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q16.query(fn(part), fn(partsupp), fn(supplier))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q16.query(fn(part), fn(partsupp), fn(supplier)).compute()) diff --git a/tpch/execute/q17.py b/tpch/execute/q17.py deleted file mode 100644 index 0b7ca4a66..000000000 --- a/tpch/execute/q17.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -from queries import q17 - -from . import IO_FUNCS -from . import lineitem -from . import part - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q17.query(fn(lineitem), fn(part))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q17.query(fn(lineitem), fn(part)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q17.query(fn(lineitem), fn(part))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q17.query(fn(lineitem), fn(part)).compute()) diff --git a/tpch/execute/q18.py b/tpch/execute/q18.py deleted file mode 100644 index a096deb2f..000000000 --- a/tpch/execute/q18.py +++ /dev/null @@ -1,24 +0,0 @@ -from __future__ import annotations - -from queries import q18 - -from . import IO_FUNCS -from . import customer -from . import lineitem -from . import orders - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q18.query(fn(customer), fn(lineitem), fn(orders))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q18.query(fn(customer), fn(lineitem), fn(orders)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q18.query(fn(customer), fn(lineitem), fn(orders))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q18.query(fn(customer), fn(lineitem), fn(orders)).compute()) diff --git a/tpch/execute/q19.py b/tpch/execute/q19.py deleted file mode 100644 index 23095a890..000000000 --- a/tpch/execute/q19.py +++ /dev/null @@ -1,19 +0,0 @@ -from __future__ import annotations - -from queries import q19 - -from . import IO_FUNCS -from . import lineitem -from . import part - -fn = IO_FUNCS["pandas[pyarrow]"] -print(q19.query(fn(lineitem), fn(part))) - -fn = IO_FUNCS["polars[lazy]"] -print(q19.query(fn(lineitem), fn(part)).collect()) - -fn = IO_FUNCS["pyarrow"] -print(q19.query(fn(lineitem), fn(part))) - -fn = IO_FUNCS["dask"] -print(q19.query(fn(lineitem), fn(part)).compute()) diff --git a/tpch/execute/q2.py b/tpch/execute/q2.py deleted file mode 100644 index 0e2d07019..000000000 --- a/tpch/execute/q2.py +++ /dev/null @@ -1,55 +0,0 @@ -from __future__ import annotations - -from queries import q2 - -from . import IO_FUNCS -from . import nation -from . import part -from . import partsupp -from . import region -from . import supplier - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print( - q2.query( - fn(region), - fn(nation), - fn(supplier), - fn(part), - fn(partsupp), - ) -) -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print( - q2.query( - fn(region), - fn(nation), - fn(supplier), - fn(part), - fn(partsupp), - ).collect() -) -tool = "pyarrow" -fn = IO_FUNCS[tool] -print( - q2.query( - fn(region), - fn(nation), - fn(supplier), - fn(part), - fn(partsupp), - ) -) -tool = "dask" -fn = IO_FUNCS[tool] -print( - q2.query( - fn(region), - fn(nation), - fn(supplier), - fn(part), - fn(partsupp), - ).compute() -) diff --git a/tpch/execute/q20.py b/tpch/execute/q20.py deleted file mode 100644 index c4ffa43b4..000000000 --- a/tpch/execute/q20.py +++ /dev/null @@ -1,22 +0,0 @@ -from __future__ import annotations - -from queries import q20 - -from . import IO_FUNCS -from . import lineitem -from . import nation -from . import part -from . import partsupp -from . import supplier - -fn = IO_FUNCS["pandas[pyarrow]"] -print(q20.query(fn(part), fn(partsupp), fn(nation), fn(lineitem), fn(supplier))) - -fn = IO_FUNCS["polars[lazy]"] -print(q20.query(fn(part), fn(partsupp), fn(nation), fn(lineitem), fn(supplier)).collect()) - -fn = IO_FUNCS["pyarrow"] -print(q20.query(fn(part), fn(partsupp), fn(nation), fn(lineitem), fn(supplier))) - -fn = IO_FUNCS["dask"] -print(q20.query(fn(part), fn(partsupp), fn(nation), fn(lineitem), fn(supplier)).compute()) diff --git a/tpch/execute/q21.py b/tpch/execute/q21.py deleted file mode 100644 index d6fb272ad..000000000 --- a/tpch/execute/q21.py +++ /dev/null @@ -1,21 +0,0 @@ -from __future__ import annotations - -from queries import q21 - -from . import IO_FUNCS -from . import lineitem -from . import nation -from . import orders -from . import supplier - -fn = IO_FUNCS["pandas[pyarrow]"] -print(q21.query(fn(lineitem), fn(nation), fn(orders), fn(supplier))) - -fn = IO_FUNCS["polars[lazy]"] -print(q21.query(fn(lineitem), fn(nation), fn(orders), fn(supplier)).collect()) - -fn = IO_FUNCS["pyarrow"] -print(q21.query(fn(lineitem), fn(nation), fn(orders), fn(supplier))) - -fn = IO_FUNCS["dask"] -print(q21.query(fn(lineitem), fn(nation), fn(orders), fn(supplier)).compute()) diff --git a/tpch/execute/q22.py b/tpch/execute/q22.py deleted file mode 100644 index f71fc4220..000000000 --- a/tpch/execute/q22.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -from queries import q22 - -from . import IO_FUNCS -from . import customer -from . import orders - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q22.query(fn(customer), fn(orders))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q22.query(fn(customer), fn(orders)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q22.query(fn(customer), fn(orders))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q22.query(fn(customer), fn(orders)).compute()) diff --git a/tpch/execute/q3.py b/tpch/execute/q3.py deleted file mode 100644 index bbcc51d5c..000000000 --- a/tpch/execute/q3.py +++ /dev/null @@ -1,24 +0,0 @@ -from __future__ import annotations - -from queries import q3 - -from . import IO_FUNCS -from . import customer -from . import lineitem -from . import orders - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q3.query(fn(customer), fn(lineitem), fn(orders))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q3.query(fn(customer), fn(lineitem), fn(orders)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q3.query(fn(customer), fn(lineitem), fn(orders))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q3.query(fn(customer), fn(lineitem), fn(orders)).compute()) diff --git a/tpch/execute/q4.py b/tpch/execute/q4.py deleted file mode 100644 index bcfd3a158..000000000 --- a/tpch/execute/q4.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import annotations - -from queries import q4 - -from . import IO_FUNCS -from . import line_item -from . import orders - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q4.query(fn(line_item), fn(orders))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q4.query(fn(line_item), fn(orders)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q4.query(fn(line_item), fn(orders))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q4.query(fn(line_item), fn(orders)).compute()) diff --git a/tpch/execute/q5.py b/tpch/execute/q5.py deleted file mode 100644 index 66524c5a8..000000000 --- a/tpch/execute/q5.py +++ /dev/null @@ -1,43 +0,0 @@ -from __future__ import annotations - -from queries import q5 - -from . import IO_FUNCS -from . import customer -from . import line_item -from . import nation -from . import orders -from . import region -from . import supplier - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print( - q5.query( - fn(region), fn(nation), fn(customer), fn(line_item), fn(orders), fn(supplier) - ) -) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print( - q5.query( - fn(region), fn(nation), fn(customer), fn(line_item), fn(orders), fn(supplier) - ).collect() -) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print( - q5.query( - fn(region), fn(nation), fn(customer), fn(line_item), fn(orders), fn(supplier) - ) -) - -tool = "dask" -fn = IO_FUNCS[tool] -print( - q5.query( - fn(region), fn(nation), fn(customer), fn(line_item), fn(orders), fn(supplier) - ).compute() -) diff --git a/tpch/execute/q6.py b/tpch/execute/q6.py deleted file mode 100644 index 1d650b794..000000000 --- a/tpch/execute/q6.py +++ /dev/null @@ -1,22 +0,0 @@ -from __future__ import annotations - -from queries import q6 - -from . import IO_FUNCS -from . import lineitem - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q6.query(fn(lineitem))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print(q6.query(fn(lineitem)).collect()) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q6.query(fn(lineitem))) - -tool = "dask" -fn = IO_FUNCS[tool] -print(q6.query(fn(lineitem)).compute()) diff --git a/tpch/execute/q7.py b/tpch/execute/q7.py deleted file mode 100644 index 069fb258b..000000000 --- a/tpch/execute/q7.py +++ /dev/null @@ -1,30 +0,0 @@ -from __future__ import annotations - -from queries import q7 - -from . import IO_FUNCS -from . import customer -from . import lineitem -from . import nation -from . import orders -from . import supplier - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print(q7.query(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print( - q7.query(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier)).collect() -) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print(q7.query(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))) - -tool = "dask" -fn = IO_FUNCS[tool] -print( - q7.query(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier)).compute() -) diff --git a/tpch/execute/q8.py b/tpch/execute/q8.py deleted file mode 100644 index 8c3aa5de9..000000000 --- a/tpch/execute/q8.py +++ /dev/null @@ -1,69 +0,0 @@ -from __future__ import annotations - -from queries import q8 - -from . import IO_FUNCS -from . import customer -from . import lineitem -from . import nation -from . import orders -from . import part -from . import region -from . import supplier - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print( - q8.query( - fn(part), - fn(supplier), - fn(lineitem), - fn(orders), - fn(customer), - fn(nation), - fn(region), - ) -) - - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print( - q8.query( - fn(part), - fn(supplier), - fn(lineitem), - fn(orders), - fn(customer), - fn(nation), - fn(region), - ).collect() -) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print( - q8.query( - fn(part), - fn(supplier), - fn(lineitem), - fn(orders), - fn(customer), - fn(nation), - fn(region), - ) -) - -tool = "dask" -fn = IO_FUNCS[tool] -print( - q8.query( - fn(part), - fn(supplier), - fn(lineitem), - fn(orders), - fn(customer), - fn(nation), - fn(region), - ).compute() -) diff --git a/tpch/execute/q9.py b/tpch/execute/q9.py deleted file mode 100644 index 4c8e6874c..000000000 --- a/tpch/execute/q9.py +++ /dev/null @@ -1,39 +0,0 @@ -from __future__ import annotations - -from queries import q9 - -from . import IO_FUNCS -from . import lineitem -from . import nation -from . import orders -from . import part -from . import partsupp -from . import supplier - -tool = "pandas[pyarrow]" -fn = IO_FUNCS[tool] -print( - q9.query(fn(part), fn(partsupp), fn(nation), fn(lineitem), fn(orders), fn(supplier)) -) - -tool = "polars[lazy]" -fn = IO_FUNCS[tool] -print( - q9.query( - fn(part), fn(partsupp), fn(nation), fn(lineitem), fn(orders), fn(supplier) - ).collect() -) - -tool = "pyarrow" -fn = IO_FUNCS[tool] -print( - q9.query(fn(part), fn(partsupp), fn(nation), fn(lineitem), fn(orders), fn(supplier)) -) - -tool = "dask" -fn = IO_FUNCS[tool] -print( - q9.query( - fn(part), fn(partsupp), fn(nation), fn(lineitem), fn(orders), fn(supplier) - ).compute() -) diff --git a/tpch/tests/queries_test.py b/tpch/tests/queries_test.py index b0c24bd72..ec0614588 100644 --- a/tpch/tests/queries_test.py +++ b/tpch/tests/queries_test.py @@ -7,19 +7,19 @@ import pytest ROOT_PATH = Path(__file__).resolve().parent.parent -# directory containing all the script to execute queries -EXECUTE_DIR = ROOT_PATH / "execute" +# Directory containing all the query scripts +QUERIES_DIR = ROOT_PATH / "queries" -@pytest.mark.parametrize("script_path", EXECUTE_DIR.glob("q[1-9]*.py")) -def test_execute_scripts(script_path: Path) -> None: - print(f"executing query {script_path.stem}") # noqa: T201 +@pytest.mark.parametrize("query_path", QUERIES_DIR.glob("q[1-9]*.py")) +def test_execute_scripts(query_path: Path) -> None: + print(f"executing query {query_path.stem}") # noqa: T201 result = subprocess.run( # noqa: S603 - [sys.executable, "-m", f"execute.{script_path.stem}"], + [sys.executable, "-m", "execute.main", str(query_path.stem)], capture_output=True, text=True, check=False, ) assert ( result.returncode == 0 - ), f"Script {script_path} failed with error: {result.stderr}" + ), f"Script {query_path} failed with error: {result.stderr}" From 526549ade17230e49b4b7653b1c312012e3ceacc Mon Sep 17 00:00:00 2001 From: Edoardo Abati <29585319+EdAbati@users.noreply.github.com> Date: Wed, 13 Nov 2024 13:33:40 +0100 Subject: [PATCH 3/6] move to 1 execute script --- tpch/README.md | 2 +- tpch/{execute/main.py => execute.py} | 4 ++-- tpch/execute/__init__.py | 0 tpch/tests/queries_test.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename tpch/{execute/main.py => execute.py} (97%) delete mode 100644 tpch/execute/__init__.py diff --git a/tpch/README.md b/tpch/README.md index 65d6cd4c0..fbf130e7d 100644 --- a/tpch/README.md +++ b/tpch/README.md @@ -11,7 +11,7 @@ Run `python generate_data.py` from this folder. ## Run queries -To run Q1, you can run `python -m execute.main q1` from this folder. +To run Q1, you can run `python -m execute q1` from this folder. Please add query definitions in `queries`, and scripts to execute them in `execute` (see `queries/q1.py` and `execute/q1.py` for examples). diff --git a/tpch/execute/main.py b/tpch/execute.py similarity index 97% rename from tpch/execute/main.py rename to tpch/execute.py index 64d753393..76e1dda36 100644 --- a/tpch/execute/main.py +++ b/tpch/execute.py @@ -93,11 +93,11 @@ def execute_query(query_id: str) -> None: data_paths = QUERY_DATA[query_id] for backend, read_func in READ_FUNCS.items(): - print(f"\nRunning {query_id} with {backend=}") + print(f"\nRunning {query_id} with {backend=}") # noqa: T201 result = query_module.query(*(read_func(path) for path in data_paths)) if collect_func := COLLECT_FUNCS.get(backend): result = collect_func(result) - print(result) + print(result) # noqa: T201 def main() -> None: diff --git a/tpch/execute/__init__.py b/tpch/execute/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tpch/tests/queries_test.py b/tpch/tests/queries_test.py index ec0614588..2dfd7cc1c 100644 --- a/tpch/tests/queries_test.py +++ b/tpch/tests/queries_test.py @@ -15,7 +15,7 @@ def test_execute_scripts(query_path: Path) -> None: print(f"executing query {query_path.stem}") # noqa: T201 result = subprocess.run( # noqa: S603 - [sys.executable, "-m", "execute.main", str(query_path.stem)], + [sys.executable, "-m", "execute", str(query_path.stem)], capture_output=True, text=True, check=False, From bc5f7a67845be9af4bce14427d5474c035acc8e7 Mon Sep 17 00:00:00 2001 From: Edoardo Abati <29585319+EdAbati@users.noreply.github.com> Date: Wed, 13 Nov 2024 13:35:51 +0100 Subject: [PATCH 4/6] cleanup --- tpch/execute.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tpch/execute.py b/tpch/execute.py index 76e1dda36..4bce7c3aa 100644 --- a/tpch/execute.py +++ b/tpch/execute.py @@ -23,7 +23,7 @@ CUSTOMER_PATH = DATA_DIR / "customer.parquet" # Read functions for each backend -READ_FUNCS = { +BACKEND_READ_FUNC_MAP = { # "pandas": lambda x: pd.read_parquet(x, engine="pyarrow"), # noqa: ERA001 "pandas[pyarrow]": lambda x: pd.read_parquet( x, engine="pyarrow", dtype_backend="pyarrow" @@ -35,12 +35,12 @@ } # Collect functions for the lazy backends -COLLECT_FUNCS = { +BACKEND_COLLECT_FUNC_MAP = { "polars[lazy]": lambda x: x.collect(), # "dask": lambda x: x.compute(), # noqa: ERA001 } -QUERY_DATA = { +QUERY_DATA_PATH_MAP = { "q1": (LINEITEM_PATH,), "q2": (REGION_PATH, NATION_PATH, SUPPLIER_PATH, PART_PATH, PARTSUPP_PATH), "q3": (CUSTOMER_PATH, LINEITEM_PATH, ORDERS_PATH), @@ -90,12 +90,12 @@ def execute_query(query_id: str) -> None: query_module = import_module(f"tpch.queries.{query_id}") - data_paths = QUERY_DATA[query_id] + data_paths = QUERY_DATA_PATH_MAP[query_id] - for backend, read_func in READ_FUNCS.items(): + for backend, read_func in BACKEND_READ_FUNC_MAP.items(): print(f"\nRunning {query_id} with {backend=}") # noqa: T201 result = query_module.query(*(read_func(path) for path in data_paths)) - if collect_func := COLLECT_FUNCS.get(backend): + if collect_func := BACKEND_COLLECT_FUNC_MAP.get(backend): result = collect_func(result) print(result) # noqa: T201 From fb6de4b7ff8e2e495375461f0d687cb7e4926708 Mon Sep 17 00:00:00 2001 From: Edoardo Abati <29585319+EdAbati@users.noreply.github.com> Date: Wed, 13 Nov 2024 18:37:20 +0100 Subject: [PATCH 5/6] cleanup --- tpch/execute.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tpch/execute.py b/tpch/execute.py index 4bce7c3aa..8bf563494 100644 --- a/tpch/execute.py +++ b/tpch/execute.py @@ -11,7 +11,6 @@ pd.options.mode.copy_on_write = True pd.options.future.infer_string = True -# Data paths DATA_DIR = Path("data") LINEITEM_PATH = DATA_DIR / "lineitem.parquet" REGION_PATH = DATA_DIR / "region.parquet" @@ -22,7 +21,6 @@ ORDERS_PATH = DATA_DIR / "orders.parquet" CUSTOMER_PATH = DATA_DIR / "customer.parquet" -# Read functions for each backend BACKEND_READ_FUNC_MAP = { # "pandas": lambda x: pd.read_parquet(x, engine="pyarrow"), # noqa: ERA001 "pandas[pyarrow]": lambda x: pd.read_parquet( @@ -34,7 +32,6 @@ # "dask": lambda x: dd.read_parquet(x, engine="pyarrow", dtype_backend="pyarrow"), # noqa: ERA001 } -# Collect functions for the lazy backends BACKEND_COLLECT_FUNC_MAP = { "polars[lazy]": lambda x: x.collect(), # "dask": lambda x: x.compute(), # noqa: ERA001 From 87ddb1bc38f33bce17b7708ae5cedec54f979b33 Mon Sep 17 00:00:00 2001 From: Edoardo Abati <29585319+EdAbati@users.noreply.github.com> Date: Thu, 14 Nov 2024 08:42:36 +0100 Subject: [PATCH 6/6] re-enable dask --- tpch/execute.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tpch/execute.py b/tpch/execute.py index 8bf563494..3b4ea11cb 100644 --- a/tpch/execute.py +++ b/tpch/execute.py @@ -4,6 +4,7 @@ from importlib import import_module from pathlib import Path +import dask.dataframe as dd import pandas as pd import polars as pl import pyarrow.parquet as pq @@ -29,12 +30,12 @@ # "polars[eager]": lambda x: pl.read_parquet(x), "polars[lazy]": lambda x: pl.scan_parquet(x), "pyarrow": lambda x: pq.read_table(x), - # "dask": lambda x: dd.read_parquet(x, engine="pyarrow", dtype_backend="pyarrow"), # noqa: ERA001 + "dask": lambda x: dd.read_parquet(x, engine="pyarrow", dtype_backend="pyarrow"), } BACKEND_COLLECT_FUNC_MAP = { "polars[lazy]": lambda x: x.collect(), - # "dask": lambda x: x.compute(), # noqa: ERA001 + "dask": lambda x: x.compute(), } QUERY_DATA_PATH_MAP = {