From e2d07305ba532109e805d189007cfba44f21d0fd Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 25 Jul 2021 10:15:01 -0700
Subject: [PATCH 01/18] Draft Ballista README and examples

---
 Cargo.toml                                    |  1 +
 ballista-examples/Cargo.toml                  | 39 ++++++++++++
 ballista-examples/README.md                   | 38 ++++++++++++
 .../examples/ballista-dataframe.rs            | 56 +++++++++++++++++
 ballista/README.md                            | 62 ++++++++++++++-----
 5 files changed, 182 insertions(+), 14 deletions(-)
 create mode 100644 ballista-examples/Cargo.toml
 create mode 100644 ballista-examples/README.md
 create mode 100644 ballista-examples/examples/ballista-dataframe.rs

diff --git a/Cargo.toml b/Cargo.toml
index 351523d74c36..d6da8c14cd96 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ members = [
     "ballista/rust/core",
     "ballista/rust/executor",
     "ballista/rust/scheduler",
+    "ballista-examples",
 ]
 exclude = ["python"]

diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml
new file mode 100644
index 000000000000..85af1152ce9a
--- /dev/null
+++ b/ballista-examples/Cargo.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "ballista-examples"
+description = "Ballista usage examples"
+version = "0.5.0-SNAPSHOT"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+authors = ["Apache Arrow <dev@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "distributed", "query", "sql" ]
+edition = "2018"
+publish = false
+
+
+[dev-dependencies]
+arrow-flight = { version = "5.0" }
+datafusion = { path = "../datafusion" }
+ballista = { path = "../ballista/rust/client" }
+prost = "0.7"
+tonic = "0.4"
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+futures = "0.3"
+num_cpus = "1.13.0"
diff --git a/ballista-examples/README.md b/ballista-examples/README.md
new file mode 100644
index 000000000000..a049261cd0e0
--- /dev/null
+++ b/ballista-examples/README.md
@@ -0,0 +1,38 @@
+# Ballista Examples
+
+This directory contains examples for executing distributed queries with Ballista.
+
+For background information on the Ballista architecture, refer to
+the [Ballista README](../ballista/README.md).
+
+## Start a standalone cluster
+
+From the root of the arrow-datafusion project, build release binaries.
+
+```bash
+cargo build --release
+```
+
+Start a Ballista scheduler process in a new terminal session.
+
+```bash
+RUST_LOG=info ./target/release/ballista-scheduler
+```
+
+Start one or more Ballista executor processes in new terminal sessions. When starting more than one
+executor, a unique port number must be specified for each executor.
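For example, a hypothetical two-executor setup on a single machine might look like the following; the exact option for choosing the port is an assumption here (check `ballista-executor --help` for the real flag name):

```bash
RUST_LOG=info ./target/release/ballista-executor --bind-port 50051 -c 4
RUST_LOG=info ./target/release/ballista-executor --bind-port 50052 -c 4
```

In the simplest case, a single executor started with the default port is sufficient: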
+ +```bash +RUST_LOG=info ./target/release/ballista-executor -c 4 +``` + +## Running the examples + +Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `PARQUET_TEST_DATA` +environment variable so that the examples can find the test data files. + + +```bash +cargo run --release --example ballista-dataframe +``` + diff --git a/ballista-examples/examples/ballista-dataframe.rs b/ballista-examples/examples/ballista-dataframe.rs new file mode 100644 index 000000000000..92b5433f081c --- /dev/null +++ b/ballista-examples/examples/ballista-dataframe.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ballista::prelude::*; +use datafusion::arrow::util::pretty; +use datafusion::prelude::{col, lit}; + +/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and +/// fetching results, using the DataFrame trait +#[tokio::main] +async fn main() -> Result<()> { + let config = BallistaConfig::builder() + .set("ballista.shuffle.partitions", "4") + .build()?; + let ctx = BallistaContext::remote("localhost", 50051, &config); + + let testdata = datafusion::arrow::util::test_util::parquet_test_data(); + + let filename = &format!("{}/alltypes_plain.parquet", testdata); + + // define the query using the DataFrame trait + let df = ctx + .read_parquet(filename)? + .select_columns(&["id", "bool_col", "timestamp_col"])? + .filter(col("id").gt(lit(1)))?; + + // execute the query - note that calling collect on the DataFrame + // trait will execute the query with DataFusion so we have to call + // collect on the BallistaContext instead and pass it the DataFusion + // logical plan + let mut stream = ctx.collect(&df.to_logical_plan()).await?; + + // print the results + let mut results = vec![]; + while let Some(batch) = stream.next().await { + let batch = batch?; + results.push(batch); + } + pretty::print_batches(&results)?; + + Ok(()) +} diff --git a/ballista/README.md b/ballista/README.md index 038146a639ed..5fc5e53d153b 100644 --- a/ballista/README.md +++ b/ballista/README.md @@ -17,9 +17,9 @@ under the License. --> -# Ballista: Distributed Compute with Apache Arrow +# Ballista: Distributed Compute with Apache Arrow and DataFusion -Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow. It is built +Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs. 
@@ -35,6 +35,51 @@ Ballista can be deployed as a standalone cluster and also supports [Kubernetes]( case, the scheduler can be configured to use [etcd](https://etcd.io/) as a backing store to (eventually) provide redundancy in the case of a scheduler failing. +# Getting Started + +## Start a standalone cluster + +```bash +cargo build --release +``` + +```bash +RUST_LOG=info ./target/release/ballista-scheduler +``` + +```bash +RUST_LOG=info ./target/release/ballista-executor -c 4 +``` + +## Run the examples + +Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `PARQUET_TEST_DATA` +environment variable so that the examples can find the test data files. + +```bash +cargo run --example ballista-dataframe +``` + +## Distributed Scheduler Overview + +Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes. + +Specifically, any `RepartitionExec` operators are replaced with an `UnresolvedShuffleExec` and the child operator of the repartition opertor is wrapped in a `ShuffleWriterExec` operator and scheduled for execution. + +Each executor polls the scheduler for the next task to run. Tasks are currently always ShuffleWriterExec operators and each task represents one *input* partition that will be executed. The resulting batches are repartitioned according to the shuffle partitioning scheme and each *output* partition is written to disk in IPC format. + +The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight interface, and streams the shuffle IPC files. + +## Performance and Scalability + +The following chart shows a performance comparison of DataFusion and Ballista executing queries from the TPC-H benchmark at scale factor 100. + +TODO + +DataFusion: Concurrency=24. + +Ballista: One scheduler and two executors, with each executor configured to run 12 concurrent tasks. + # How does this compare to Apache Spark? Although Ballista is largely inspired by Apache Spark, there are some key differences. @@ -48,15 +93,4 @@ Although Ballista is largely inspired by Apache Spark, there are some key differ Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of distributed compute. - The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors - in any programming language with minimal serialization overhead. - -## Status - -Ballista was [donated](https://arrow.apache.org/blog/2021/04/12/ballista-donation/) to the Apache Arrow project in -April 2021 and should be considered experimental. - -## Getting Started - -The [Ballista Developer Documentation](docs/README.md) and the -[DataFusion User Guide](https://github.com/apache/arrow-datafusion/tree/master/docs/user-guide) are currently the -best sources of information for getting started with Ballista. + in any programming language with minimal serialization overhead. 
\ No newline at end of file From 81a0d89d104e7f58fd98bb1da80884d160a93073 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Jul 2021 10:20:28 -0700 Subject: [PATCH 02/18] Add SQL example --- ballista-examples/README.md | 5 +- ballista-examples/examples/ballista-sql.rs | 63 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 ballista-examples/examples/ballista-sql.rs diff --git a/ballista-examples/README.md b/ballista-examples/README.md index a049261cd0e0..6148a9da945e 100644 --- a/ballista-examples/README.md +++ b/ballista-examples/README.md @@ -28,9 +28,10 @@ RUST_LOG=info ./target/release/ballista-executor -c 4 ## Running the examples -Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `PARQUET_TEST_DATA` -environment variable so that the examples can find the test data files. +Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `ARROW_TEST_DATA` and +`PARQUET_TEST_DATA` environment variables so that the examples can find the test data files. +The examples can be run using the `cargo run --example` syntax. ```bash cargo run --release --example ballista-dataframe diff --git a/ballista-examples/examples/ballista-sql.rs b/ballista-examples/examples/ballista-sql.rs new file mode 100644 index 000000000000..9d5b1cf81efc --- /dev/null +++ b/ballista-examples/examples/ballista-sql.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use ballista::prelude::*; +use datafusion::arrow::util::pretty; +use datafusion::prelude::CsvReadOptions; + +/// This example demonstrates executing a simple query against an Arrow data source (CSV) and +/// fetching results, using SQL +#[tokio::main] +async fn main() -> Result<()> { + let config = BallistaConfig::builder() + .set("ballista.shuffle.partitions", "4") + .build()?; + let ctx = BallistaContext::remote("localhost", 50051, &config); + + let testdata = datafusion::arrow::util::test_util::arrow_test_data(); + + // register csv file with the execution context + ctx.register_csv( + "aggregate_test_100", + &format!("{}/csv/aggregate_test_100.csv", testdata), + CsvReadOptions::new(), + )?; + + // execute the query + let df = ctx.sql( + "SELECT c1, MIN(c12), MAX(c12) \ + FROM aggregate_test_100 \ + WHERE c11 > 0.1 AND c11 < 0.9 \ + GROUP BY c1", + )?; + + // execute the query - note that calling collect on the DataFrame + // trait will execute the query with DataFusion so we have to call + // collect on the BallistaContext instead and pass it the DataFusion + // logical plan + let mut stream = ctx.collect(&df.to_logical_plan()).await?; + + // print the results + let mut results = vec![]; + while let Some(batch) = stream.next().await { + let batch = batch?; + results.push(batch); + } + pretty::print_batches(&results)?; + + Ok(()) +} From 2fd3e832065ba4bc66609ee42fd7ca876844acf0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Jul 2021 10:26:47 -0700 Subject: [PATCH 03/18] improve docs --- ballista/README.md | 56 +++++++++++++--------------------------------- 1 file changed, 16 insertions(+), 40 deletions(-) diff --git a/ballista/README.md b/ballista/README.md index 5fc5e53d153b..f670cfde99b8 100644 --- a/ballista/README.md +++ b/ballista/README.md @@ -19,9 +19,9 @@ # Ballista: Distributed Compute with Apache Arrow and DataFusion -Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built -on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as -first-class citizens without paying a penalty for serialization costs. +Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and +DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and +Java) to be supported as first-class citizens without paying a penalty for serialization costs. The foundational technologies in Ballista are: @@ -37,52 +37,28 @@ redundancy in the case of a scheduler failing. # Getting Started -## Start a standalone cluster - -```bash -cargo build --release -``` - -```bash -RUST_LOG=info ./target/release/ballista-scheduler -``` - -```bash -RUST_LOG=info ./target/release/ballista-executor -c 4 -``` - -## Run the examples - -Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `PARQUET_TEST_DATA` -environment variable so that the examples can find the test data files. - -```bash -cargo run --example ballista-dataframe -``` +Fully working examples are available. Refer to the [Ballista Examples README](../ballista-examples/README.md) for +more information. ## Distributed Scheduler Overview -Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes. 
- -Specifically, any `RepartitionExec` operators are replaced with an `UnresolvedShuffleExec` and the child operator of the repartition opertor is wrapped in a `ShuffleWriterExec` operator and scheduled for execution. - -Each executor polls the scheduler for the next task to run. Tasks are currently always ShuffleWriterExec operators and each task represents one *input* partition that will be executed. The resulting batches are repartitioned according to the shuffle partitioning scheme and each *output* partition is written to disk in IPC format. - -The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight interface, and streams the shuffle IPC files. - -## Performance and Scalability - -The following chart shows a performance comparison of DataFusion and Ballista executing queries from the TPC-H benchmark at scale factor 100. +Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a +distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes. -TODO +Specifically, any `RepartitionExec` operatoris is replaced with an `UnresolvedShuffleExec` and the child operator +of the repartition operator is wrapped in a `ShuffleWriterExec` operator and scheduled for execution. -DataFusion: Concurrency=24. +Each executor polls the scheduler for the next task to run. Tasks are currently always `ShuffleWriterExec` operators +and each task represents one *input* partition that will be executed. The resulting batches are repartitioned +according to the shuffle partitioning scheme and each *output* partition is streamed to disk in Arrow IPC format. -Ballista: One scheduler and two executors, with each executor configured to run 12 concurrent tasks. +The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle +tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight +interface, and streams the shuffle IPC files. # How does this compare to Apache Spark? -Although Ballista is largely inspired by Apache Spark, there are some key differences. +Ballista implements a similar design to Apache Spark, but there are some key differences. - The choice of Rust as the main execution language means that memory usage is deterministic and avoids the overhead of GC pauses. 
From 6b8d75134eeccb13b24d52f817b4cf6cb5895d82 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 25 Jul 2021 10:31:42 -0700
Subject: [PATCH 04/18] fix port in examples

---
 ballista-examples/examples/ballista-dataframe.rs | 2 +-
 ballista-examples/examples/ballista-sql.rs       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/ballista-examples/examples/ballista-dataframe.rs b/ballista-examples/examples/ballista-dataframe.rs
index 92b5433f081c..da7d99db1cf0 100644
--- a/ballista-examples/examples/ballista-dataframe.rs
+++ b/ballista-examples/examples/ballista-dataframe.rs
@@ -26,7 +26,7 @@ async fn main() -> Result<()> {
     let config = BallistaConfig::builder()
         .set("ballista.shuffle.partitions", "4")
         .build()?;
-    let ctx = BallistaContext::remote("localhost", 50051, &config);
+    let ctx = BallistaContext::remote("localhost", 50050, &config);
 
     let testdata = datafusion::arrow::util::test_util::parquet_test_data();
 
diff --git a/ballista-examples/examples/ballista-sql.rs b/ballista-examples/examples/ballista-sql.rs
index 9d5b1cf81efc..f9e7d180af45 100644
--- a/ballista-examples/examples/ballista-sql.rs
+++ b/ballista-examples/examples/ballista-sql.rs
@@ -26,7 +26,7 @@ async fn main() -> Result<()> {
     let config = BallistaConfig::builder()
         .set("ballista.shuffle.partitions", "4")
         .build()?;
-    let ctx = BallistaContext::remote("localhost", 50051, &config);
+    let ctx = BallistaContext::remote("localhost", 50050, &config);
 
     let testdata = datafusion::arrow::util::test_util::arrow_test_data();
 
From 0365451cadae67ce2eb4c6902cc4f51866d231e4 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 25 Jul 2021 11:13:37 -0700
Subject: [PATCH 05/18] Add ASF header to ballista-examples README

---
 ballista-examples/README.md | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/ballista-examples/README.md b/ballista-examples/README.md
index 6148a9da945e..29365bfe8a4f 100644
--- a/ballista-examples/README.md
+++ b/ballista-examples/README.md
@@ -1,3 +1,22 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
 # Ballista Examples
 
 This directory contains examples for executing distributed queries with Ballista.
 
From c62b8dc52c1dc313e4e92d425dbed170090d3373 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 27 Jul 2021 08:46:08 -0700
Subject: [PATCH 06/18] Update ballista/README.md
Co-authored-by: Andrew Lamb

---
 ballista/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ballista/README.md b/ballista/README.md
index f670cfde99b8..0a8db63a1a6c 100644
--- a/ballista/README.md
+++ b/ballista/README.md
@@ -45,7 +45,7 @@ more information.
 Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a
 distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes.
 
-Specifically, any `RepartitionExec` operatoris is replaced with an `UnresolvedShuffleExec` and the child operator
+Specifically, any `RepartitionExec` operator is replaced with an `UnresolvedShuffleExec` and the child operator
 of the repartition operator is wrapped in a `ShuffleWriterExec` operator and scheduled for execution.
 
 Each executor polls the scheduler for the next task to run. Tasks are currently always `ShuffleWriterExec` operators
@@ -69,4 +69,4 @@ Ballista implements a similar design to Apache Spark, but there are some key dif
   Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of
   distributed compute.
- The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors
-  in any programming language with minimal serialization overhead.
\ No newline at end of file
+  in any programming language with minimal serialization overhead.

From bd9114bd916eb0e98450473d8fd2d59393999688 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 27 Jul 2021 09:40:56 -0700
Subject: [PATCH 07/18] Refactor distributed query execution into new execution plan

---
 ballista/rust/client/src/context.rs           | 134 +-----------
 .../src/execution_plans/distributed_query.rs  | 206 ++++++++++++++++++
 ballista/rust/core/src/execution_plans/mod.rs |   2 +
 3 files changed, 217 insertions(+), 125 deletions(-)
 create mode 100644 ballista/rust/core/src/execution_plans/distributed_query.rs

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 26087f8e6693..1228f8ba06ba 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -17,32 +17,22 @@
 
 //! Distributed execution context.
 
+use std::collections::HashMap;
+use std::fs;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
-use std::{collections::HashMap, convert::TryInto};
-use std::{fs, time::Duration};
 
 use ballista_core::config::BallistaConfig;
-use ballista_core::serde::protobuf::{
-    execute_query_params::Query, job_status, scheduler_grpc_client::SchedulerGrpcClient,
-    ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
-    PartitionLocation,
-};
-use ballista_core::{
-    client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context,
-    utils::WrappedStream,
-};
+use ballista_core::{datasource::DfTableAdapter, utils::create_datafusion_context};
 
-use datafusion::arrow::datatypes::Schema;
+use ballista_core::execution_plans::DistributedQueryExec;
 use datafusion::catalog::TableReference;
-use datafusion::error::{DataFusionError, Result};
+use datafusion::error::Result;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::csv::CsvReadOptions;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream};
-use futures::future;
-use futures::StreamExt;
-use log::{error, info};
 
 struct BallistaContextState {
     /// Ballista configuration
@@ -205,124 +195,18 @@ impl BallistaContext {
         ctx.sql(sql)
     }
 
-    async fn fetch_partition(
-        location: PartitionLocation,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
-        let metadata = location.executor_meta.ok_or_else(|| {
-            DataFusionError::Internal("Received empty executor metadata".to_owned())
-        })?;
-        let partition_id = location.partition_id.ok_or_else(|| {
-            DataFusionError::Internal("Received empty partition id".to_owned())
-        })?;
-        let mut ballista_client =
-            BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
-                .await
-                .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-        Ok(ballista_client
-            .fetch_partition(
-                &partition_id.job_id,
-                partition_id.stage_id as usize,
-                partition_id.partition_id as usize,
-                &location.path,
-            )
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
-    }
-
     pub async fn collect(
         &self,
         plan: &LogicalPlan,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
-        let (scheduler_url, config) = {
+        let distributed_query = {
             let state = self.state.lock().unwrap();
             let scheduler_url =
                 format!("http://{}:{}", state.scheduler_host, state.scheduler_port);
-            (scheduler_url, state.config.clone())
+            DistributedQueryExec::new(scheduler_url, state.config.clone(), plan.clone())
         };
 
-        info!("Connecting to Ballista scheduler at {}", scheduler_url);
-
-        let mut scheduler = SchedulerGrpcClient::connect(scheduler_url)
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-
-        let schema: Schema = plan.schema().as_ref().clone().into();
-
-        let job_id = scheduler
-            .execute_query(ExecuteQueryParams {
-                query: Some(Query::LogicalPlan(
-                    (plan)
-                        .try_into()
-                        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?,
-                )),
-                settings: config
-                    .settings()
-                    .iter()
-                    .map(|(k, v)| KeyValuePair {
-                        key: k.to_owned(),
-                        value: v.to_owned(),
-                    })
-                    .collect::<Vec<_>>(),
-            })
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
-            .into_inner()
-            .job_id;
-
-        let mut prev_status: Option<job_status::Status> = None;
-
-        loop {
-            let GetJobStatusResult { status } = scheduler
-                .get_job_status(GetJobStatusParams {
-                    job_id: job_id.clone(),
-                })
-                .await
-                .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
-                .into_inner();
-            let status = status.and_then(|s| s.status).ok_or_else(|| {
-                DataFusionError::Internal("Received empty status message".to_owned())
-            })?;
-            let wait_future = tokio::time::sleep(Duration::from_millis(100));
-            let has_status_change = prev_status.map(|x| x != status).unwrap_or(true);
-            match status {
-                job_status::Status::Queued(_) => {
-                    if has_status_change {
-                        info!("Job {} still queued...", job_id);
-                    }
-                    wait_future.await;
-                    prev_status = Some(status);
-                }
-                job_status::Status::Running(_) => {
-                    if has_status_change {
-                        info!("Job {} is running...", job_id);
-                    }
-                    wait_future.await;
-                    prev_status = Some(status);
-                }
-                job_status::Status::Failed(err) => {
-                    let msg = format!("Job {} failed: {}", job_id, err.error);
-                    error!("{}", msg);
-                    break Err(DataFusionError::Execution(msg));
-                }
-                job_status::Status::Completed(completed) => {
-                    let result = future::join_all(
-                        completed
-                            .partition_location
-                            .into_iter()
-                            .map(BallistaContext::fetch_partition),
-                    )
-                    .await
-                    .into_iter()
-                    .collect::<Result<Vec<_>>>()?;
-
-                    let result = WrappedStream::new(
-                        Box::pin(futures::stream::iter(result).flatten()),
-                        Arc::new(schema),
-                    );
-                    break Ok(Box::pin(result));
-                }
-            };
-        }
+        distributed_query.execute(0).await
     }
 }

diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
new file mode 100644
index 000000000000..de8602cb5a89
--- /dev/null
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::convert::TryInto;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::client::BallistaClient;
+use crate::config::BallistaConfig;
+use crate::serde::protobuf::{
+    execute_query_params::Query, job_status, scheduler_grpc_client::SchedulerGrpcClient,
+    ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
+    PartitionLocation,
+};
+use crate::utils::WrappedStream;
+
+use datafusion::arrow::datatypes::{Schema, SchemaRef};
+use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use async_trait::async_trait;
+use futures::future;
+use futures::StreamExt;
+use log::{error, info};
+
+#[derive(Debug, Clone)]
+pub struct DistributedQueryExec {
+    scheduler_url: String,
+    config: BallistaConfig,
+    plan: LogicalPlan,
+}
+
+impl DistributedQueryExec {
+    pub fn new(scheduler_url: String, config: BallistaConfig, plan: LogicalPlan) -> Self {
+        Self {
+            scheduler_url,
+            config,
+            plan,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for DistributedQueryExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.plan.schema().as_ref().clone().into()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        &self,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(DistributedQueryExec::new(
+            self.scheduler_url.clone(),
+            self.config.clone(),
+            self.plan.clone(),
+        )))
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> datafusion::error::Result<SendableRecordBatchStream> {
+        assert_eq!(0, partition);
+
+        info!("Connecting to Ballista scheduler at {}", self.scheduler_url);
+
+        let mut scheduler = SchedulerGrpcClient::connect(self.scheduler_url.clone())
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+
+        let schema: Schema = self.plan.schema().as_ref().clone().into();
+
+        let job_id = scheduler
+            .execute_query(ExecuteQueryParams {
+                query: Some(Query::LogicalPlan(
+                    (&self.plan)
+                        .try_into()
+                        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?,
+                )),
+                settings: self
+                    .config
+                    .settings()
+                    .iter()
+                    .map(|(k, v)| KeyValuePair {
+                        key: k.to_owned(),
+                        value: v.to_owned(),
+                    })
+                    .collect::<Vec<_>>(),
+            })
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
+            .into_inner()
+            .job_id;
+
+        let mut prev_status: Option<job_status::Status> = None;
+
+        loop {
+            let GetJobStatusResult { status } = scheduler
+                .get_job_status(GetJobStatusParams {
+                    job_id: job_id.clone(),
+                })
+                .await
+                .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
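+                // into_inner() unwraps the tonic Response to yield the protobuf message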
+                .into_inner();
+            let status = status.and_then(|s| s.status).ok_or_else(|| {
+                DataFusionError::Internal("Received empty status message".to_owned())
+            })?;
+            let wait_future = tokio::time::sleep(Duration::from_millis(100));
+            let has_status_change = prev_status.map(|x| x != status).unwrap_or(true);
+            match status {
+                job_status::Status::Queued(_) => {
+                    if has_status_change {
+                        info!("Job {} still queued...", job_id);
+                    }
+                    wait_future.await;
+                    prev_status = Some(status);
+                }
+                job_status::Status::Running(_) => {
+                    if has_status_change {
+                        info!("Job {} is running...", job_id);
+                    }
+                    wait_future.await;
+                    prev_status = Some(status);
+                }
+                job_status::Status::Failed(err) => {
+                    let msg = format!("Job {} failed: {}", job_id, err.error);
+                    error!("{}", msg);
+                    break Err(DataFusionError::Execution(msg));
+                }
+                job_status::Status::Completed(completed) => {
+                    let result = future::join_all(
+                        completed
+                            .partition_location
+                            .into_iter()
+                            .map(fetch_partition),
+                    )
+                    .await
+                    .into_iter()
+                    .collect::<Result<Vec<_>>>()?;
+
+                    let result = WrappedStream::new(
+                        Box::pin(futures::stream::iter(result).flatten()),
+                        Arc::new(schema),
+                    );
+                    break Ok(Box::pin(result));
+                }
+            };
+        }
+    }
+}
+
+async fn fetch_partition(
+    location: PartitionLocation,
+) -> Result<SendableRecordBatchStream> {
+    let metadata = location.executor_meta.ok_or_else(|| {
+        DataFusionError::Internal("Received empty executor metadata".to_owned())
+    })?;
+    let partition_id = location.partition_id.ok_or_else(|| {
+        DataFusionError::Internal("Received empty partition id".to_owned())
+    })?;
+    let mut ballista_client =
+        BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+    Ok(ballista_client
+        .fetch_partition(
+            &partition_id.job_id,
+            partition_id.stage_id as usize,
+            partition_id.partition_id as usize,
+            &location.path,
+        )
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
+}
diff --git a/ballista/rust/core/src/execution_plans/mod.rs b/ballista/rust/core/src/execution_plans/mod.rs
index ca4e60023ce8..b10ff341e903 100644
--- a/ballista/rust/core/src/execution_plans/mod.rs
+++ b/ballista/rust/core/src/execution_plans/mod.rs
@@ -18,10 +18,12 @@
 
 //! This module contains execution plans that are needed to distribute Datafusion's execution plans into
 //! several Ballista executors.
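+//!
+//! In brief (summarizing the Ballista README): `DistributedQueryExec` submits a logical plan
+//! to the scheduler and polls it until the job completes; `ShuffleWriterExec` executes one task
+//! of a query stage and writes its output partitions to disk in Arrow IPC format;
+//! `UnresolvedShuffleExec` is a placeholder that the scheduler later replaces with a
+//! `ShuffleReaderExec`, which streams the completed shuffle files from other executors.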
+mod distributed_query; mod shuffle_reader; mod shuffle_writer; mod unresolved_shuffle; +pub use distributed_query::DistributedQueryExec; pub use shuffle_reader::ShuffleReaderExec; pub use shuffle_writer::ShuffleWriterExec; pub use unresolved_shuffle::UnresolvedShuffleExec; From ae2fa6a35fa80ee820fb64f6ecd6acd97440d399 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jul 2021 10:59:18 -0700 Subject: [PATCH 08/18] Refactor for integration between DataFusion and Ballista --- ballista/rust/client/src/context.rs | 20 +++++++++-- ballista/rust/core/src/utils.rs | 54 +++++++++++++++++++++++++---- ballista/rust/scheduler/src/lib.rs | 6 ++-- 3 files changed, 69 insertions(+), 11 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 1228f8ba06ba..5e09f685b1f0 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -132,7 +132,12 @@ impl BallistaContext { let path = fs::canonicalize(&path)?; // use local DataFusion context for now but later this might call the scheduler - let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config()); + let guard = self.state.lock().unwrap(); + let mut ctx = create_datafusion_context( + &guard.scheduler_host, + guard.scheduler_port, + &guard.config(), + ); let df = ctx.read_parquet(path.to_str().unwrap())?; Ok(df) } @@ -149,7 +154,12 @@ impl BallistaContext { let path = fs::canonicalize(&path)?; // use local DataFusion context for now but later this might call the scheduler - let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config()); + let guard = self.state.lock().unwrap(); + let mut ctx = create_datafusion_context( + &guard.scheduler_host, + guard.scheduler_port, + &guard.config(), + ); let df = ctx.read_csv(path.to_str().unwrap(), options)?; Ok(df) } @@ -183,7 +193,11 @@ impl BallistaContext { // use local DataFusion context for now but later this might call the scheduler // register tables let state = self.state.lock().unwrap(); - let mut ctx = create_datafusion_context(&state.config()); + let mut ctx = create_datafusion_context( + &state.scheduler_host, + state.scheduler_port, + state.config(), + ); for (name, plan) in &state.tables { let plan = ctx.optimize(plan)?; let execution_plan = ctx.create_physical_plan(&plan)?; diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 8b1cf61a55ee..4187faa6645a 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -23,7 +23,9 @@ use std::sync::Arc; use std::{fs::File, pin::Pin}; use crate::error::{BallistaError, Result}; -use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec}; +use crate::execution_plans::{ + DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec, +}; use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionStats; @@ -38,8 +40,11 @@ use datafusion::arrow::{ ipc::writer::FileWriter, record_batch::RecordBatch, }; -use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; -use datafusion::logical_plan::Operator; +use datafusion::error::DataFusionError; +use datafusion::execution::context::{ + ExecutionConfig, ExecutionContext, ExecutionContextState, QueryPlanner, +}; +use datafusion::logical_plan::{LogicalPlan, Operator}; use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches; use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule; @@ -232,12 +237,49 @@ fn 
build_exec_plan_diagram(
 }
 
 /// Create a DataFusion context that is compatible with Ballista
-pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext {
-    let config =
-        ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions());
+pub fn create_datafusion_context(
+    scheduler_host: &str,
+    scheduler_port: u16,
+    config: &BallistaConfig,
+) -> ExecutionContext {
+    let scheduler_url = format!("http://{}:{}", scheduler_host, scheduler_port);
+    let config = ExecutionConfig::new()
+        .with_query_planner(Arc::new(BallistaQueryPlanner::new(
+            scheduler_url,
+            config.clone(),
+        )))
+        .with_concurrency(config.default_shuffle_partitions());
     ExecutionContext::with_config(config)
 }
 
+pub struct BallistaQueryPlanner {
+    scheduler_url: String,
+    config: BallistaConfig,
+}
+
+impl BallistaQueryPlanner {
+    pub fn new(scheduler_url: String, config: BallistaConfig) -> Self {
+        Self {
+            scheduler_url,
+            config,
+        }
+    }
+}
+
+impl QueryPlanner for BallistaQueryPlanner {
+    fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        _ctx_state: &ExecutionContextState,
+    ) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        Ok(Arc::new(DistributedQueryExec::new(
+            self.scheduler_url.clone(),
+            self.config.clone(),
+            logical_plan.clone(),
+        )))
+    }
+}
+
 pub struct WrappedStream {
     stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
     schema: SchemaRef,
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index f5e2dc1dfd80..9181e4cd3f8e 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -341,7 +341,8 @@ impl SchedulerGrpc for SchedulerServer {
             Query::Sql(sql) => {
                 //TODO we can't just create a new context because we need a context that has
                 // tables registered from previous SQL statements that have been executed
-                let mut ctx = create_datafusion_context(&config);
+                //TODO scheduler host and port
+                let mut ctx = create_datafusion_context("", 50050, &config);
                 let df = ctx.sql(&sql).map_err(|e| {
                     let msg = format!("Error parsing SQL: {}", e);
                     error!("{}", msg);
@@ -377,7 +378,8 @@ impl SchedulerGrpc for SchedulerServer {
         let job_id_spawn = job_id.clone();
         tokio::spawn(async move {
             // create physical plan using DataFusion
-            let datafusion_ctx = create_datafusion_context(&config);
+            //TODO scheduler url
+            let datafusion_ctx = create_datafusion_context("", 50050, &config);
             macro_rules!
fail_job { ($code :expr) => {{ match $code { From f66a881d99471179082ecb51ff5ffe684a639319 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jul 2021 11:09:35 -0700 Subject: [PATCH 09/18] Change examples to binaries --- ballista-examples/Cargo.toml | 3 +-- ballista-examples/README.md | 4 ++-- ballista-examples/{examples => src/bin}/ballista-dataframe.rs | 0 ballista-examples/{examples => src/bin}/ballista-sql.rs | 0 4 files changed, 3 insertions(+), 4 deletions(-) rename ballista-examples/{examples => src/bin}/ballista-dataframe.rs (100%) rename ballista-examples/{examples => src/bin}/ballista-sql.rs (100%) diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml index 85af1152ce9a..b7d40223c469 100644 --- a/ballista-examples/Cargo.toml +++ b/ballista-examples/Cargo.toml @@ -27,8 +27,7 @@ keywords = [ "arrow", "distributed", "query", "sql" ] edition = "2018" publish = false - -[dev-dependencies] +[dependencies] arrow-flight = { version = "5.0" } datafusion = { path = "../datafusion" } ballista = { path = "../ballista/rust/client" } diff --git a/ballista-examples/README.md b/ballista-examples/README.md index 29365bfe8a4f..1364ad47598b 100644 --- a/ballista-examples/README.md +++ b/ballista-examples/README.md @@ -50,9 +50,9 @@ RUST_LOG=info ./target/release/ballista-executor -c 4 Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `ARROW_TEST_DATA` and `PARQUET_TEST_DATA` environment variables so that the examples can find the test data files. -The examples can be run using the `cargo run --example` syntax. +The examples can be run using the `cargo run --bin` syntax. ```bash -cargo run --release --example ballista-dataframe +cargo run --release --bin ballista-dataframe ``` diff --git a/ballista-examples/examples/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs similarity index 100% rename from ballista-examples/examples/ballista-dataframe.rs rename to ballista-examples/src/bin/ballista-dataframe.rs diff --git a/ballista-examples/examples/ballista-sql.rs b/ballista-examples/src/bin/ballista-sql.rs similarity index 100% rename from ballista-examples/examples/ballista-sql.rs rename to ballista-examples/src/bin/ballista-sql.rs From 79de6d5da2c0add12ef6f7ae1d199eb849d1235f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jul 2021 11:25:02 -0700 Subject: [PATCH 10/18] DataFrame.collect now works with Ballista DataFrames --- .../src/bin/ballista-dataframe.rs | 13 +------------ ballista-examples/src/bin/ballista-sql.rs | 13 +------------ ballista/rust/client/src/context.rs | 19 +------------------ ballista/rust/scheduler/src/lib.rs | 15 ++++++++++----- benchmarks/src/bin/tpch.rs | 11 ++--------- 5 files changed, 15 insertions(+), 56 deletions(-) diff --git a/ballista-examples/src/bin/ballista-dataframe.rs b/ballista-examples/src/bin/ballista-dataframe.rs index da7d99db1cf0..693e67682eda 100644 --- a/ballista-examples/src/bin/ballista-dataframe.rs +++ b/ballista-examples/src/bin/ballista-dataframe.rs @@ -38,18 +38,7 @@ async fn main() -> Result<()> { .select_columns(&["id", "bool_col", "timestamp_col"])? 
.filter(col("id").gt(lit(1)))?;
 
-    // execute the query - note that calling collect on the DataFrame
-    // trait will execute the query with DataFusion so we have to call
-    // collect on the BallistaContext instead and pass it the DataFusion
-    // logical plan
-    let mut stream = ctx.collect(&df.to_logical_plan()).await?;
-
-    // print the results
-    let mut results = vec![];
-    while let Some(batch) = stream.next().await {
-        let batch = batch?;
-        results.push(batch);
-    }
+    let results = df.collect().await?;
     pretty::print_batches(&results)?;
 
     Ok(())
 }
diff --git a/ballista-examples/src/bin/ballista-sql.rs b/ballista-examples/src/bin/ballista-sql.rs
index f9e7d180af45..590ab7bcf7a4 100644
--- a/ballista-examples/src/bin/ballista-sql.rs
+++ b/ballista-examples/src/bin/ballista-sql.rs
@@ -45,18 +45,7 @@ async fn main() -> Result<()> {
         GROUP BY c1",
     )?;
 
-    // execute the query - note that calling collect on the DataFrame
-    // trait will execute the query with DataFusion so we have to call
-    // collect on the BallistaContext instead and pass it the DataFusion
-    // logical plan
-    let mut stream = ctx.collect(&df.to_logical_plan()).await?;
-
-    // print the results
-    let mut results = vec![];
-    while let Some(batch) = stream.next().await {
-        let batch = batch?;
-        results.push(batch);
-    }
+    let results = df.collect().await?;
     pretty::print_batches(&results)?;
 
     Ok(())
 }
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 5e09f685b1f0..16a90b2e2734 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -20,19 +20,16 @@
 
 use std::collections::HashMap;
 use std::fs;
 use std::path::PathBuf;
-use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 
 use ballista_core::config::BallistaConfig;
 use ballista_core::{datasource::DfTableAdapter, utils::create_datafusion_context};
 
-use ballista_core::execution_plans::DistributedQueryExec;
 use datafusion::catalog::TableReference;
+use datafusion::dataframe::DataFrame;
 use datafusion::error::Result;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::csv::CsvReadOptions;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream};
 
 struct BallistaContextState {
     /// Ballista configuration
@@ -208,20 +205,6 @@ impl BallistaContext {
         }
         ctx.sql(sql)
     }
-
-    pub async fn collect(
-        &self,
-        plan: &LogicalPlan,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
-        let distributed_query = {
-            let state = self.state.lock().unwrap();
-            let scheduler_url =
-                format!("http://{}:{}", state.scheduler_host, state.scheduler_port);
-            DistributedQueryExec::new(scheduler_url, state.config.clone(), plan.clone())
-        };
-
-        distributed_query.execute(0).await
-    }
 }
 
 #[cfg(test)]
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 9181e4cd3f8e..af79035d89fd 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -82,8 +82,8 @@ use self::state::{ConfigBackendClient, SchedulerState};
 use ballista_core::config::BallistaConfig;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use ballista_core::utils::create_datafusion_context;
 use datafusion::physical_plan::parquet::ParquetExec;
+use datafusion::prelude::{ExecutionConfig, ExecutionContext};
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
 #[derive(Clone)]
@@ -341,8 +341,7 @@ impl SchedulerGrpc for SchedulerServer {
             Query::Sql(sql) => {
                 //TODO we can't just create a new context because we
need a context that has // tables registered from previous SQL statements that have been executed - //TODO scheduler host and port - let mut ctx = create_datafusion_context("", 50050, &config); + let mut ctx = create_datafusion_context2(&config); let df = ctx.sql(&sql).map_err(|e| { let msg = format!("Error parsing SQL: {}", e); error!("{}", msg); @@ -378,8 +377,7 @@ impl SchedulerGrpc for SchedulerServer { let job_id_spawn = job_id.clone(); tokio::spawn(async move { // create physical plan using DataFusion - //TODO scheduler url - let datafusion_ctx = create_datafusion_context("", 50050, &config); + let datafusion_ctx = create_datafusion_context2(&config); macro_rules! fail_job { ($code :expr) => {{ match $code { @@ -513,6 +511,13 @@ impl SchedulerGrpc for SchedulerServer { } } +/// Create a DataFusion context that is compatible with Ballista +pub fn create_datafusion_context2(config: &BallistaConfig) -> ExecutionContext { + let config = + ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions()); + ExecutionContext::with_config(config) +} + #[cfg(all(test, feature = "sled"))] mod test { use std::{ diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 169319d30bee..08b8864acd1b 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -25,8 +25,6 @@ use std::{ time::Instant, }; -use futures::StreamExt; - use ballista::context::BallistaContext; use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS}; @@ -312,15 +310,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { let df = ctx .sql(&sql) .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; - let mut batches = vec![]; - let mut stream = ctx - .collect(&df.to_logical_plan()) + let batches = df + .collect() .await .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?; - while let Some(result) = stream.next().await { - let batch = result?; - batches.push(batch); - } let elapsed = start.elapsed().as_secs_f64() * 1000.0; millis.push(elapsed as f64); println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed); From 619c9bf6b32181235e8a709bb6e5061d5627a9f8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 27 Jul 2021 11:34:51 -0700 Subject: [PATCH 11/18] rename function --- ballista/rust/scheduler/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index af79035d89fd..3e4e73586d53 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -341,7 +341,7 @@ impl SchedulerGrpc for SchedulerServer { Query::Sql(sql) => { //TODO we can't just create a new context because we need a context that has // tables registered from previous SQL statements that have been executed - let mut ctx = create_datafusion_context2(&config); + let mut ctx = create_datafusion_context(&config); let df = ctx.sql(&sql).map_err(|e| { let msg = format!("Error parsing SQL: {}", e); error!("{}", msg); @@ -377,7 +377,7 @@ impl SchedulerGrpc for SchedulerServer { let job_id_spawn = job_id.clone(); tokio::spawn(async move { // create physical plan using DataFusion - let datafusion_ctx = create_datafusion_context2(&config); + let datafusion_ctx = create_datafusion_context(&config); macro_rules! 
fail_job {
                 ($code :expr) => {{
                     match $code {
@@ -512,7 +512,7 @@ impl SchedulerGrpc for SchedulerServer {
 }
 
 /// Create a DataFusion context that is compatible with Ballista
-pub fn create_datafusion_context2(config: &BallistaConfig) -> ExecutionContext {
+pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext {
     let config =
         ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions());
     ExecutionContext::with_config(config)

From 2300dcb20cf4211cf3f359d46beba4fe77565f2d Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 27 Jul 2021 11:38:01 -0700
Subject: [PATCH 12/18] Docs

---
 .../rust/core/src/execution_plans/distributed_query.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index de8602cb5a89..8abfe6678893 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -42,10 +42,17 @@ use futures::future;
 use futures::StreamExt;
 use log::{error, info};
 
+/// This operator sends a logical plan to a Ballista scheduler for execution and
+/// polls the scheduler until the query is complete and then fetches the resulting
+/// batches directly from the executors that hold the results from the final
+/// query stage.
 #[derive(Debug, Clone)]
 pub struct DistributedQueryExec {
+    /// Ballista scheduler URL
     scheduler_url: String,
+    /// Ballista configuration
     config: BallistaConfig,
+    /// Logical plan to execute
     plan: LogicalPlan,
 }

From b4a7324229b0d111487b2f71119bf3f7044e71d4 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 27 Jul 2021 16:54:43 -0700
Subject: [PATCH 13/18] Rough out streaming versions of DataFrame.collect

---
 datafusion/src/dataframe.rs                | 31 +++++++++++++++++
 datafusion/src/execution/dataframe_impl.rs | 32 ++++++++++++++++---
 datafusion/src/physical_plan/mod.rs        | 36 ++++++++++++++------
 3 files changed, 85 insertions(+), 14 deletions(-)

diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 507a79861cd5..d9452bd09e94 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
 };
 use std::sync::Arc;
 
+use crate::physical_plan::SendableRecordBatchStream;
 use async_trait::async_trait;
 
 /// DataFrame represents a logical set of rows with the same named columns.
@@ -222,6 +223,21 @@ pub trait DataFrame: Send + Sync {
     /// ```
     async fn collect(&self) -> Result<Vec<RecordBatch>>;
 
+    /// Executes this DataFrame and returns a stream over a single partition
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let stream = df.collect_stream().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    async fn collect_stream(&self) -> Result<SendableRecordBatchStream>;
+
     /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
     /// maintaining the input partitioning.
     ///
@@ -238,6 +254,21 @@ pub trait DataFrame: Send + Sync {
     /// ```
     async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>;
 
+    /// Executes this DataFrame and returns one stream per partition.
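+    /// Unlike `collect_partitioned`, no record batches are buffered in memory here; each
+    /// partition is consumed lazily from its stream.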
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let batches = df.collect_stream_partitioned().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    async fn collect_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>>;
+
     /// Returns the schema describing the output of this DataFrame in terms of columns returned,
     /// where each column has a name, data type, and nullability attribute.
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 4edd01c2c0a9..81cb57cc7bc2 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -31,6 +31,9 @@ use crate::{
     physical_plan::{collect, collect_partitioned},
 };
 
+use crate::physical_plan::{
+    collect_stream, collect_stream_partitioned, SendableRecordBatchStream,
+};
 use async_trait::async_trait;
 
 /// Implementation of DataFrame API
@@ -139,8 +142,8 @@ impl DataFrame for DataFrameImpl {
         self.plan.clone()
     }
 
-    // Convert the logical plan represented by this DataFrame into a physical plan and
-    // execute it
+    /// Convert the logical plan represented by this DataFrame into a physical plan and
+    /// execute it, collecting all resulting batches into memory
     async fn collect(&self) -> Result<Vec<RecordBatch>> {
         let state = self.ctx_state.lock().unwrap().clone();
         let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
@@ -149,8 +152,19 @@ impl DataFrame for DataFrameImpl {
         Ok(collect(plan).await?)
     }
 
-    // Convert the logical plan represented by this DataFrame into a physical plan and
-    // execute it
+    /// Convert the logical plan represented by this DataFrame into a physical plan and
+    /// execute it, returning a stream over a single partition
+    async fn collect_stream(&self) -> Result<SendableRecordBatchStream> {
+        let state = self.ctx_state.lock().unwrap().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        let plan = ctx.optimize(&self.plan)?;
+        let plan = ctx.create_physical_plan(&plan)?;
+        collect_stream(plan).await
+    }
+
+    /// Convert the logical plan represented by this DataFrame into a physical plan and
+    /// execute it, collecting all resulting batches into memory while maintaining
+    /// partitioning
     async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
         let state = self.ctx_state.lock().unwrap().clone();
         let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
@@ -159,6 +173,16 @@ impl DataFrame for DataFrameImpl {
         Ok(collect_partitioned(plan).await?)
     }
 
+    /// Convert the logical plan represented by this DataFrame into a physical plan and
+    /// execute it, returning a stream for each partition
+    async fn collect_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>> {
+        let state = self.ctx_state.lock().unwrap().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        let plan = ctx.optimize(&self.plan)?;
+        let plan = ctx.create_physical_plan(&plan)?;
+        Ok(collect_stream_partitioned(plan).await?)
+    }
+
     /// Returns the schema from the logical plan
     fn schema(&self) -> &DFSchema {
         self.plan.schema()
 }
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index b3c0dd63e9ed..b35152f5629f 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -313,18 +313,25 @@ pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap
 
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
+    let stream = collect_stream(plan).await?;
+    common::collect(stream).await
+}
+
+/// Execute the [ExecutionPlan] and collect the results in memory
+pub async fn collect_stream(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
-        0 => Ok(vec![]),
-        1 => {
-            let it = plan.execute(0).await?;
-            common::collect(it).await
+        0 => {
+            todo!()
         }
+        1 => plan.execute(0).await,
         _ => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(plan.clone());
             // CoalescePartitionsExec must produce a single partition
             assert_eq!(1, plan.output_partitioning().partition_count());
-            common::collect(plan.execute(0).await?).await
+            plan.execute(0).await
         }
     }
 }
@@ -333,16 +340,25 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
 pub async fn collect_partitioned(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Vec<Vec<RecordBatch>>> {
+    let streams = collect_stream_partitioned(plan).await?;
+    let mut result = vec![];
+    for stream in streams {
+        result.push(common::collect(stream).await?);
+    }
+    Ok(result)
+}
+
+/// Execute the [ExecutionPlan] and stream the results
+pub async fn collect_stream_partitioned(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<SendableRecordBatchStream>> {
     match plan.output_partitioning().partition_count() {
         0 => Ok(vec![]),
-        1 => {
-            let it = plan.execute(0).await?;
-            Ok(vec![common::collect(it).await?])
-        }
+        1 => Ok(vec![plan.execute(0).await?]),
         _ => {
             let mut partitions = vec![];
             for i in 0..plan.output_partitioning().partition_count() {
-                partitions.push(common::collect(plan.execute(i).await?).await?)
+                partitions.push(plan.execute(i).await?)
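+                // note: execute() only constructs each stream; batches are produced as it is polled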
From 340f434062ba6511767615f607cd1d3b9d27505d Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 28 Jul 2021 07:58:08 -0700
Subject: [PATCH 14/18] Refactor code

---
 datafusion/src/execution/dataframe_impl.rs | 34 ++++------
 datafusion/src/physical_plan/mod.rs        | 79 ++++++++++++++--------
 2 files changed, 66 insertions(+), 47 deletions(-)

diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index b58553b16a53..020e8f8f6086 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -32,7 +32,7 @@ use crate::{
 };
 
 use crate::physical_plan::{
-    collect_stream, collect_stream_partitioned, SendableRecordBatchStream,
+    execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
 };
 use async_trait::async_trait;
 
@@ -50,6 +50,14 @@ impl DataFrameImpl {
             plan: plan.clone(),
         }
     }
+
+    /// Create a physical plan
+    async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
+        let state = self.ctx_state.lock().unwrap().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        let plan = ctx.optimize(&self.plan)?;
+        ctx.create_physical_plan(&plan)
+    }
 }
 
 #[async_trait]
@@ -144,42 +152,30 @@ impl DataFrame for DataFrameImpl {
     /// Convert the logical plan represented by this DataFrame into a physical plan and
     /// execute it, collecting all resulting batches into memory
     async fn collect(&self) -> Result<Vec<RecordBatch>> {
-        let state = self.ctx_state.lock().unwrap().clone();
-        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
-        let plan = ctx.optimize(&self.plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = self.create_physical_plan().await?;
         Ok(collect(plan).await?)
     }
 
     /// Convert the logical plan represented by this DataFrame into a physical plan and
     /// execute it, returning a stream over a single partition
     async fn collect_stream(&self) -> Result<SendableRecordBatchStream> {
-        let state = self.ctx_state.lock().unwrap().clone();
-        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
-        let plan = ctx.optimize(&self.plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
-        collect_stream(plan).await
+        let plan = self.create_physical_plan().await?;
+        execute_stream(plan).await
     }
 
     /// Convert the logical plan represented by this DataFrame into a physical plan and
     /// execute it, collecting all resulting batches into memory while maintaining
     /// partitioning
     async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
-        let state = self.ctx_state.lock().unwrap().clone();
-        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
-        let plan = ctx.optimize(&self.plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = self.create_physical_plan().await?;
         Ok(collect_partitioned(plan).await?)
     }
 
     /// Convert the logical plan represented by this DataFrame into a physical plan and
     /// execute it, returning a stream for each partition
     async fn collect_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>> {
-        let state = self.ctx_state.lock().unwrap().clone();
-        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
-        let plan = ctx.optimize(&self.plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
-        Ok(collect_stream_partitioned(plan).await?)
+        let plan = self.create_physical_plan().await?;
+        Ok(execute_stream_partitioned(plan).await?)
     }
 
     /// Returns the schema from the logical plan
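The `create_physical_plan` helper above removes the state/optimize/plan boilerplate that was repeated in all four execute methods. Before the matching `physical_plan` changes below, here is a sketch of the kind of consumer the partitioned variant is aimed at: draining each output partition on its own Tokio task. Function names follow this patch series; the spawning pattern and helper name are illustrative only.

```rust
use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::{execute_stream_partitioned, ExecutionPlan};
use futures::StreamExt;

/// Per-partition row counts, with each partition drained concurrently.
async fn rows_per_partition(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<usize>> {
    let streams = execute_stream_partitioned(plan).await?;
    let mut handles = Vec::with_capacity(streams.len());
    for mut stream in streams {
        // SendableRecordBatchStream is Send, so each partition can move
        // onto its own task and be polled independently.
        handles.push(tokio::spawn(async move {
            let mut rows = 0;
            while let Some(batch) = stream.next().await {
                rows += batch?.num_rows();
            }
            Ok::<usize, DataFusionError>(rows)
        }));
    }
    let mut counts = Vec::with_capacity(handles.len());
    for handle in handles {
        counts.push(handle.await.expect("partition task panicked")?);
    }
    Ok(counts)
}
```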
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index b35152f5629f..ce3e4f26d9f1 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,14 @@
 
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, pin::Pin};
+
 use self::{
     coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
 };
@@ -35,12 +43,6 @@ use async_trait::async_trait;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
 use hashbrown::HashMap;
-use std::fmt;
-use std::fmt::{Debug, Display};
-use std::ops::Range;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::{any::Any, pin::Pin};
 
 /// Trait for types that stream [arrow::record_batch::RecordBatch]
 pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -54,6 +56,34 @@
 /// Trait for a stream of record batches.
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;
 
+/// EmptyRecordBatchStream can be used to create a RecordBatchStream
+/// that will produce no results
+pub struct EmptyRecordBatchStream {}
+
+impl EmptyRecordBatchStream {
+    /// Create an empty RecordBatchStream
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl RecordBatchStream for EmptyRecordBatchStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::new(Schema::empty())
+    }
+}
+
+impl Stream for EmptyRecordBatchStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        Poll::Ready(None)
+    }
+}
+
 /// SQL metric type
 #[derive(Debug, Clone)]
 pub enum MetricType {
@@ -313,18 +343,16 @@ pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric>
 
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
-    let stream = collect_stream(plan).await?;
+    let stream = execute_stream(plan).await?;
     common::collect(stream).await
 }
 
-/// Execute the [ExecutionPlan] and collect the results in memory
-pub async fn collect_stream(
+/// Execute the [ExecutionPlan] and return a single stream of results
+pub async fn execute_stream(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
-        0 => {
-            todo!()
-        }
+        0 => Ok(Box::pin(EmptyRecordBatchStream::new())),
         1 => plan.execute(0).await,
         _ => {
             // merge into a single partition
@@ -340,29 +368,24 @@ pub async fn collect_stream(
 pub async fn collect_partitioned(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Vec<Vec<RecordBatch>>> {
-    let streams = collect_stream_partitioned(plan).await?;
-    let mut result = vec![];
+    let streams = execute_stream_partitioned(plan).await?;
+    let mut batches = Vec::with_capacity(streams.len());
     for stream in streams {
-        result.push(common::collect(stream).await?);
+        batches.push(common::collect(stream).await?);
     }
-    Ok(result)
+    Ok(batches)
 }
 
-/// Execute the [ExecutionPlan] and stream the results
-pub async fn collect_stream_partitioned(
+/// Execute the [ExecutionPlan] and return a vec with one stream per output partition
+pub async fn execute_stream_partitioned(
    plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Vec<SendableRecordBatchStream>> {
-    match plan.output_partitioning().partition_count() {
-        0 => Ok(vec![]),
-        1 => Ok(vec![plan.execute(0).await?]),
-        _ => {
-            let mut partitions = vec![];
-            for i in 0..plan.output_partitioning().partition_count() {
-                partitions.push(plan.execute(i).await?)
-            }
-            Ok(partitions)
-        }
-    }
+    let p = plan.output_partitioning().partition_count();
+    let mut streams = Vec::with_capacity(p);
+    for _ in 0..p {
+        streams.push(plan.execute(0).await?);
+    }
+    Ok(streams)
 }
 
 /// Partitioning schemes supported by operators.
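One observable property of the new zero-partition path: `execute_stream` now yields a stream that terminates immediately instead of hitting `todo!()`. A small check of that behavior, valid only for this intermediate revision (the `new()` signature grows a schema argument two patches later):

```rust
use datafusion::error::Result;
use datafusion::physical_plan::{
    common, EmptyRecordBatchStream, SendableRecordBatchStream,
};

#[tokio::main]
async fn main() -> Result<()> {
    // The empty stream reports end-of-stream on the first poll,
    // so draining it produces no batches at all.
    let stream: SendableRecordBatchStream = Box::pin(EmptyRecordBatchStream::new());
    let batches = common::collect(stream).await?;
    assert!(batches.is_empty());
    Ok(())
}
```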
-            }
-            Ok(partitions)
-        }
-    }
+    let p = plan.output_partitioning().partition_count();
+    let mut streams = Vec::with_capacity(p);
+    for _ in 0..p {
+        streams.push(plan.execute(0).await?);
+    }
+    Ok(streams)
 }
 
 /// Partitioning schemes supported by operators.

From 63b1716cae316c0b3a4f9b8f0c4843564903c8a2 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 28 Jul 2021 08:01:38 -0700
Subject: [PATCH 15/18] Rename methods

---
 datafusion/src/dataframe.rs                | 8 ++++----
 datafusion/src/execution/dataframe_impl.rs | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index d9452bd09e94..1d4cffdf89d4 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -232,11 +232,11 @@ pub trait DataFrame: Send + Sync {
     /// # async fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
     /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
-    /// let stream = df.collect_stream().await?;
+    /// let stream = df.execute_stream().await?;
     /// # Ok(())
     /// # }
     /// ```
-    async fn collect_stream(&self) -> Result<SendableRecordBatchStream>;
+    async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;
 
     /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
     /// maintaining the input partitioning.
@@ -263,11 +263,11 @@ pub trait DataFrame: Send + Sync {
     /// # async fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
     /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
-    /// let streams = df.collect_stream_partitioned().await?;
+    /// let streams = df.execute_stream_partitioned().await?;
     /// # Ok(())
     /// # }
     /// ```
-    async fn collect_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>>;
+    async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>>;
 
     /// Returns the schema describing the output of this DataFrame in terms of columns returned,
     /// where each column has a name, data type, and nullability attribute.
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 020e8f8f6086..1c0094b711d6 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -158,7 +158,7 @@ impl DataFrame for DataFrameImpl {
 
     /// Convert the logical plan represented by this DataFrame into a physical plan and
     /// execute it, returning a stream over a single partition
-    async fn collect_stream(&self) -> Result<SendableRecordBatchStream> {
+    async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
         let plan = self.create_physical_plan().await?;
         execute_stream(plan).await
     }
@@ -173,7 +173,7 @@ impl DataFrame for DataFrameImpl {
 
     /// Convert the logical plan represented by this DataFrame into a physical plan and
     /// execute it, returning a stream for each partition
-    async fn collect_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>> {
+    async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>> {
         let plan = self.create_physical_plan().await?;
         Ok(execute_stream_partitioned(plan).await?)
     }
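With the rename, the public surface reads naturally: `collect`/`collect_partitioned` buffer results, while `execute_stream`/`execute_stream_partitioned` stream them. Note that at this point `execute_stream_partitioned` still polls partition 0 for every stream; the next patch fixes that regression. Meanwhile, a hedged end-to-end sketch against the renamed DataFrame API (the CSV file and column `a` follow the doc-test examples; the rest is illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    let df = ctx
        .read_csv("tests/example.csv", CsvReadOptions::new())?
        .filter(col("a").gt(lit(1)))?;

    // One stream per output partition.
    let streams = df.execute_stream_partitioned().await?;
    for (i, mut stream) in streams.into_iter().enumerate() {
        while let Some(batch) = stream.next().await {
            println!("partition {}: {} rows", i, batch?.num_rows());
        }
    }
    Ok(())
}
```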
From ccd647e2663191c337b565cee9894d5af54bcee1 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 28 Jul 2021 08:04:55 -0700
Subject: [PATCH 16/18] Fix regression

---
 datafusion/src/physical_plan/mod.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index ce3e4f26d9f1..b7c15336e744 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -380,10 +380,10 @@ pub async fn execute_stream_partitioned(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Vec<SendableRecordBatchStream>> {
-    let p = plan.output_partitioning().partition_count();
-    let mut streams = Vec::with_capacity(p);
-    for _ in 0..p {
-        streams.push(plan.execute(0).await?);
+    let num_partitions = plan.output_partitioning().partition_count();
+    let mut streams = Vec::with_capacity(num_partitions);
+    for i in 0..num_partitions {
+        streams.push(plan.execute(i).await?);
     }
     Ok(streams)
 }

From b8f253a9831cc42b366442a64188dced79d55133 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 28 Jul 2021 08:07:22 -0700
Subject: [PATCH 17/18] Specify schema in EmptyRecordBatchStream

---
 datafusion/src/physical_plan/mod.rs | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index b7c15336e744..f347d15d089d 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -58,18 +58,21 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>
 
 /// EmptyRecordBatchStream can be used to create a RecordBatchStream
 /// that will produce no results
-pub struct EmptyRecordBatchStream {}
+pub struct EmptyRecordBatchStream {
+    /// Schema wrapped by Arc
+    schema: SchemaRef,
+}
 
 impl EmptyRecordBatchStream {
     /// Create an empty RecordBatchStream
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(schema: SchemaRef) -> Self {
+        Self { schema }
     }
 }
 
 impl RecordBatchStream for EmptyRecordBatchStream {
     fn schema(&self) -> SchemaRef {
-        Arc::new(Schema::empty())
+        self.schema.clone()
     }
 }
 
@@ -352,7 +355,7 @@ pub async fn execute_stream(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
-        0 => Ok(Box::pin(EmptyRecordBatchStream::new())),
+        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
         1 => plan.execute(0).await,
         _ => {
             // merge into a single partition

From 1b3bd82a546300d5c484f649158394fa7749652f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 28 Jul 2021 09:32:15 -0700
Subject: [PATCH 18/18] format

---
 datafusion/src/physical_plan/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index f347d15d089d..86bceb11b475 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -60,7 +60,7 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>