Commit

Merge remote-tracking branch 'upstream/main' into columnar_shuffle_default

viirya committed May 3, 2024
2 parents 1e560d3 + 2741ae7 commit 126675e
Showing 21 changed files with 1,714 additions and 192 deletions.
81 changes: 81 additions & 0 deletions .github/workflows/spark_sql_test_ansi.yml
@@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Spark SQL Tests (ANSI mode)

concurrency:
  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
  cancel-in-progress: true

on:
  # enable the following once ANSI support is complete
  # push:
  #   paths-ignore:
  #     - "doc/**"
  #     - "**.md"
  # pull_request:
  #   paths-ignore:
  #     - "doc/**"
  #     - "**.md"

  # manual trigger ONLY
  # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
  workflow_dispatch:

env:
  RUST_VERSION: nightly
jobs:
  spark-sql-catalyst:
    strategy:
      matrix:
        os: [ubuntu-latest]
        java-version: [11]
        spark-version: [{short: '3.4', full: '3.4.2'}]
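        # Each module entry shards the Spark test suites: args1 is a plain sbt
        # task, while args2 is a quoted testOnly invocation with ScalaTest tag
        # filters (-l excludes a tag, -n selects only tests carrying that tag).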
        module:
          - {name: "catalyst", args1: "catalyst/test", args2: ""}
          - {name: "sql/core-1", args1: "", args2: "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest"}
          - {name: "sql/core-2", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest"}
          - {name: "sql/core-3", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest"}
          - {name: "sql/hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
          - {name: "sql/hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
          - {name: "sql/hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
      fail-fast: false
    name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.java-version }}
    runs-on: ${{ matrix.os }}
    container:
      image: amd64/rust
    steps:
      - uses: actions/checkout@v4
      - name: Setup Rust & Java toolchain
        uses: ./.github/actions/setup-builder
        with:
          rust-version: ${{env.RUST_VERSION}}
          jdk-version: ${{ matrix.java-version }}
      - name: Setup Spark
        uses: ./.github/actions/setup-spark-builder
        with:
          spark-version: ${{ matrix.spark-version.full }}
          spark-short-version: ${{ matrix.spark-version.short }}
          comet-version: '0.1.0-SNAPSHOT' # TODO: get this from pom.xml
      - name: Run Spark tests
        run: |
          cd apache-spark
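          # ENABLE_COMET turns Comet on in the Spark build; ENABLE_COMET_ANSI_MODE
          # is expected to switch ANSI SQL mode on for the run.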
          ENABLE_COMET=true ENABLE_COMET_ANSI_MODE=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
        env:
          LC_ALL: "C.UTF-8"

2 changes: 1 addition & 1 deletion README.md
@@ -64,7 +64,7 @@ Linux, Apple OSX (Intel and M1)
 ## Requirements

 - Apache Spark 3.2, 3.3, or 3.4
-- JDK 8 and up
+- JDK 8, 11, and 17 (JDK 11 recommended because Spark 3.2 does not support JDK 17)
 - GLIBC 2.17 (CentOS 7) and up

 ## Getting started
49 changes: 8 additions & 41 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -19,11 +19,9 @@

 package org.apache.comet

-import java.io.{BufferedOutputStream, FileOutputStream}
 import java.util.concurrent.TimeUnit

 import scala.collection.mutable.ListBuffer
-import scala.io.Source

 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.network.util.JavaUtils
@@ -376,12 +374,14 @@ object CometConf {
     .booleanConf
     .createWithDefault(false)

-  val COMET_CAST_STRING_TO_TIMESTAMP: ConfigEntry[Boolean] = conf(
-    "spark.comet.cast.stringToTimestamp")
-    .doc(
-      "Comet is not currently fully compatible with Spark when casting from String to Timestamp.")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
+    conf("spark.comet.cast.allowIncompatible")
+      .doc(
+        "Comet is not currently fully compatible with Spark for all cast operations. " +
+          "Set this config to true to allow them anyway. See compatibility guide " +
+          "for more information.")
+      .booleanConf
+      .createWithDefault(false)

 }

@@ -625,36 +625,3 @@
 private object ConfigEntry {
   val UNDEFINED = "<undefined>"
 }
-
-/**
- * Utility for generating markdown documentation from the configs.
- *
- * This is invoked when running `mvn clean package -DskipTests`.
- */
-object CometConfGenerateDocs {
-  def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      // scalastyle:off println
-      println("Missing arguments for template file and output file")
-      // scalastyle:on println
-      sys.exit(-1)
-    }
-    val templateFilename = args.head
-    val outputFilename = args(1)
-    val w = new BufferedOutputStream(new FileOutputStream(outputFilename))
-    for (line <- Source.fromFile(templateFilename).getLines()) {
-      if (line.trim == "<!--CONFIG_TABLE-->") {
-        val publicConfigs = CometConf.allConfs.filter(_.isPublic)
-        val confs = publicConfigs.sortBy(_.key)
-        w.write("| Config | Description | Default Value |\n".getBytes)
-        w.write("|--------|-------------|---------------|\n".getBytes)
-        for (conf <- confs) {
-          w.write(s"| ${conf.key} | ${conf.doc.trim} | ${conf.defaultValueString} |\n".getBytes)
-        }
-      } else {
-        w.write(s"${line.trim}\n".getBytes)
-      }
-    }
-    w.close()
-  }
-}
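
For context, a minimal sketch of how the new flag would be set from a Spark session. The config key comes from the diff above; the session setup itself is an assumption for illustration, not part of this commit:

import org.apache.spark.sql.SparkSession

object AllowIncompatibleCastExample {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration.
    val spark = SparkSession.builder()
      .appName("comet-cast-example")
      .master("local[*]")
      .getOrCreate()
    // Opt in to cast operations that Comet does not yet guarantee to match
    // Spark exactly (the new flag defaults to false).
    spark.conf.set("spark.comet.cast.allowIncompatible", "true")
    spark.stop()
  }
}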
3 changes: 3 additions & 0 deletions core/Cargo.toml
@@ -118,3 +118,6 @@ harness = false
 name = "row_columnar"
 harness = false

+[[bench]]
+name = "cast"
+harness = false
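
With this entry in place, the new benchmark can be run on its own via `cargo bench --bench cast`; `harness = false` hands the main function over to Criterion instead of the built-in libtest bench harness.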
85 changes: 85 additions & 0 deletions core/benches/cast.rs
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::datafusion::expressions::cast::{Cast, EvalMode};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let mut b = StringBuilder::new();
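    // Populate 1000 rows: every 10th row is null, the remaining even rows get
    // fractional strings and the odd rows integral strings, so the cast kernels
    // see nulls, cleanly parseable values, and non-integral input alike.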
    for i in 0..1000 {
        if i % 10 == 0 {
            b.append_null();
        } else if i % 2 == 0 {
            b.append_value(format!("{}", rand::random::<f64>()));
        } else {
            b.append_value(format!("{}", rand::random::<i64>()));
        }
    }
    let array = b.finish();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
    let expr = Arc::new(Column::new("a", 0));
    let timezone = "".to_string();
    let cast_string_to_i8 = Cast::new(
        expr.clone(),
        DataType::Int8,
        EvalMode::Legacy,
        timezone.clone(),
    );
    let cast_string_to_i16 = Cast::new(
        expr.clone(),
        DataType::Int16,
        EvalMode::Legacy,
        timezone.clone(),
    );
    let cast_string_to_i32 = Cast::new(
        expr.clone(),
        DataType::Int32,
        EvalMode::Legacy,
        timezone.clone(),
    );
    let cast_string_to_i64 = Cast::new(expr, DataType::Int64, EvalMode::Legacy, timezone);

    let mut group = c.benchmark_group("cast");
    group.bench_function("cast_string_to_i8", |b| {
        b.iter(|| cast_string_to_i8.evaluate(&batch).unwrap());
    });
    group.bench_function("cast_string_to_i16", |b| {
        b.iter(|| cast_string_to_i16.evaluate(&batch).unwrap());
    });
    group.bench_function("cast_string_to_i32", |b| {
        b.iter(|| cast_string_to_i32.evaluate(&batch).unwrap());
    });
    group.bench_function("cast_string_to_i64", |b| {
        b.iter(|| cast_string_to_i64.evaluate(&batch).unwrap());
    });
}

fn config() -> Criterion {
    Criterion::default()
}

criterion_group! {
    name = benches;
    config = config();
    targets = criterion_benchmark
}
criterion_main!(benches);