Add Ballista examples #775

Merged: 8 commits, Jul 27, 2021
Changes from 6 commits
1 change: 1 addition & 0 deletions Cargo.toml
@@ -25,6 +25,7 @@ members = [
"ballista/rust/core",
"ballista/rust/executor",
"ballista/rust/scheduler",
"ballista-examples",
]

exclude = ["python"]
39 changes: 39 additions & 0 deletions ballista-examples/Cargo.toml
@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "ballista-examples"
description = "Ballista usage examples"
version = "0.5.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
authors = ["Apache Arrow <[email protected]>"]
license = "Apache-2.0"
keywords = [ "arrow", "distributed", "query", "sql" ]
edition = "2018"
publish = false


Contributor:
I wonder if it is worth adding bin targets here?

As it is, I can't run these examples:

```bash
(arrow_dev) alamb@MacBook-Pro:~/Software/arrow-datafusion/ballista-examples$ cargo run
error: a bin target must be available for `cargo run`
```

Maybe something like

```toml
[[bin]]
name = "dataframe"
path = "src/ballista_dataframe.rs"
```

Member Author (@andygrove, Jul 27, 2021):

I was following the same pattern that we use in datafusion-examples, where we use `cargo run --example` rather than `cargo run --bin`.

```bash
% cargo run --example
error: "--example" takes one argument.
Available examples:
    ballista-dataframe
    ballista-sql
```

It is a little odd that we package the examples in their own crate, so maybe packaging them as binaries makes more sense now?

Contributor:

The main motivation for extracting them into a separate folder/crate for datafusion-examples was to reduce the number of dependencies and the compilation time.
Maybe `bin` works just as well?

Member Author:

I have updated this to use --bin now.
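For illustration, the switch to `--bin` would mean declaring each example as a binary target in `ballista-examples/Cargo.toml`. A sketch of what those entries might look like (the names and paths here are assumed, not taken from the final commit; with the conventional `src/bin/` layout, Cargo would even discover them without explicit entries):

```toml
# Hypothetical [[bin]] entries; the actual names/paths in the merged
# commit may differ.
[[bin]]
name = "ballista-dataframe"
path = "src/bin/ballista-dataframe.rs"

[[bin]]
name = "ballista-sql"
path = "src/bin/ballista-sql.rs"
```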

Contributor:

🎉

[dev-dependencies]
arrow-flight = { version = "5.0" }
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client" }
prost = "0.7"
tonic = "0.4"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
futures = "0.3"
num_cpus = "1.13.0"
58 changes: 58 additions & 0 deletions ballista-examples/README.md
@@ -0,0 +1,58 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Ballista Examples

This directory contains examples for executing distributed queries with Ballista.

For background information on the Ballista architecture, refer to
the [Ballista README](../ballista/README.md).

## Start a standalone cluster

From the root of the arrow-datafusion project, build release binaries.

```bash
cargo build --release
```

Start a Ballista scheduler process in a new terminal session.

```bash
RUST_LOG=info ./target/release/ballista-scheduler
```

Start one or more Ballista executor processes in new terminal sessions. When starting more than one
executor, a unique port number must be specified for each executor.

```bash
RUST_LOG=info ./target/release/ballista-executor -c 4
```

## Running the examples

Refer to the instructions in [DEVELOPERS.md](../DEVELOPERS.md) to define the `ARROW_TEST_DATA` and
`PARQUET_TEST_DATA` environment variables so that the examples can find the test data files.

The examples can be run using the `cargo run --example` syntax.

```bash
cargo run --release --example ballista-dataframe
```

56 changes: 56 additions & 0 deletions ballista-examples/examples/ballista-dataframe.rs
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::prelude::{col, lit};

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);

let testdata = datafusion::arrow::util::test_util::parquet_test_data();

let filename = &format!("{}/alltypes_plain.parquet", testdata);

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename)?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

// execute the query - note that calling collect on the DataFrame
// trait will execute the query with DataFusion so we have to call
// collect on the BallistaContext instead and pass it the DataFusion
// logical plan
let mut stream = ctx.collect(&df.to_logical_plan()).await?;

// print the results
let mut results = vec![];
while let Some(batch) = stream.next().await {
let batch = batch?;
results.push(batch);
}
pretty::print_batches(&results)?;

Ok(())
}
63 changes: 63 additions & 0 deletions ballista-examples/examples/ballista-sql.rs
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::prelude::CsvReadOptions;

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results, using SQL
#[tokio::main]
async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);

let testdata = datafusion::arrow::util::test_util::arrow_test_data();

// register csv file with the execution context
ctx.register_csv(
"aggregate_test_100",
&format!("{}/csv/aggregate_test_100.csv", testdata),
CsvReadOptions::new(),
)?;

// execute the query
let df = ctx.sql(
"SELECT c1, MIN(c12), MAX(c12) \
FROM aggregate_test_100 \
WHERE c11 > 0.1 AND c11 < 0.9 \
GROUP BY c1",
)?;

// execute the query - note that calling collect on the DataFrame
// trait will execute the query with DataFusion so we have to call
// collect on the BallistaContext instead and pass it the DataFusion
// logical plan
let mut stream = ctx.collect(&df.to_logical_plan()).await?;

// print the results
let mut results = vec![];
while let Some(batch) = stream.next().await {
let batch = batch?;
results.push(batch);
}
pretty::print_batches(&results)?;

Ok(())
}
42 changes: 26 additions & 16 deletions ballista/README.md
@@ -17,11 +17,11 @@
under the License.
-->

-# Ballista: Distributed Compute with Apache Arrow
+# Ballista: Distributed Compute with Apache Arrow and DataFusion

-Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow. It is built
-on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as
-first-class citizens without paying a penalty for serialization costs.
+Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and
+DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and
+Java) to be supported as first-class citizens without paying a penalty for serialization costs.

The foundational technologies in Ballista are:

@@ -35,9 +35,30 @@
Ballista can be deployed as a standalone cluster and also supports [Kubernetes](
case, the scheduler can be configured to use [etcd](https://etcd.io/) as a backing store to (eventually) provide
redundancy in the case of a scheduler failing.

# Getting Started

Fully working examples are available. Refer to the [Ballista Examples README](../ballista-examples/README.md) for
more information.

## Distributed Scheduler Overview

Ballista uses the DataFusion query execution framework to create a physical plan and then transforms it into a
distributed physical plan by breaking the query down into stages whenever the partitioning scheme changes.

Specifically, any `RepartitionExec` operator is replaced with an `UnresolvedShuffleExec` and the child operator
of the repartition operator is wrapped in a `ShuffleWriterExec` operator and scheduled for execution.

Each executor polls the scheduler for the next task to run. Tasks are currently always `ShuffleWriterExec` operators
and each task represents one *input* partition that will be executed. The resulting batches are repartitioned
according to the shuffle partitioning scheme and each *output* partition is streamed to disk in Arrow IPC format.

The scheduler will replace `UnresolvedShuffleExec` operators with `ShuffleReaderExec` operators once all shuffle
tasks have completed. The `ShuffleReaderExec` operator connects to other executors as required using the Flight
interface, and streams the shuffle IPC files.
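The stage-splitting step described above can be sketched with a toy plan model. This is illustrative only: the enum and function below are invented for the sketch and are not Ballista's actual types, but they show the idea of replacing each `RepartitionExec` with an `UnresolvedShuffleExec` placeholder while its child is wrapped in a `ShuffleWriterExec` stage for execution.

```rust
/// Toy stand-in for a physical plan tree (not Ballista's real types).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),                    // leaf: scan a table
    Repartition(Box<Plan>, usize),   // child plan, number of output partitions
    UnresolvedShuffle(usize),        // placeholder referencing a shuffle stage id
    ShuffleWriter(Box<Plan>, usize), // a schedulable stage: child plan + partitions
}

/// Replace every Repartition node with an UnresolvedShuffle placeholder,
/// pushing the wrapped child onto `stages` as a ShuffleWriter stage.
fn split_into_stages(plan: Plan, stages: &mut Vec<Plan>) -> Plan {
    match plan {
        Plan::Repartition(child, parts) => {
            // Recurse first so nested repartitions become their own stages.
            let child = split_into_stages(*child, stages);
            let stage_id = stages.len();
            stages.push(Plan::ShuffleWriter(Box::new(child), parts));
            Plan::UnresolvedShuffle(stage_id)
        }
        other => other,
    }
}

fn main() {
    let plan = Plan::Repartition(Box::new(Plan::Scan("t".into())), 4);
    let mut stages = Vec::new();
    let root = split_into_stages(plan, &mut stages);
    // root is now an UnresolvedShuffle(0); stages[0] is the ShuffleWriter stage.
    println!("root:   {:?}", root);
    println!("stages: {:?}", stages);
}
```

In the real scheduler the placeholder is later swapped for a `ShuffleReaderExec` once the stage's tasks report completion, which is what ties the stages back together.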

# How does this compare to Apache Spark?

-Although Ballista is largely inspired by Apache Spark, there are some key differences.
+Ballista implements a similar design to Apache Spark, but there are some key differences.

- The choice of Rust as the main execution language means that memory usage is deterministic and avoids the overhead of
GC pauses.
@@ -49,14 +70,3 @@
distributed compute.
- The use of Apache Arrow as the memory model and network protocol means that data can be exchanged between executors
in any programming language with minimal serialization overhead.

-## Status
-
-Ballista was [donated](https://arrow.apache.org/blog/2021/04/12/ballista-donation/) to the Apache Arrow project in
-April 2021 and should be considered experimental.
-
-## Getting Started
-
-The [Ballista Developer Documentation](docs/README.md) and the
-[DataFusion User Guide](https://github.com/apache/arrow-datafusion/tree/master/docs/user-guide) are currently the
-best sources of information for getting started with Ballista.