Upgrade datafusion to 20.0.0 & sqlparser to 0.32.0 #711

Merged 3 commits on Mar 26, 2023
46 changes: 45 additions & 1 deletion .github/workflows/rust.yml
@@ -268,6 +268,50 @@ jobs:
# do not produce debug symbols to keep memory usage down
RUSTFLAGS: "-C debuginfo=0"

# verify that the benchmark queries return the correct results
verify-benchmark-results:
name: verify benchmark results (amd64)
needs: [linux-build-lib]
runs-on: ubuntu-latest
strategy:
matrix:
arch: [amd64]
rust: [stable]
container:
image: ${{ matrix.arch }}/rust
env:
# Disable full debug symbol generation to speed up CI build and keep memory down
# "1" means line tables only, which is useful for panic tracebacks.
RUSTFLAGS: "-C debuginfo=1"
steps:
- uses: actions/checkout@v3
with:
submodules: true
- name: Install protobuf compiler
shell: bash
run: |
apt-get -qq update && apt-get -y -qq install protobuf-compiler
protoc --version
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Cache Rust dependencies
uses: actions/cache@v3
with:
path: /github/home/target
# this key equals the ones on `linux-build-lib` for re-use
key: ${{ runner.os }}-${{ matrix.arch }}-target-cache-${{ matrix.rust }}
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: ${{ matrix.rust }}
- name: Verify that benchmark queries return expected results
run: |
cargo test --package ballista-benchmarks --profile release-nonlto --features=ci -- --test-threads=1

lint:
name: Lint
runs-on: ubuntu-latest
@@ -313,7 +357,7 @@ jobs:
if [[ $DOCKER_TAG =~ ^[0-9\.]+-rc[0-9]+$ ]]
then
echo "publishing docker tag $DOCKER_TAG"
docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
docker login ghcr.io -u $DOCKER_USER -p "$DOCKER_PASS"
docker push ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
fi
19 changes: 19 additions & 0 deletions Cargo.toml
@@ -27,9 +27,28 @@ members = [
]
exclude = ["python"]

[workspace.dependencies]
arrow = { version = "34.0.0" }
arrow-flight = { version = "34.0.0", features = ["flight-sql-experimental"] }
datafusion = "20.0.0"
datafusion-proto = "20.0.0"

# cargo build --profile release-lto
[profile.release-lto]
codegen-units = 1
inherits = "release"
lto = true

# the release profile takes a long time to build so we can use this profile during development to save time
# cargo build --profile release-nonlto
[profile.release-nonlto]
codegen-units = 16
debug = false
debug-assertions = false
incremental = false
inherits = "release"
lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
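
For reference, the crate-level manifests changed below consume these shared entries by declaring `workspace = true` instead of repeating a pinned version. A minimal sketch of what a member crate's `[dependencies]` section then looks like (the crate selection here is illustrative):

```toml
# Sketch of a member crate manifest: no versions are repeated because they are
# inherited from [workspace.dependencies] in the root Cargo.toml above.
[dependencies]
arrow = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
```

The new profiles are opted into explicitly on the command line, e.g. `cargo build --profile release-nonlto`, as the comments above note.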
4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.11.0", features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = "19.0.0"
datafusion-cli = "19.0.0"
datafusion = { workspace = true }
datafusion-cli = "20.0.0"
dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
6 changes: 3 additions & 3 deletions ballista/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.63"
ballista-core = { path = "../core", version = "0.11.0" }
ballista-executor = { path = "../executor", version = "0.11.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.11.0", optional = true }
datafusion = "19.0.0"
datafusion-proto = "19.0.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
Contributor comment on lines +34 to +35: Nice :)

futures = "0.3"
log = "0.4"
parking_lot = "0.12"
sqlparser = "0.30.0"
sqlparser = "0.32.0"
tempfile = "3"
tokio = "1.0"

2 changes: 1 addition & 1 deletion ballista/client/README.md
@@ -85,7 +85,7 @@ To build a simple ballista example, add the following dependencies to your `Cargo.toml`
```toml
[dependencies]
ballista = "0.10"
datafusion = "19.0.0"
datafusion = "20.0.0"
tokio = "1.0"
```

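For context (not part of this diff), a minimal client program built against these dependencies would look roughly like the sketch below. It is reconstructed from memory of the Ballista client API, so details such as whether `BallistaContext::remote` is async, the scheduler port, and the config key are assumptions.

```rust
use ballista::prelude::*;

// Assumes tokio is enabled with the "macros" and "rt-multi-thread" features.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed config key; any Ballista setting can be passed this way.
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;

    // Connect to a running scheduler (host and port are placeholders).
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

    // Run a DataFusion SQL query through the distributed context.
    let df = ctx.sql("SELECT 1").await?;
    df.show().await?;
    Ok(())
}
```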
9 changes: 4 additions & 5 deletions ballista/core/Cargo.toml
@@ -45,14 +45,13 @@ simd = ["datafusion/simd"]

[dependencies]
ahash = { version = "0.8", default-features = false }

arrow-flight = { version = "33.0.0", features = ["flight-sql-experimental"] }
arrow-flight = { workspace = true }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = "19.0.0"
datafusion = { workspace = true }
datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
datafusion-proto = "19.0.0"
datafusion-proto = { workspace = true }
futures = "0.3"
hashbrown = "0.13"

@@ -68,7 +67,7 @@ prost = "0.11"
prost-types = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
sqlparser = "0.30.0"
sqlparser = "0.32.0"
sys-info = "0.9.0"
tokio = "1.0"
tokio-stream = { version = "0.1", features = ["net"] }
8 changes: 4 additions & 4 deletions ballista/executor/Cargo.toml
@@ -38,15 +38,15 @@ default = ["mimalloc"]

[dependencies]
anyhow = "1"
arrow = { version = "33.0.0" }
arrow-flight = { version = "33.0.0" }
arrow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
datafusion = "19.0.0"
datafusion-proto = "19.0.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
6 changes: 4 additions & 2 deletions ballista/executor/src/execution_loop.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::config::Extensions;
use datafusion::physical_plan::ExecutionPlan;

use ballista_core::serde::protobuf::{
@@ -184,14 +185,15 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan
}
let runtime = executor.runtime.clone();
let session_id = task.session_id.clone();
let task_context = Arc::new(TaskContext::new(
let task_context = Arc::new(TaskContext::try_new(
task_identity.clone(),
session_id,
task_props,
task_scalar_functions,
task_aggregate_functions,
runtime.clone(),
));
Extensions::default(),
)?);

let plan: Arc<dyn ExecutionPlan> =
U::try_decode(task.plan.as_slice()).and_then(|proto| {
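
The change above (repeated in `executor_server.rs` below) reflects DataFusion 20 replacing the infallible `TaskContext::new` with `TaskContext::try_new`, which takes an additional `Extensions` argument and returns a `Result`, so call sites now end in `?`. A standalone sketch of the new call shape follows; the import path for `TaskContext` and the parameter types are recalled from memory and should be treated as assumptions.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::config::Extensions;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn build_task_context() -> Result<Arc<TaskContext>> {
    let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new())?);

    // DataFusion 19: TaskContext::new(..) returned the context directly.
    // DataFusion 20: try_new(..) is fallible and takes an Extensions value,
    // so the error is propagated with `?`, as in execution_loop.rs above.
    let ctx = TaskContext::try_new(
        "task-id".to_string(),    // task identity
        "session-id".to_string(), // session id
        HashMap::new(),           // task properties
        HashMap::new(),           // scalar UDFs
        HashMap::new(),           // aggregate UDFs
        runtime,
        Extensions::default(),
    )?;
    Ok(Arc::new(ctx))
}
```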
6 changes: 4 additions & 2 deletions ballista/executor/src/executor_server.rs
@@ -16,6 +16,7 @@
// under the License.

use ballista_core::BALLISTA_VERSION;
use datafusion::config::Extensions;
use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::Deref;
@@ -319,14 +320,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,

let session_id = task.session_id;
let runtime = self.executor.runtime.clone();
let task_context = Arc::new(TaskContext::new(
let task_context = Arc::new(TaskContext::try_new(
task_identity.clone(),
session_id,
task_props,
task_scalar_functions,
task_aggregate_functions,
runtime.clone(),
));
Extensions::default(),
)?);

let encoded_plan = &task.plan.as_slice();

6 changes: 3 additions & 3 deletions ballista/scheduler/Cargo.toml
@@ -43,16 +43,16 @@ sled = ["sled_package", "tokio-stream"]

[dependencies]
anyhow = "1"
arrow-flight = { version = "33.0.0", features = ["flight-sql-experimental"] }
arrow-flight = { workspace = true }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
datafusion = "19.0.0"
datafusion-proto = "19.0.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
etcd-client = { version = "0.10", optional = true }
flatbuffers = { version = "22.9.29" }
futures = "0.3"
43 changes: 24 additions & 19 deletions ballista/scheduler/src/planner.rs
@@ -462,32 +462,34 @@ order by
/* Expected result:

ShuffleWriterExec: Some(Hash([Column { name: "l_orderkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=4096
FilterExec: l_shipmode@4 IN ([Literal { value: Utf8("MAIL") }, Literal { value: Utf8("SHIP") }]) AND l_commitdate@2 < l_receiptdate@3 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131
CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_shipmode@4 as l_shipmode]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (l_shipmode@4 = SHIP OR l_shipmode@4 = MAIL) AND l_commitdate@2 < l_receiptdate@3 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131
CsvExec: files={2 groups: [[testdata/lineitem/partition0.tbl], [testdata/lineitem/partition1.tbl]]}, has_header=false, limit=None, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode]

ShuffleWriterExec: Some(Hash([Column { name: "o_orderkey", index: 0 }], 2))
CsvExec: source=Path(testdata/orders: [testdata/orders/orders.tbl]), has_header=false
CsvExec: files={1 group: [[testdata/orders/orders.tbl]]}, has_header=false, limit=None, projection=[o_orderkey, o_orderpriority]

ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0 }], 2))
AggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=4096
UnresolvedShuffleExec
CoalesceBatchesExec: target_batch_size=4096
UnresolvedShuffleExec
AggregateExec: mode=Partial, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
ProjectionExec: expr=[l_shipmode@1 as l_shipmode, o_orderpriority@3 as o_orderpriority]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=8192
UnresolvedShuffleExec
CoalesceBatchesExec: target_batch_size=8192
UnresolvedShuffleExec

ShuffleWriterExec: None
ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
UnresolvedShuffleExec
SortExec: expr=[l_shipmode@0 ASC NULLS LAST]
ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=8192
UnresolvedShuffleExec

ShuffleWriterExec: None
SortExec: [l_shipmode@0 ASC]
CoalescePartitionsExec
UnresolvedShuffleExec
SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
UnresolvedShuffleExec
*/

assert_eq!(5, stages.len());
@@ -537,7 +539,10 @@

let hash_agg = downcast_exec!(input, AggregateExec);

let coalesce_batches = hash_agg.children()[0].clone();
let projection = hash_agg.children()[0].clone();
let projection = downcast_exec!(projection, ProjectionExec);

let coalesce_batches = projection.children()[0].clone();
let coalesce_batches = downcast_exec!(coalesce_batches, CoalesceBatchesExec);

let join = coalesce_batches.children()[0].clone();