From 4ddb279041df8428aa2744ea45b55c7b98ed7a13 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 23 Feb 2022 18:40:14 -0500 Subject: [PATCH 01/11] Initial --- .../db-benchmark/db-benchmark.dockerfile | 11 + benchmarks/db-benchmark/groupby-datafusion.py | 223 ++++++++++++++++++ benchmarks/db-benchmark/join-datafusion.py | 151 ++++++++++++ benchmarks/db-benchmark/run.sh | 19 ++ 4 files changed, 404 insertions(+) create mode 100644 benchmarks/db-benchmark/db-benchmark.dockerfile create mode 100644 benchmarks/db-benchmark/groupby-datafusion.py create mode 100755 benchmarks/db-benchmark/join-datafusion.py create mode 100644 benchmarks/db-benchmark/run.sh diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile new file mode 100644 index 000000000000..d5412f65f7ba --- /dev/null +++ b/benchmarks/db-benchmark/db-benchmark.dockerfile @@ -0,0 +1,11 @@ +FROM ubuntu + +RUN apt-get update && \ + apt-get install -y git build-essential + +RUN sudo apt install dirmngr gnupg apt-transport-https ca-certificates software-properties-common \ + sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9 \ + sudo add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/' \ + sudo apt install r-base + +RUN git clone https://github.com/apache/arrow-datafusion.git \ No newline at end of file diff --git a/benchmarks/db-benchmark/groupby-datafusion.py b/benchmarks/db-benchmark/groupby-datafusion.py new file mode 100644 index 000000000000..def7fedf72cd --- /dev/null +++ b/benchmarks/db-benchmark/groupby-datafusion.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python + +print("# groupby-datafusion.py", flush=True) + +import os +import gc +import timeit +import datafusion as df +from datafusion import functions as f +from datafusion import col +from pyarrow import csv as pacsv + +exec(open("./_helpers/helpers.py").read()) + +def ans_shape(batches): + rows, cols = 0, 0 + for batch in batches: + rows += batch.num_rows + if cols == 0: + cols = batch.num_columns + else: + assert(cols == batch.num_columns) + + return rows, cols + +# ver = df.__version__ +ver = "7.0.0" +git = "" +task = "groupby" +solution = "datafusion" +fun = ".groupby" +cache = "TRUE" +on_disk = "FALSE" + +data_name = os.environ["SRC_DATANAME"] +src_grp = os.path.join("data", data_name + ".csv") +print("loading dataset %s" % data_name, flush=True) + +data = pacsv.read_csv(src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) + +ctx = df.ExecutionContext() +ctx.register_record_batches("x", [data.to_batches()]) +# cols = ctx.sql("SHOW columns from x") +# ans.show() + +in_rows = data.num_rows +# print(in_rows, flush=True) + +task_init = timeit.default_timer() + +question = "sum v1 by id1" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q1: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "sum v1 by id1:id2" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q2: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "sum v1 mean v3 by id3" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q3: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "mean v1:v3 by id4" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q4: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "sum v1:v3 by id6" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q5: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "median v3 sd v3 by id4 id5" # q6 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id4, id5, approx_percentile_cont(v3, .5) AS median_v3, stddev(v3) AS stddev_v3 FROM x GROUP BY id4, id5").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q6: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("median_v3")), f.sum(col("stddev_v3"))]).collect()[0].to_pandas().to_numpy()[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "max v1 - min v2 by id3" # q7 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q7: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "largest two v3 by id6" # q8 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q8: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "regression v1 v2 by id2 id4" # q9 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT corr(v1, v2) as corr FROM x GROUP BY id2, id4").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q9: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("corr"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "sum v3 count by id1:id6" # q10 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q10: {t}") +m = memory_usage() +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v3")), f.sum(col("cnt"))]).collect()[0].to_pandas().to_numpy()[0] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True) + +exit(0) diff --git a/benchmarks/db-benchmark/join-datafusion.py b/benchmarks/db-benchmark/join-datafusion.py new file mode 100755 index 000000000000..d156bc076be5 --- /dev/null +++ b/benchmarks/db-benchmark/join-datafusion.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python + +print("# join-datafusion.py", flush=True) + +import os +import gc +import timeit +import datafusion as df +from datafusion import functions as f +from datafusion import col +from pyarrow import csv as pacsv + +exec(open("./_helpers/helpers.py").read()) + +def ans_shape(batches): + rows, cols = 0, 0 + for batch in batches: + rows += batch.num_rows + if cols == 0: + cols = batch.num_columns + else: + assert(cols == batch.num_columns) + + return rows, cols + +ver = "6.0.0" +task = "join" +git = "" +solution = "datafusion" +fun = ".join" +cache = "TRUE" +on_disk = "FALSE" + +data_name = os.environ["SRC_DATANAME"] +src_jn_x = os.path.join("data", data_name + ".csv") +y_data_name = join_to_tbls(data_name) +src_jn_y = [os.path.join("data", y_data_name[0] + ".csv"), os.path.join("data", y_data_name[1] + ".csv"), os.path.join("data", y_data_name[2] + ".csv")] +if len(src_jn_y) != 3: + raise Exception("Something went wrong in preparing files used for join") + +print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True) + +ctx = df.ExecutionContext() + +x_data = pacsv.read_csv(src_jn_x, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) +ctx.register_record_batches("x", [x_data.to_batches()]) +small_data = pacsv.read_csv(src_jn_y[0], convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) +ctx.register_record_batches("small", [small_data.to_batches()]) +medium_data = pacsv.read_csv(src_jn_y[1], convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) +ctx.register_record_batches("medium", [medium_data.to_batches()]) +large_data = pacsv.read_csv(src_jn_y[2], convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) +ctx.register_record_batches("large", [large_data.to_batches()]) + +print(x_data.num_rows, flush=True) +print(small_data.num_rows, flush=True) +print(medium_data.num_rows, flush=True) +print(large_data.num_rows, flush=True) + +task_init = timeit.default_timer() +print("joining...", flush=True) + +question = "small inner on int" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1").collect() +# ans = ctx.sql("SELECT * FROM x INNER JOIN small ON x.id1 = small.id1").collect() +# print(set([b.schema for b in ans])) +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q1: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "medium inner on int" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q2: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "medium outer on int" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2").collect() +shape = ans_shape(ans) +# print(shape, flush=True) +t = timeit.default_timer() - t_start +print(f"q3: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "medium inner on factor" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id5 = medium.id5").collect() +shape = ans_shape(ans) +# print(shape) +t = timeit.default_timer() - t_start +print(f"q4: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +question = "big inner on int" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = ctx.sql("SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x LEFT JOIN large ON x.id3 = large.id3").collect() +shape = ans_shape(ans) +# print(shape) +t = timeit.default_timer() - t_start +print(f"q5: {t}") +t_start = timeit.default_timer() +df = ctx.create_dataframe([ans]) +chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] +chkt = timeit.default_timer() - t_start +m = memory_usage() +write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() + +print("joining finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True) + +exit(0) diff --git a/benchmarks/db-benchmark/run.sh b/benchmarks/db-benchmark/run.sh new file mode 100644 index 000000000000..0f9d1383ba1f --- /dev/null +++ b/benchmarks/db-benchmark/run.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +set -e + From cb50c809b8563b1a7c1c6c679ce2bb70480dc953 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 5 Mar 2022 02:45:15 -0500 Subject: [PATCH 02/11] Create Dockerfile --- .../db-benchmark/db-benchmark.dockerfile | 53 +++++++++++++++++-- benchmarks/db-benchmark/run.sh | 2 + 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile index d5412f65f7ba..bf186bddcbaa 100644 --- a/benchmarks/db-benchmark/db-benchmark.dockerfile +++ b/benchmarks/db-benchmark/db-benchmark.dockerfile @@ -1,11 +1,54 @@ FROM ubuntu +ARG DEBIAN_FRONTEND=noninteractive RUN apt-get update && \ apt-get install -y git build-essential -RUN sudo apt install dirmngr gnupg apt-transport-https ca-certificates software-properties-common \ - sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9 \ - sudo add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/' \ - sudo apt install r-base +# Install R, curl, and python deps +RUN apt-get update && apt-get -y install --no-install-recommends --no-install-suggests \ + ca-certificates software-properties-common gnupg2 gnupg1 \ + && apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9 \ + && add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/' \ + && apt-get -y install r-base \ + && apt-get -y install curl \ + && apt-get -y install python3.8 \ + && apt-get -y install python3-pip + +# Install R libraries +RUN R -e "install.packages('data.table',dependencies=TRUE, repos='http://cran.rstudio.com/')" \ + && R -e "install.packages('dplyr',dependencies=TRUE, repos='http://cran.rstudio.com/')" + +# Install Rust +RUN curl https://sh.rustup.rs -sSf | bash -s -- -y +ENV PATH="/root/.cargo/bin:${PATH}" + +# Clone db-benchmark and download data +RUN git clone https://github.com/h2oai/db-benchmark \ + && Rscript db-benchmark/_data/groupby-datagen.R 1e7 1e2 0 0 \ + && Rscript db-benchmark/_data/join-datagen.R 1e7 0 0 0 + +# Copy local arrow-datafusion +COPY . arrow-datafusion + +# Clone datafusion-python and build python library +# Not sure if the wheel will be the same on all computers +RUN git clone https://github.com/datafusion-contrib/datafusion-python \ + && python3 -m pip install pip \ + && python3 -m pip install -r datafusion-python/requirements.txt \ + && cd datafusion-python && maturin build --release \ + && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_aarch64.whl \ + && cd ../db-benchmark + +# Make datafusion directory in db-benchmark +RUN mkdir datafusion \ + && cp ../arrow-datafusion/benchmarks/db-benchmark/groupby-datafusion.py datafusion \ + && cp ../arrow-datafusion/benchmarks/db-benchmark/join-datafusion.py datafusion \ + && cp ../arrow-datafusion/benchmarks/db-benchmark/run.sh . + +WORKDIR /db-benchmark + +ENTRYPOINT ["/db-benchmark/run.sh"] + + + -RUN git clone https://github.com/apache/arrow-datafusion.git \ No newline at end of file diff --git a/benchmarks/db-benchmark/run.sh b/benchmarks/db-benchmark/run.sh index 0f9d1383ba1f..9ccc26804d19 100644 --- a/benchmarks/db-benchmark/run.sh +++ b/benchmarks/db-benchmark/run.sh @@ -17,3 +17,5 @@ # under the License. set -e +SRC_DATANAME=G1_1e7_1e2_0_0 python3 datafusion/groupby-datafusion.py +SRC_DATANAME=J1_1e7_NA_0_0 python3 datafusion/join-datafusion.py From 71cdbb668a36e31be77fa978d28059bb73792a05 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 5 Mar 2022 22:38:09 -0500 Subject: [PATCH 03/11] Fix entry and script names --- .../db-benchmark/db-benchmark.dockerfile | 41 +++++++++++------ benchmarks/db-benchmark/groupby-datafusion.py | 46 ++++++++++--------- benchmarks/db-benchmark/join-datafusion.py | 29 +++++++----- .../db-benchmark/{run.sh => run-bench.sh} | 0 4 files changed, 69 insertions(+), 47 deletions(-) rename benchmarks/db-benchmark/{run.sh => run-bench.sh} (100%) diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile index bf186bddcbaa..87e994ee50a5 100644 --- a/benchmarks/db-benchmark/db-benchmark.dockerfile +++ b/benchmarks/db-benchmark/db-benchmark.dockerfile @@ -5,7 +5,7 @@ RUN apt-get update && \ apt-get install -y git build-essential # Install R, curl, and python deps -RUN apt-get update && apt-get -y install --no-install-recommends --no-install-suggests \ +RUN apt-get -y install --no-install-recommends --no-install-suggests \ ca-certificates software-properties-common gnupg2 gnupg1 \ && apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9 \ && add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/' \ @@ -24,30 +24,43 @@ ENV PATH="/root/.cargo/bin:${PATH}" # Clone db-benchmark and download data RUN git clone https://github.com/h2oai/db-benchmark \ - && Rscript db-benchmark/_data/groupby-datagen.R 1e7 1e2 0 0 \ - && Rscript db-benchmark/_data/join-datagen.R 1e7 0 0 0 - -# Copy local arrow-datafusion -COPY . arrow-datafusion + && cd db-benchmark \ + && Rscript _data/groupby-datagen.R 1e7 1e2 0 0 \ + && Rscript _data/join-datagen.R 1e7 0 0 0 \ + && mkdir data \ + && mv G1_1e7_1e2_0_0.csv data \ + && mv J1_1e7_1e1_0_0.csv data \ + && mv J1_1e7_1e4_0_0.csv data \ + && mv J1_1e7_1e7_0_0.csv data \ + && mv J1_1e7_NA_0_0.csv data \ + && cd .. # Clone datafusion-python and build python library # Not sure if the wheel will be the same on all computers RUN git clone https://github.com/datafusion-contrib/datafusion-python \ + && cd datafusion-python && git reset --hard 368b50ed9662d5e93c70b539f94cceace685265e \ && python3 -m pip install pip \ - && python3 -m pip install -r datafusion-python/requirements.txt \ - && cd datafusion-python && maturin build --release \ + && python3 -m pip install pandas \ + && python3 -m pip install -r requirements.txt \ + && maturin build --release \ && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_aarch64.whl \ - && cd ../db-benchmark + && cd .. + +# Copy local arrow-datafusion +COPY . arrow-datafusion # Make datafusion directory in db-benchmark -RUN mkdir datafusion \ - && cp ../arrow-datafusion/benchmarks/db-benchmark/groupby-datafusion.py datafusion \ - && cp ../arrow-datafusion/benchmarks/db-benchmark/join-datafusion.py datafusion \ - && cp ../arrow-datafusion/benchmarks/db-benchmark/run.sh . +RUN mkdir db-benchmark/datafusion \ + && cp ../arrow-datafusion/benchmarks/db-benchmark/groupby-datafusion.py db-benchmark/datafusion \ + && cp ../arrow-datafusion/benchmarks/db-benchmark/join-datafusion.py db-benchmark/datafusion \ + && cp ../arrow-datafusion/benchmarks/db-benchmark/run-bench.sh db-benchmark/ \ + && chmod +x db-benchmark/run-bench.sh WORKDIR /db-benchmark -ENTRYPOINT ["/db-benchmark/run.sh"] +RUN ls && ls -al data/ + +ENTRYPOINT ./run-bench.sh diff --git a/benchmarks/db-benchmark/groupby-datafusion.py b/benchmarks/db-benchmark/groupby-datafusion.py index def7fedf72cd..12000e9f2f64 100644 --- a/benchmarks/db-benchmark/groupby-datafusion.py +++ b/benchmarks/db-benchmark/groupby-datafusion.py @@ -10,7 +10,7 @@ from datafusion import col from pyarrow import csv as pacsv -exec(open("./_helpers/helpers.py").read()) +# exec(open("./_helpers/helpers.py").read()) def ans_shape(batches): rows, cols = 0, 0 @@ -34,12 +34,14 @@ def ans_shape(batches): data_name = os.environ["SRC_DATANAME"] src_grp = os.path.join("data", data_name + ".csv") -print("loading dataset %s" % data_name, flush=True) +print("loading dataset %s" % src_grp, flush=True) data = pacsv.read_csv(src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) +print("dataset loaded") ctx = df.ExecutionContext() ctx.register_record_batches("x", [data.to_batches()]) +print("registered record batches") # cols = ctx.sql("SHOW columns from x") # ans.show() @@ -56,12 +58,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q1: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -73,12 +75,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q2: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -90,12 +92,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q3: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -107,12 +109,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q4: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -124,12 +126,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q5: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -141,12 +143,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q6: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("median_v3")), f.sum(col("stddev_v3"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -158,12 +160,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q7: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -175,12 +177,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q8: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -192,12 +194,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q9: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("corr"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -209,12 +211,12 @@ def ans_shape(batches): # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q10: {t}") -m = memory_usage() +# m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v3")), f.sum(col("cnt"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() diff --git a/benchmarks/db-benchmark/join-datafusion.py b/benchmarks/db-benchmark/join-datafusion.py index d156bc076be5..37d251274536 100755 --- a/benchmarks/db-benchmark/join-datafusion.py +++ b/benchmarks/db-benchmark/join-datafusion.py @@ -10,7 +10,14 @@ from datafusion import col from pyarrow import csv as pacsv -exec(open("./_helpers/helpers.py").read()) +# exec(open("./_helpers/helpers.py").read()) + +def join_to_tbls(data_name): + x_n = int(float(data_name.split("_")[1])) + y_n = ["{:.0e}".format(x_n/1e6), "{:.0e}".format(x_n/1e3), "{:.0e}".format(x_n)] + y_n = [y_n[0].replace('+0', ''), y_n[1].replace('+0', ''), y_n[2].replace('+0', '')] + return [data_name.replace('NA', y_n[0]), data_name.replace('NA', y_n[1]), data_name.replace('NA', y_n[2])] + def ans_shape(batches): rows, cols = 0, 0 @@ -73,8 +80,8 @@ def ans_shape(batches): df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -m = memory_usage() -write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# m = memory_usage() +# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -90,8 +97,8 @@ def ans_shape(batches): df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -m = memory_usage() -write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# m = memory_usage() +# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -107,8 +114,8 @@ def ans_shape(batches): df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -m = memory_usage() -write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# m = memory_usage() +# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -124,8 +131,8 @@ def ans_shape(batches): df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -m = memory_usage() -write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# m = memory_usage() +# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() @@ -141,8 +148,8 @@ def ans_shape(batches): df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start -m = memory_usage() -write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) +# m = memory_usage() +# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() diff --git a/benchmarks/db-benchmark/run.sh b/benchmarks/db-benchmark/run-bench.sh similarity index 100% rename from benchmarks/db-benchmark/run.sh rename to benchmarks/db-benchmark/run-bench.sh From 6e519372e5758423a773985a898b0687423a56e4 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 5 Mar 2022 22:39:25 -0500 Subject: [PATCH 04/11] Remove blank lines --- benchmarks/db-benchmark/db-benchmark.dockerfile | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile index 87e994ee50a5..93779defbe94 100644 --- a/benchmarks/db-benchmark/db-benchmark.dockerfile +++ b/benchmarks/db-benchmark/db-benchmark.dockerfile @@ -60,8 +60,4 @@ WORKDIR /db-benchmark RUN ls && ls -al data/ -ENTRYPOINT ./run-bench.sh - - - - +ENTRYPOINT ./run-bench.sh \ No newline at end of file From 876427e3e086d633f09ff9cae275621427c4a72c Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 5 Mar 2022 23:40:29 -0500 Subject: [PATCH 05/11] Add local datafusion option --- .../db-benchmark/db-benchmark.dockerfile | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile index 93779defbe94..9ce736669bc2 100644 --- a/benchmarks/db-benchmark/db-benchmark.dockerfile +++ b/benchmarks/db-benchmark/db-benchmark.dockerfile @@ -41,14 +41,28 @@ RUN git clone https://github.com/datafusion-contrib/datafusion-python \ && cd datafusion-python && git reset --hard 368b50ed9662d5e93c70b539f94cceace685265e \ && python3 -m pip install pip \ && python3 -m pip install pandas \ - && python3 -m pip install -r requirements.txt \ - && maturin build --release \ - && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_aarch64.whl \ + && python3 -m pip install -r requirements.txt \ && cd .. # Copy local arrow-datafusion COPY . arrow-datafusion +# 1. datafusion-python that builds from datafusion version 7 +RUN cd datafusion-python \ + && maturin build --release \ + && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_aarch64.whl \ + && cd .. + +# 2. datafusion-python that builds from local datafusion. use this when making local changes to datafusion. +# Currently, as of March 5th 2022, this done not build (i think) because datafusion is being split into multiple crates +# and datafusion-python has not yet been updated to reflect this. +# RUN cd datafusion-python \ +# && sed -i '/datafusion =/c\datafusion = { path = "../arrow-datafusion/datafusion", features = ["pyarrow"] }' Cargo.toml \ +# && sed -i '/fuzz-utils/d' ../arrow-datafusion/datafusion/Cargo.toml \ +# && maturin build --release \ +# && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_aarch64.whl \ +# && cd .. + # Make datafusion directory in db-benchmark RUN mkdir db-benchmark/datafusion \ && cp ../arrow-datafusion/benchmarks/db-benchmark/groupby-datafusion.py db-benchmark/datafusion \ From eaec91b89885cce407930ba44917773cbf94bbb6 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 5 Mar 2022 23:45:08 -0500 Subject: [PATCH 06/11] Fix comment --- benchmarks/db-benchmark/db-benchmark.dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile index 9ce736669bc2..3035d3251089 100644 --- a/benchmarks/db-benchmark/db-benchmark.dockerfile +++ b/benchmarks/db-benchmark/db-benchmark.dockerfile @@ -47,7 +47,7 @@ RUN git clone https://github.com/datafusion-contrib/datafusion-python \ # Copy local arrow-datafusion COPY . arrow-datafusion -# 1. datafusion-python that builds from datafusion version 7 +# 1. datafusion-python that builds from datafusion version referenced datafusion-python RUN cd datafusion-python \ && maturin build --release \ && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_aarch64.whl \ From ecf209b6d01d672679e15c2bae0e85b3781ad6c2 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 5 Mar 2022 23:49:04 -0500 Subject: [PATCH 07/11] Add readme --- benchmarks/db-benchmark/README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 benchmarks/db-benchmark/README.md diff --git a/benchmarks/db-benchmark/README.md b/benchmarks/db-benchmark/README.md new file mode 100644 index 000000000000..3672c029e317 --- /dev/null +++ b/benchmarks/db-benchmark/README.md @@ -0,0 +1,10 @@ +# Run db-benchmark + +## Directions + +Run the following from root `arrow-datafusion` directory + +```bash +$ docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile . +$ docker run --privileged db-benchmark +``` From d885ef298f9e875cf1f42c430644152ead5139b8 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 14 Mar 2022 10:10:00 -0400 Subject: [PATCH 08/11] Add license --- benchmarks/db-benchmark/db-benchmark.dockerfile | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile index 3035d3251089..b40bff10d3e9 100644 --- a/benchmarks/db-benchmark/db-benchmark.dockerfile +++ b/benchmarks/db-benchmark/db-benchmark.dockerfile @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + FROM ubuntu ARG DEBIAN_FRONTEND=noninteractive From 7c23302a5d5260c49579866cb39599f4090983c6 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 14 Mar 2022 10:13:48 -0400 Subject: [PATCH 09/11] More licenses --- benchmarks/db-benchmark/README.md | 19 +++++++++++++++++++ benchmarks/db-benchmark/groupby-datafusion.py | 17 +++++++++++++++++ benchmarks/db-benchmark/join-datafusion.py | 17 +++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/benchmarks/db-benchmark/README.md b/benchmarks/db-benchmark/README.md index 3672c029e317..b616aa82da38 100644 --- a/benchmarks/db-benchmark/README.md +++ b/benchmarks/db-benchmark/README.md @@ -1,3 +1,22 @@ + + # Run db-benchmark ## Directions diff --git a/benchmarks/db-benchmark/groupby-datafusion.py b/benchmarks/db-benchmark/groupby-datafusion.py index 12000e9f2f64..5bf6c36a0e66 100644 --- a/benchmarks/db-benchmark/groupby-datafusion.py +++ b/benchmarks/db-benchmark/groupby-datafusion.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + #!/usr/bin/env python print("# groupby-datafusion.py", flush=True) diff --git a/benchmarks/db-benchmark/join-datafusion.py b/benchmarks/db-benchmark/join-datafusion.py index 37d251274536..1a83b1f60d69 100755 --- a/benchmarks/db-benchmark/join-datafusion.py +++ b/benchmarks/db-benchmark/join-datafusion.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + #!/usr/bin/env python print("# join-datafusion.py", flush=True) From b6c304eafd7eaae80a03d739f7b881e51c2696ad Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 16 Mar 2022 10:03:16 -0400 Subject: [PATCH 10/11] Arch agnostic --- benchmarks/db-benchmark/README.md | 2 +- .../db-benchmark/db-benchmark.dockerfile | 24 ++++++++++++++----- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/benchmarks/db-benchmark/README.md b/benchmarks/db-benchmark/README.md index b616aa82da38..801a1b12ac14 100644 --- a/benchmarks/db-benchmark/README.md +++ b/benchmarks/db-benchmark/README.md @@ -24,6 +24,6 @@ Run the following from root `arrow-datafusion` directory ```bash -$ docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile . +$ docker buildx -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile . $ docker run --privileged db-benchmark ``` diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile index b40bff10d3e9..b21d3a0d1e0e 100644 --- a/benchmarks/db-benchmark/db-benchmark.dockerfile +++ b/benchmarks/db-benchmark/db-benchmark.dockerfile @@ -17,6 +17,7 @@ FROM ubuntu ARG DEBIAN_FRONTEND=noninteractive +ARG TARGETPLATFORM RUN apt-get update && \ apt-get install -y git build-essential @@ -67,18 +68,29 @@ COPY . arrow-datafusion # 1. datafusion-python that builds from datafusion version referenced datafusion-python RUN cd datafusion-python \ && maturin build --release \ - && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_aarch64.whl \ + && case "${TARGETPLATFORM}" in \ + */amd64) CPUARCH=x86_64 ;; \ + */arm64) CPUARCH=aarch64 ;; \ + *) exit 1 ;; \ + esac \ + # Version will need to be updated in conjunction with datafusion-python version + && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_${CPUARCH}.whl \ && cd .. # 2. datafusion-python that builds from local datafusion. use this when making local changes to datafusion. # Currently, as of March 5th 2022, this done not build (i think) because datafusion is being split into multiple crates # and datafusion-python has not yet been updated to reflect this. # RUN cd datafusion-python \ -# && sed -i '/datafusion =/c\datafusion = { path = "../arrow-datafusion/datafusion", features = ["pyarrow"] }' Cargo.toml \ -# && sed -i '/fuzz-utils/d' ../arrow-datafusion/datafusion/Cargo.toml \ -# && maturin build --release \ -# && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_aarch64.whl \ -# && cd .. +# && sed -i '/datafusion =/c\datafusion = { path = "../arrow-datafusion/datafusion", features = ["pyarrow"] }' Cargo.toml \ +# && sed -i '/fuzz-utils/d' ../arrow-datafusion/datafusion/Cargo.toml \ +# && maturin build --release \ +# && case "${TARGETPLATFORM}" in \ +# */amd64) CPUARCH=x86_64 ;; \ +# */amd64) CPUARCH=aarch64 ;; \ +# *) exit 1 ;; \ +# esac \ +# && python3 -m pip install target/wheels/datafusion-0.4.0-cp36-abi3-linux_${CPUARCH}.whl \ +# && cd .. # Make datafusion directory in db-benchmark RUN mkdir db-benchmark/datafusion \ From fde6f53bcc17585e61957d9b0def479b308931d0 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 16 Mar 2022 11:08:21 -0400 Subject: [PATCH 11/11] Fix readme --- benchmarks/db-benchmark/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/db-benchmark/README.md b/benchmarks/db-benchmark/README.md index 801a1b12ac14..e1fc2a3504cf 100644 --- a/benchmarks/db-benchmark/README.md +++ b/benchmarks/db-benchmark/README.md @@ -24,6 +24,6 @@ Run the following from root `arrow-datafusion` directory ```bash -$ docker buildx -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile . +$ docker buildx build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile . $ docker run --privileged db-benchmark ```