From e2d07305ba532109e805d189007cfba44f21d0fd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Jul 2021 10:15:01 -0700 Subject: [PATCH 1/7] Draft Ballista README and examples --- Cargo.toml | 1 + ballista-examples/Cargo.toml | 39 ++++++++++++ ballista-examples/README.md | 38 ++++++++++++ .../examples/ballista-dataframe.rs | 56 +++++++++++++++++ ballista/README.md | 62 ++++++++++++++----- 5 files changed, 182 insertions(+), 14 deletions(-) create mode 100644 ballista-examples/Cargo.toml create mode 100644 ballista-examples/README.md create mode 100644 ballista-examples/examples/ballista-dataframe.rs diff --git a/Cargo.toml b/Cargo.toml index 351523d74c36..d6da8c14cd96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "ballista/rust/core", "ballista/rust/executor", "ballista/rust/scheduler", + "ballista-examples", ] exclude = ["python"] diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml new file mode 100644 index 000000000000..85af1152ce9a --- /dev/null +++ b/ballista-examples/Cargo.toml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "ballista-examples" +description = "Ballista usage examples" +version = "0.5.0-SNAPSHOT" +homepage = "https://github.com/apache/arrow-datafusion" +repository = "https://github.com/apache/arrow-datafusion" +authors = ["Apache Arrow "] +license = "Apache-2.0" +keywords = [ "arrow", "distributed", "query", "sql" ] +edition = "2018" +publish = false + + +[dev-dependencies] +arrow-flight = { version = "5.0" } +datafusion = { path = "../datafusion" } +ballista = { path = "../ballista/rust/client" } +prost = "0.7" +tonic = "0.4" +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } +futures = "0.3" +num_cpus = "1.13.0" diff --git a/ballista-examples/README.md b/ballista-examples/README.md new file mode 100644 index 000000000000..a049261cd0e0 --- /dev/null +++ b/ballista-examples/README.md @@ -0,0 +1,38 @@ +# Ballista Examples + +This directory contains examples for executing distributed queries with Ballista. + +For background information on the Ballista architecture, refer to +the [Ballista README](../ballista/README.md). + +## Start a standalone cluster + +From the root of the arrow-datafusion project, build release binaries. + +```bash +cargo build --release +``` + +Start a Ballista scheduler process in a new terminal session. + +```bash +RUST_LOG=info ./target/release/ballista-scheduler +``` + +Start one or more Ballista executor processes in new terminal sessions. When starting more than one +executor, a unique port number must be specified for each executor. + +```bash +RUST_LOG=info ./target/release/ballista-executor -c 4 +``` + +## Running the examples + +Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `PARQUET_TEST_DATA` +environment variable so that the examples can find the test data files. + + +```bash +cargo run --release --example ballista-dataframe +``` + diff --git a/ballista-examples/examples/ballista-dataframe.rs b/ballista-examples/examples/ballista-dataframe.rs new file mode 100644 index 000000000000..92b5433f081c --- /dev/null +++ b/ballista-examples/examples/ballista-dataframe.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ballista::prelude::*; +use datafusion::arrow::util::pretty; +use datafusion::prelude::{col, lit}; + +/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and +/// fetching results, using the DataFrame trait +#[tokio::main] +async fn main() -> Result<()> { + let config = BallistaConfig::builder() + .set("ballista.shuffle.partitions", "4") + .build()?; + let ctx = BallistaContext::remote("localhost", 50051, &config); + + let testdata = datafusion::arrow::util::test_util::parquet_test_data(); + + let filename = &format!("{}/alltypes_plain.parquet", testdata); + + // define the query using the DataFrame trait + let df = ctx + .read_parquet(filename)? + .select_columns(&["id", "bool_col", "timestamp_col"])? + .filter(col("id").gt(lit(1)))?; + + // execute the query - note that calling collect on the DataFrame + // trait will execute the query with DataFusion so we have to call + // collect on the BallistaContext instead and pass it the DataFusion + // logical plan + let mut stream = ctx.collect(&df.to_logical_plan()).await?; + + // print the results + let mut results = vec![]; + while let Some(batch) = stream.next().await { + let batch = batch?; + results.push(batch); + } + pretty::print_batches(&results)?; + + Ok(()) +} diff --git a/ballista/README.md b/ballista/README.md index 038146a639ed..5fc5e53d153b 100644 --- a/ballista/README.md +++ b/ballista/README.md @@ -17,9 +17,9 @@ under the License. --> -# Ballista: Distributed Compute with Apache Arrow +# Ballista: Distributed Compute with Apache Arrow and DataFusion -Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow. It is built +Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs. @@ -35,6 +35,51 @@ Ballista can be deployed as a standalone cluster and also supports [Kubernetes]( case, the scheduler can be configured to use [etcd](https://etcd.io/) as a backing store to (eventually) provide redundancy in the case of a scheduler failing. +# Getting Started + +## Start a standalone cluster + +```bash +cargo build --release +``` + +```bash +RUST_LOG=info ./target/release/ballista-scheduler +``` + +```bash +RUST_LOG=info ./target/release/ballista-executor -c 4 +``` + +## Run the examples + +Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `PARQUET_TEST_DATA` +environment variable so that the examples can find the test data files. + +```bash +cargo run --example ballista-dataframe +``` + +## Distributed Scheduler Overview + +Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes. + +Specifically, any `RepartitionExec` operators are replaced with an `UnresolvedShuffleExec` and the child operator of the repartition opertor is wrapped in a `ShuffleWriterExec` operator and scheduled for execution. + +Each executor polls the scheduler for the next task to run. Tasks are currently always ShuffleWriterExec operators and each task represents one *input* partition that will be executed. The resulting batches are repartitioned according to the shuffle partitioning scheme and each *output* partition is written to disk in IPC format. + +The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight interface, and streams the shuffle IPC files. + +## Performance and Scalability + +The following chart shows a performance comparison of DataFusion and Ballista executing queries from the TPC-H benchmark at scale factor 100. + +TODO + +DataFusion: Concurrency=24. + +Ballista: One scheduler and two executors, with each executor configured to run 12 concurrent tasks. + # How does this compare to Apache Spark? Although Ballista is largely inspired by Apache Spark, there are some key differences. @@ -48,15 +93,4 @@ Although Ballista is largely inspired by Apache Spark, there are some key differ Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of distributed compute. - The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors - in any programming language with minimal serialization overhead. - -## Status - -Ballista was [donated](https://arrow.apache.org/blog/2021/04/12/ballista-donation/) to the Apache Arrow project in -April 2021 and should be considered experimental. - -## Getting Started - -The [Ballista Developer Documentation](docs/README.md) and the -[DataFusion User Guide](https://github.com/apache/arrow-datafusion/tree/master/docs/user-guide) are currently the -best sources of information for getting started with Ballista. + in any programming language with minimal serialization overhead. \ No newline at end of file From 81a0d89d104e7f58fd98bb1da80884d160a93073 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Jul 2021 10:20:28 -0700 Subject: [PATCH 2/7] Add SQL example --- ballista-examples/README.md | 5 +- ballista-examples/examples/ballista-sql.rs | 63 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 ballista-examples/examples/ballista-sql.rs diff --git a/ballista-examples/README.md b/ballista-examples/README.md index a049261cd0e0..6148a9da945e 100644 --- a/ballista-examples/README.md +++ b/ballista-examples/README.md @@ -28,9 +28,10 @@ RUST_LOG=info ./target/release/ballista-executor -c 4 ## Running the examples -Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `PARQUET_TEST_DATA` -environment variable so that the examples can find the test data files. +Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `ARROW_TEST_DATA` and +`PARQUET_TEST_DATA` environment variables so that the examples can find the test data files. +The examples can be run using the `cargo run --example` syntax. ```bash cargo run --release --example ballista-dataframe diff --git a/ballista-examples/examples/ballista-sql.rs b/ballista-examples/examples/ballista-sql.rs new file mode 100644 index 000000000000..9d5b1cf81efc --- /dev/null +++ b/ballista-examples/examples/ballista-sql.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ballista::prelude::*; +use datafusion::arrow::util::pretty; +use datafusion::prelude::CsvReadOptions; + +/// This example demonstrates executing a simple query against an Arrow data source (CSV) and +/// fetching results, using SQL +#[tokio::main] +async fn main() -> Result<()> { + let config = BallistaConfig::builder() + .set("ballista.shuffle.partitions", "4") + .build()?; + let ctx = BallistaContext::remote("localhost", 50051, &config); + + let testdata = datafusion::arrow::util::test_util::arrow_test_data(); + + // register csv file with the execution context + ctx.register_csv( + "aggregate_test_100", + &format!("{}/csv/aggregate_test_100.csv", testdata), + CsvReadOptions::new(), + )?; + + // execute the query + let df = ctx.sql( + "SELECT c1, MIN(c12), MAX(c12) \ + FROM aggregate_test_100 \ + WHERE c11 > 0.1 AND c11 < 0.9 \ + GROUP BY c1", + )?; + + // execute the query - note that calling collect on the DataFrame + // trait will execute the query with DataFusion so we have to call + // collect on the BallistaContext instead and pass it the DataFusion + // logical plan + let mut stream = ctx.collect(&df.to_logical_plan()).await?; + + // print the results + let mut results = vec![]; + while let Some(batch) = stream.next().await { + let batch = batch?; + results.push(batch); + } + pretty::print_batches(&results)?; + + Ok(()) +} From 2fd3e832065ba4bc66609ee42fd7ca876844acf0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Jul 2021 10:26:47 -0700 Subject: [PATCH 3/7] improve docs --- ballista/README.md | 56 +++++++++++++--------------------------------- 1 file changed, 16 insertions(+), 40 deletions(-) diff --git a/ballista/README.md b/ballista/README.md index 5fc5e53d153b..f670cfde99b8 100644 --- a/ballista/README.md +++ b/ballista/README.md @@ -19,9 +19,9 @@ # Ballista: Distributed Compute with Apache Arrow and DataFusion -Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built -on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as -first-class citizens without paying a penalty for serialization costs. +Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and +DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and +Java) to be supported as first-class citizens without paying a penalty for serialization costs. The foundational technologies in Ballista are: @@ -37,52 +37,28 @@ redundancy in the case of a scheduler failing. # Getting Started -## Start a standalone cluster - -```bash -cargo build --release -``` - -```bash -RUST_LOG=info ./target/release/ballista-scheduler -``` - -```bash -RUST_LOG=info ./target/release/ballista-executor -c 4 -``` - -## Run the examples - -Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `PARQUET_TEST_DATA` -environment variable so that the examples can find the test data files. - -```bash -cargo run --example ballista-dataframe -``` +Fully working examples are available. Refer to the [Ballista Examples README](../ballista-examples/README.md) for +more information. ## Distributed Scheduler Overview -Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes. - -Specifically, any `RepartitionExec` operators are replaced with an `UnresolvedShuffleExec` and the child operator of the repartition opertor is wrapped in a `ShuffleWriterExec` operator and scheduled for execution. - -Each executor polls the scheduler for the next task to run. Tasks are currently always ShuffleWriterExec operators and each task represents one *input* partition that will be executed. The resulting batches are repartitioned according to the shuffle partitioning scheme and each *output* partition is written to disk in IPC format. - -The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight interface, and streams the shuffle IPC files. - -## Performance and Scalability - -The following chart shows a performance comparison of DataFusion and Ballista executing queries from the TPC-H benchmark at scale factor 100. +Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a +distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes. -TODO +Specifically, any `RepartitionExec` operatoris is replaced with an `UnresolvedShuffleExec` and the child operator +of the repartition operator is wrapped in a `ShuffleWriterExec` operator and scheduled for execution. -DataFusion: Concurrency=24. +Each executor polls the scheduler for the next task to run. Tasks are currently always `ShuffleWriterExec` operators +and each task represents one *input* partition that will be executed. The resulting batches are repartitioned +according to the shuffle partitioning scheme and each *output* partition is streamed to disk in Arrow IPC format. -Ballista: One scheduler and two executors, with each executor configured to run 12 concurrent tasks. +The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle +tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight +interface, and streams the shuffle IPC files. # How does this compare to Apache Spark? -Although Ballista is largely inspired by Apache Spark, there are some key differences. +Ballista implements a similar design to Apache Spark, but there are some key differences. - The choice of Rust as the main execution language means that memory usage is deterministic and avoids the overhead of GC pauses. From 6b8d75134eeccb13b24d52f817b4cf6cb5895d82 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Jul 2021 10:31:42 -0700 Subject: [PATCH 4/7] fix port in exampls --- ballista-examples/examples/ballista-dataframe.rs | 2 +- ballista-examples/examples/ballista-sql.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ballista-examples/examples/ballista-dataframe.rs b/ballista-examples/examples/ballista-dataframe.rs index 92b5433f081c..da7d99db1cf0 100644 --- a/ballista-examples/examples/ballista-dataframe.rs +++ b/ballista-examples/examples/ballista-dataframe.rs @@ -26,7 +26,7 @@ async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; - let ctx = BallistaContext::remote("localhost", 50051, &config); + let ctx = BallistaContext::remote("localhost", 50050, &config); let testdata = datafusion::arrow::util::test_util::parquet_test_data(); diff --git a/ballista-examples/examples/ballista-sql.rs b/ballista-examples/examples/ballista-sql.rs index 9d5b1cf81efc..f9e7d180af45 100644 --- a/ballista-examples/examples/ballista-sql.rs +++ b/ballista-examples/examples/ballista-sql.rs @@ -26,7 +26,7 @@ async fn main() -> Result<()> { let config = BallistaConfig::builder() .set("ballista.shuffle.partitions", "4") .build()?; - let ctx = BallistaContext::remote("localhost", 50051, &config); + let ctx = BallistaContext::remote("localhost", 50050, &config); let testdata = datafusion::arrow::util::test_util::arrow_test_data(); From 0365451cadae67ce2eb4c6902cc4f51866d231e4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Jul 2021 11:13:37 -0700 Subject: [PATCH 5/7] Add ASF header to ballista-exampls README --- ballista-examples/README.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/ballista-examples/README.md b/ballista-examples/README.md index 6148a9da945e..29365bfe8a4f 100644 --- a/ballista-examples/README.md +++ b/ballista-examples/README.md @@ -1,3 +1,22 @@ + + # Ballista Examples This directory contains examples for executing distributed queries with Ballista. From c62b8dc52c1dc313e4e92d425dbed170090d3373 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jul 2021 08:46:08 -0700 Subject: [PATCH 6/7] Update ballista/README.md Co-authored-by: Andrew Lamb --- ballista/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ballista/README.md b/ballista/README.md index f670cfde99b8..0a8db63a1a6c 100644 --- a/ballista/README.md +++ b/ballista/README.md @@ -45,7 +45,7 @@ more information. Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes. -Specifically, any `RepartitionExec` operatoris is replaced with an `UnresolvedShuffleExec` and the child operator +Specifically, any `RepartitionExec` operator is replaced with an `UnresolvedShuffleExec` and the child operator of the repartition operator is wrapped in a `ShuffleWriterExec` operator and scheduled for execution. Each executor polls the scheduler for the next task to run. Tasks are currently always `ShuffleWriterExec` operators @@ -69,4 +69,4 @@ Ballista implements a similar design to Apache Spark, but there are some key dif Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of distributed compute. - The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors - in any programming language with minimal serialization overhead. \ No newline at end of file + in any programming language with minimal serialization overhead. From f66a881d99471179082ecb51ff5ffe684a639319 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jul 2021 11:09:35 -0700 Subject: [PATCH 7/7] Change examples to binaries --- ballista-examples/Cargo.toml | 3 +-- ballista-examples/README.md | 4 ++-- ballista-examples/{examples => src/bin}/ballista-dataframe.rs | 0 ballista-examples/{examples => src/bin}/ballista-sql.rs | 0 4 files changed, 3 insertions(+), 4 deletions(-) rename ballista-examples/{examples => src/bin}/ballista-dataframe.rs (100%) rename ballista-examples/{examples => src/bin}/ballista-sql.rs (100%) diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml index 85af1152ce9a..b7d40223c469 100644 --- a/ballista-examples/Cargo.toml +++ b/ballista-examples/Cargo.toml @@ -27,8 +27,7 @@ keywords = [ "arrow", "distributed", "query", "sql" ] edition = "2018" publish = false - -[dev-dependencies] +[dependencies] arrow-flight = { version = "5.0" } datafusion = { path = "../datafusion" } ballista = { path = "../ballista/rust/client" } diff --git a/ballista-examples/README.md b/ballista-examples/README.md index 29365bfe8a4f..1364ad47598b 100644 --- a/ballista-examples/README.md +++ b/ballista-examples/README.md @@ -50,9 +50,9 @@ RUST_LOG=info ./target/release/ballista-executor -c 4 Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `ARROW_TEST_DATA` and `PARQUET_TEST_DATA` environment variables so that the examples can find the test data files. -The examples can be run using the `cargo run --example` syntax. +The examples can be run using the `cargo run --bin` syntax. ```bash -cargo run --release --example ballista-dataframe +cargo run --release --bin ballista-dataframe ``` diff --git a/ballista-examples/examples/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs similarity index 100% rename from ballista-examples/examples/ballista-dataframe.rs rename to ballista-examples/src/bin/ballista-dataframe.rs diff --git a/ballista-examples/examples/ballista-sql.rs b/ballista-examples/src/bin/ballista-sql.rs similarity index 100% rename from ballista-examples/examples/ballista-sql.rs rename to ballista-examples/src/bin/ballista-sql.rs