Skip to content

Commit

Permalink
chore: cleanup tpch execute (#1374)
Browse files Browse the repository at this point in the history
  • Loading branch information
EdAbati authored Nov 14, 2024
1 parent 4f20d10 commit d8d30d9
Show file tree
Hide file tree
Showing 26 changed files with 128 additions and 552 deletions.
2 changes: 1 addition & 1 deletion tpch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Run `python generate_data.py` from this folder.

## Run queries

To run Q1, you can run `python -m execute.q1` from this folder.
To run Q1, you can run `python -m execute q1` from this folder.

Please add query definitions in `queries` (see `queries/q1.py` for an example),
and register the data files each query reads in `QUERY_DATA_PATH_MAP` in `execute.py`.
110 changes: 110 additions & 0 deletions tpch/execute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from __future__ import annotations

import argparse
from importlib import import_module
from pathlib import Path

import dask.dataframe as dd
import pandas as pd
import polars as pl
import pyarrow.parquet as pq

# Process-wide pandas settings, applied at import time so they are already in
# effect when the pandas-based reader is invoked: turn on copy-on-write mode
# and opt in to the "future" string-inference behaviour.
pd.options.mode.copy_on_write = True
pd.options.future.infer_string = True

# Folder containing the parquet files. The path is relative, so it is resolved
# against the working directory the script is started from.
DATA_DIR = Path("data")

# One parquet file per table.
CUSTOMER_PATH = DATA_DIR.joinpath("customer.parquet")
LINEITEM_PATH = DATA_DIR.joinpath("lineitem.parquet")
NATION_PATH = DATA_DIR.joinpath("nation.parquet")
ORDERS_PATH = DATA_DIR.joinpath("orders.parquet")
PART_PATH = DATA_DIR.joinpath("part.parquet")
PARTSUPP_PATH = DATA_DIR.joinpath("partsupp.parquet")
REGION_PATH = DATA_DIR.joinpath("region.parquet")
SUPPLIER_PATH = DATA_DIR.joinpath("supplier.parquet")

def _read_pandas_pyarrow(path):
    """Read a parquet file with pandas, using the pyarrow engine and dtype backend."""
    return pd.read_parquet(path, engine="pyarrow", dtype_backend="pyarrow")


def _scan_polars(path):
    """Open a parquet file through ``polars.scan_parquet``."""
    return pl.scan_parquet(path)


def _read_pyarrow_table(path):
    """Read a parquet file through ``pyarrow.parquet.read_table``."""
    return pq.read_table(path)


def _read_dask(path):
    """Read a parquet file with dask, using the pyarrow engine and dtype backend."""
    return dd.read_parquet(path, engine="pyarrow", dtype_backend="pyarrow")


# Backend label -> function that loads one parquet path for that backend.
# `execute_query` iterates this mapping, so the insertion order below is the
# order in which the backends are exercised. Commented-out entries are
# backends that are currently switched off.
BACKEND_READ_FUNC_MAP = {
    # "pandas": lambda x: pd.read_parquet(x, engine="pyarrow"),  # noqa: ERA001
    "pandas[pyarrow]": _read_pandas_pyarrow,
    # "polars[eager]": lambda x: pl.read_parquet(x),
    "polars[lazy]": _scan_polars,
    "pyarrow": _read_pyarrow_table,
    "dask": _read_dask,
}

def _collect_polars(result):
    """Call ``.collect()`` on the query result and return what it yields."""
    return result.collect()


def _compute_dask(result):
    """Call ``.compute()`` on the query result and return what it yields."""
    return result.compute()


# Backends whose query result needs one extra call before it is printed.
# Backends that are absent here have their query result printed as returned.
BACKEND_COLLECT_FUNC_MAP = {
    "polars[lazy]": _collect_polars,
    "dask": _compute_dask,
}

# Query id ("q1" .. "q22") -> parquet files that query needs.
# The order inside each tuple matters: `execute_query` loads the files in this
# order and passes them positionally to the query module's `query` function.
# The keys are derived from the position in the sequence below, so entries
# must stay in ascending query order with no gaps.
QUERY_DATA_PATH_MAP = {
    f"q{number}": paths
    for number, paths in enumerate(
        (
            # q1
            (LINEITEM_PATH,),
            # q2
            (REGION_PATH, NATION_PATH, SUPPLIER_PATH, PART_PATH, PARTSUPP_PATH),
            # q3
            (CUSTOMER_PATH, LINEITEM_PATH, ORDERS_PATH),
            # q4
            (LINEITEM_PATH, ORDERS_PATH),
            # q5
            (
                REGION_PATH,
                NATION_PATH,
                CUSTOMER_PATH,
                LINEITEM_PATH,
                ORDERS_PATH,
                SUPPLIER_PATH,
            ),
            # q6
            (LINEITEM_PATH,),
            # q7
            (NATION_PATH, CUSTOMER_PATH, LINEITEM_PATH, ORDERS_PATH, SUPPLIER_PATH),
            # q8
            (
                PART_PATH,
                SUPPLIER_PATH,
                LINEITEM_PATH,
                ORDERS_PATH,
                CUSTOMER_PATH,
                NATION_PATH,
                REGION_PATH,
            ),
            # q9
            (
                PART_PATH,
                PARTSUPP_PATH,
                NATION_PATH,
                LINEITEM_PATH,
                ORDERS_PATH,
                SUPPLIER_PATH,
            ),
            # q10
            (CUSTOMER_PATH, NATION_PATH, LINEITEM_PATH, ORDERS_PATH),
            # q11
            (NATION_PATH, PARTSUPP_PATH, SUPPLIER_PATH),
            # q12
            (LINEITEM_PATH, ORDERS_PATH),
            # q13
            (CUSTOMER_PATH, ORDERS_PATH),
            # q14
            (LINEITEM_PATH, PART_PATH),
            # q15
            (LINEITEM_PATH, SUPPLIER_PATH),
            # q16
            (PART_PATH, PARTSUPP_PATH, SUPPLIER_PATH),
            # q17
            (LINEITEM_PATH, PART_PATH),
            # q18
            (CUSTOMER_PATH, LINEITEM_PATH, ORDERS_PATH),
            # q19
            (LINEITEM_PATH, PART_PATH),
            # q20
            (PART_PATH, PARTSUPP_PATH, NATION_PATH, LINEITEM_PATH, SUPPLIER_PATH),
            # q21
            (LINEITEM_PATH, NATION_PATH, ORDERS_PATH, SUPPLIER_PATH),
            # q22
            (CUSTOMER_PATH, ORDERS_PATH),
        ),
        start=1,
    )
}


def execute_query(query_id: str) -> None:
    """Run one query on every configured backend and print each result.

    For each entry of ``BACKEND_READ_FUNC_MAP`` the query's input files are
    loaded with that backend's reader and handed, in order, to the ``query``
    function of the module ``tpch.queries.<query_id>``. When the backend has
    an entry in ``BACKEND_COLLECT_FUNC_MAP`` it is applied to the result
    before printing.

    Args:
        query_id: Query name such as ``"q1"``. It selects both the module to
            import and the entry of ``QUERY_DATA_PATH_MAP`` to use.

    Raises:
        ModuleNotFoundError: If ``tpch.queries.<query_id>`` cannot be imported.
        KeyError: If ``query_id`` has no entry in ``QUERY_DATA_PATH_MAP``.
    """
    query_module = import_module(f"tpch.queries.{query_id}")
    table_paths = QUERY_DATA_PATH_MAP[query_id]

    for backend, read_func in BACKEND_READ_FUNC_MAP.items():
        # NOTE: the loop variable must stay named `backend`; the `{backend=}`
        # specifier prints the variable name as part of the message.
        print(f"\nRunning {query_id} with {backend=}")  # noqa: T201

        run_query = query_module.query
        inputs = [read_func(path) for path in table_paths]
        result = run_query(*inputs)

        collect_func = BACKEND_COLLECT_FUNC_MAP.get(backend)
        if collect_func:
            result = collect_func(result)

        print(result)  # noqa: T201


def main() -> None:
    """Parse the command line and run the requested query.

    The single positional argument must be one of the keys of
    ``QUERY_DATA_PATH_MAP``. Any other value is rejected by argparse with a
    usage error that lists the valid choices, rather than surfacing later as
    an import or lookup traceback inside ``execute_query``.
    """
    parser = argparse.ArgumentParser(description="Execute a TPCH query by number.")
    parser.add_argument(
        "query",
        type=str,
        choices=tuple(QUERY_DATA_PATH_MAP),
        # Keep the usage line short; the full list still appears in the error
        # message argparse prints for an invalid value.
        metavar="query",
        help="The query to execute, e.g. 'q1'.",
    )
    args = parser.parse_args()

    execute_query(query_id=args.query)


# Run the CLI only when the module is executed as a script, not on import.
if __name__ == "__main__":
    main()
32 changes: 0 additions & 32 deletions tpch/execute/__init__.py

This file was deleted.

10 changes: 0 additions & 10 deletions tpch/execute/q1.py

This file was deleted.

21 changes: 0 additions & 21 deletions tpch/execute/q10.py

This file was deleted.

20 changes: 0 additions & 20 deletions tpch/execute/q11.py

This file was deleted.

19 changes: 0 additions & 19 deletions tpch/execute/q12.py

This file was deleted.

19 changes: 0 additions & 19 deletions tpch/execute/q13.py

This file was deleted.

19 changes: 0 additions & 19 deletions tpch/execute/q14.py

This file was deleted.

19 changes: 0 additions & 19 deletions tpch/execute/q15.py

This file was deleted.

20 changes: 0 additions & 20 deletions tpch/execute/q16.py

This file was deleted.

19 changes: 0 additions & 19 deletions tpch/execute/q17.py

This file was deleted.

20 changes: 0 additions & 20 deletions tpch/execute/q18.py

This file was deleted.

16 changes: 0 additions & 16 deletions tpch/execute/q19.py

This file was deleted.

Loading

0 comments on commit d8d30d9

Please sign in to comment.