[Gluten-147] Made the needed changes to update Velox (#144)
* refine agg plan generation

* support cast node

* fix

* refine docs
rui-mo authored May 9, 2022
1 parent 4b0e330 commit de7aaa9
Showing 11 changed files with 128 additions and 59 deletions.
2 changes: 1 addition & 1 deletion cpp/velox/CMakeLists.txt
@@ -478,7 +478,7 @@ set(VELOX_SRCS
)
add_library(velox SHARED ${VELOX_SRCS})

target_include_directories(velox PUBLIC ${CMAKE_SYSTEM_INCLUDE_PATH} ${JNI_INCLUDE_DIRS} ${CMAKE_CURRENT_SOURCE_DIR} ${root_directory}/src ${VELOX_HOME} ${VELOX_HOME}/velox/vector ${VELOX_REALEASE_PATH})
target_include_directories(velox PUBLIC ${CMAKE_SYSTEM_INCLUDE_PATH} ${JNI_INCLUDE_DIRS} ${CMAKE_CURRENT_SOURCE_DIR} ${root_directory}/src ${VELOX_HOME} ${VELOX_HOME}/velox/vector ${VELOX_REALEASE_PATH} ${VELOX_HOME}/third_party/xsimd/include/)

set_target_properties(velox PROPERTIES
LIBRARY_OUTPUT_DIRECTORY ${root_directory}/releases
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxPlanConverter.cc
@@ -79,7 +79,7 @@ void VeloxPlanConverter::setInputPlanNode(const ::substrait::FilterRel& sfilter)
}

void VeloxPlanConverter::setInputPlanNode(const ::substrait::ReadRel& sread) {
int32_t iterIdx = subVeloxPlanConverter_->iterAsInput(sread);
int32_t iterIdx = subVeloxPlanConverter_->streamIsInput(sread);
if (iterIdx == -1) {
return;
}
9 changes: 4 additions & 5 deletions docs/GlutenUsage.md
@@ -44,9 +44,11 @@ Based on the different environments, there are some parameters that can be set via -D
| build_arrow | Build Arrow from Source | ON |
| arrow_root | When build_arrow is set to False, arrow_root will be used to find the location of your existing arrow library. | /usr/local |
| build_protobuf | Build Protobuf from Source. If set to False, the default library path will be used to find the protobuf library. | ON |
| build_velox | Enable or Disable building Velox as a backend. | OFF |
| build_velox_from_source | Enable or Disable building Velox from a specific velox github repository. A default installed path will be in velox_home | OFF |
| velox_home (only valid when build_velox is ON) | When building Gluten with Velox, if you have an existing Velox, please set it. | /PATH_TO_GLUTEN/tools/build_velox/velox_ep |
| backends-velox | Add -Pbackends-velox in maven command to compile the JVM part of Velox backend| false |
| backends-clickhouse | Add -Pbackends-clickhouse in maven command to compile the JVM part of ClickHouse backend | false |
| build_velox | Enable or Disable building the CPP part of Velox backend | OFF |
| velox_home (only valid when build_velox is ON) | The path to the compiled Velox project. When building Gluten with Velox, if you have an existing Velox, please set it. | /PATH_TO_GLUTEN/tools/build_velox/velox_ep |

When build_arrow is set to True, build_arrow.sh will be launched to compile a custom Arrow library from [OAP Arrow](https://github.com/oap-project/arrow/tree/arrow-8.0.0-gluten).
If you wish to change any Arrow parameters, you can do so in the [build_arrow.sh](../tools/build_arrow.sh) script.
@@ -81,6 +83,3 @@ Submit the above script from spark-shell to trigger a Spark Job with certain con
```shell script
cat query.scala | spark-shell --name query --master yarn --deploy-mode client --conf spark.plugins=io.glutenproject.GlutenPlugin --conf spark.gluten.sql.columnar.backend.lib=${BACKEND} --conf spark.driver.extraClassPath=${gluten_jvm_jar} --conf spark.executor.extraClassPath=${gluten_jvm_jar} --conf spark.memory.offHeap.size=20g --conf spark.sql.sources.useV1SourceList=avro --num-executors 6 --executor-cores 6 --driver-memory 20g --executor-memory 25g --conf spark.executor.memoryOverhead=5g --conf spark.driver.maxResultSize=32g
```

Please note that there is a WIP [pull request](https://github.com/oap-project/gluten/pull/124) to add the fallback mechanism.
Unsupported operators are expected to be executed in vanilla Spark.
39 changes: 21 additions & 18 deletions docs/Velox.md
@@ -3,10 +3,10 @@
Currently, Gluten requires Velox to be pre-compiled.
In general, please refer to [Velox Installation](https://github.com/facebookincubator/velox/blob/main/scripts/setup-ubuntu.sh) to install all the dependencies and compile Velox.

Gluten depends on this [Velox branch](https://github.com/rui-mo/velox/tree/velox_for_gazelle_jni).
The changes to Velox are planned to be upstreamed in the future.
Gluten depends on this [Velox branch](https://github.com/oap-project/velox/commits/main) under oap-project.
The changes to Velox are planned to be upstreamed in the future; some of them have already been submitted to Velox as pull requests.

Gluten depends on this [Arrow branch](https://github.com/oap-project/arrow/tree/arrow-8.0.0-gluten) with this [pull request](https://github.com/oap-project/arrow/pull/94).
Gluten depends on this [Arrow branch](https://github.com/oap-project/arrow/tree/arrow-8.0.0-gluten).
In the future, Gluten with the Velox backend will switch to the upstream Arrow.

In addition to the above notes, there are several points that deserve attention when compiling Gluten with Velox.
@@ -53,11 +53,9 @@ mvn clean package -Pbackends-velox -P full-scala-compiler -DskipTests -Dchecksty

### An example for offloading Spark's computing to Velox with Gluten

TPC-H Q1 and Q6 are supported in Gluten using Velox as backend. Current support has below limitations:

- Found Date and Long types in Velox's TableScan are not fully ready,
so converted related columns into Double type.
- Metrics are missing.
In Gluten, TPC-H Q1 and Q6 can be fully offloaded to Velox for computing. Part of the operators
in other TPC-H queries can be offloaded to Velox. The unsupported operators will fall back
to Spark for execution.
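
To check what actually gets offloaded, the physical plan can be inspected from spark-shell: operators taken over by Gluten should show up as `*Transformer` nodes (for example, `HashAggregateExecTransformer` from this commit), while everything else runs in vanilla Spark. A minimal check, assuming a `lineitem` temporary view has already been registered as in the scripts below:

```shell script
// Print the physical plan; offloaded operators appear as *Transformer nodes.
spark.sql("select l_returnflag, count(*) from lineitem group by l_returnflag").explain()
```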

#### Test TPC-H Q1 and Q6 on Gluten with Velox backend

@@ -76,16 +74,21 @@ which is incompatible with Velox's String column. Add below option to disable th
--conf spark.sql.hive.convertMetastoreOrc=false
```

Considering Velox's support for Decimal, Date, Long types are not fully ready, the related columns of TPC-H Q6 were all transformed into Double type.
Below script shows how to convert Parquet into ORC format, and transforming TPC-H Q6 related columns into Double type.
To align with this data type change, the TPC-H Q6 query was changed accordingly.
Considering that Velox's support for Decimal and Date is not fully ready,
and that there are also some compatibility issues with Bigint and Int in Velox's TableScan
when reading Spark-generated ORC files (see [issue](https://github.com/facebookincubator/velox/issues/1436)),
the related columns in TPC-H Q1 and Q6 were all transformed into Double type.

The script below shows how to convert Parquet into ORC format and transform all the columns into Double type.
To align with this data type change, the TPC-H Q1 and Q6 queries need to be changed accordingly.

```shell script
val secondsInADay = 86400.0
for (filePath <- fileLists) {
val parquet = spark.read.parquet(filePath)
val df = parquet.select(parquet.col("l_orderkey"), parquet.col("l_partkey"), parquet.col("l_suppkey"), parquet.col("l_linenumber"), parquet.col("l_quantity"), parquet.col("l_extendedprice"), parquet.col("l_discount"), parquet.col("l_tax"), parquet.col("l_returnflag"), parquet.col("l_linestatus"), parquet.col("l_shipdate").cast(TimestampType).cast(LongType).cast(DoubleType).divide(seconds_in_a_day).alias("l_shipdate_new"), parquet.col("l_commitdate").cast(TimestampType).cast(LongType).cast(DoubleType).divide(seconds_in_a_day).alias("l_commitdate_new"), parquet.col("l_receiptdate").cast(TimestampType).cast(LongType).cast(DoubleType).divide(seconds_in_a_day).alias("l_receiptdate_new"), parquet.col("l_shipinstruct"), parquet.col("l_shipmode"), parquet.col("l_comment"))
val df = parquet.select(parquet.col("l_orderkey").cast(DoubleType), parquet.col("l_partkey").cast(DoubleType), parquet.col("l_suppkey").cast(DoubleType), parquet.col("l_linenumber").cast(DoubleType), parquet.col("l_quantity"), parquet.col("l_extendedprice"), parquet.col("l_discount"), parquet.col("l_tax"), parquet.col("l_returnflag"), parquet.col("l_linestatus"), parquet.col("l_shipdate").cast(TimestampType).cast(LongType).cast(DoubleType).divide(secondsInADay).alias("l_shipdate"), parquet.col("l_commitdate").cast(TimestampType).cast(LongType).cast(DoubleType).divide(secondsInADay).alias("l_commitdate"), parquet.col("l_receiptdate").cast(TimestampType).cast(LongType).cast(DoubleType).divide(secondsInADay).alias("l_receiptdate"), parquet.col("l_shipinstruct"), parquet.col("l_shipmode"), parquet.col("l_comment"))
val part_df = df.repartition(1)
part_df.write.mode("append").format("orc").save(ORC_path)
part_df.write.mode("append").format("orc").save(orcPath)
}
```
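
Note that the conversion above relies on the Spark SQL type classes (`TimestampType`, `LongType`, `DoubleType`), so `org.apache.spark.sql.types._` needs to be imported in spark-shell first. A quick sanity check of the converted data, assuming the same `orcPath` as above, could look like:

```shell script
// Read the converted ORC files back and confirm the casted columns are now Double.
val converted = spark.read.format("orc").load(orcPath)
converted.printSchema()
converted.select("l_shipdate", "l_quantity").show(5)
```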

@@ -94,12 +97,12 @@ for (filePath <- fileLists) {
The modified TPC-H Q6 query is:

```shell script
select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate_new >= 8766 and l_shipdate_new < 9131 and l_discount between .06 - 0.01 and .06 + 0.01 and l_quantity < 24
select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= 8766 and l_shipdate < 9131 and l_discount between .06 - 0.01 and .06 + 0.01 and l_quantity < 24
```
The modified TPC-H Q6 query is:
The modified TPC-H Q1 query is:
```shell script
select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem where l_shipdate_new <= 10471 group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus
select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem where l_shipdate <= 10471 group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus
```
The script below shows how to read the ORC data and submit the modified TPC-H Q6 query.
@@ -108,8 +111,8 @@ cat tpch_q6.scala
```shell script
val lineitem = spark.read.format("orc").load("file:///mnt/lineitem_orcs")
lineitem.createOrReplaceTempView("lineitem")
// The modified TPC-H Q6 query
time{spark.sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate_new >= 8766 and l_shipdate_new < 9131 and l_discount between .06 - 0.01 and .06 + 0.01 and l_quantity < 24").show}
// Submit the modified TPC-H Q6 query
time{spark.sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= 8766 and l_shipdate < 9131 and l_discount between .06 - 0.01 and .06 + 0.01 and l_quantity < 24").show}
```
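
The `time` wrapper used above is not a Spark built-in; the actual helper in the test scripts may differ, but a minimal definition that can be pasted into spark-shell beforehand looks like:

```shell script
// Simple wall-clock timer for a block of code.
def time[T](block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"Elapsed: ${(System.nanoTime() - start) / 1e9} s")
  result
}
```
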
Submit the test script from spark-shell.
New file (45 additions): CastNode.java
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.glutenproject.substrait.expression;

import io.glutenproject.substrait.type.TypeNode;
import io.substrait.proto.Expression;

import java.io.Serializable;

public class CastNode implements ExpressionNode, Serializable {
private final TypeNode typeNode;
private final ExpressionNode expressionNode;

CastNode(TypeNode typeNode, ExpressionNode expressionNode) {
this.typeNode = typeNode;
this.expressionNode = expressionNode;
}

@Override
public Expression toProtobuf() {
Expression.Cast.Builder castBuilder = Expression.Cast.newBuilder();
castBuilder.setType(typeNode.toProtobuf());
castBuilder.setInput(expressionNode.toProtobuf());

Expression.Builder builder = Expression.newBuilder();
builder.setCast(castBuilder.build());
return builder.build();
}

}
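
For reference, a minimal sketch of how the new cast node could be built and serialized through these builders (the `buildCast` helper and the choice of `makeSelection(0)` as the child are illustrative assumptions; `makeCast`, `makeSelection`, and `toProtobuf` are the methods added or used elsewhere in this commit):

```scala
import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode}
import io.glutenproject.substrait.`type`.TypeNode
import io.substrait.proto.Expression

// Build a Substrait cast expression that casts the first input column to the given type.
def buildCast(targetType: TypeNode): Expression = {
  val child: ExpressionNode = ExpressionBuilder.makeSelection(0)
  val castNode = ExpressionBuilder.makeCast(targetType, child)
  castNode.toProtobuf // an Expression message with its `cast` field set
}
```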
@@ -72,4 +72,8 @@ public static AggregateFunctionNode makeAggregateFunction(Long functionId,
ArrayList<ExpressionNode> expressionNodes, String phase, TypeNode outputTypeNode) {
return new AggregateFunctionNode(functionId, expressionNodes, phase, outputTypeNode);
}

public static CastNode makeCast(TypeNode typeNode, ExpressionNode expressionNode) {
return new CastNode(typeNode, expressionNode);
}
}
@@ -295,29 +295,56 @@ case class HashAggregateExecTransformer(
input: RelNode = null,
validation: Boolean): RelNode = {
// Will add a Projection before Aggregate.
val preExprNodes = new util.ArrayList[ExpressionNode]()
// Logic was added to prevent selecting the same column more than once,
// so the expressions in preExpressions will be unique.
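// For example, with grouping on col_a and aggregates count(col_b) and sum(col_b),
// preExpressions ends up as [col_a, col_b] and selections as [0, 1, 1].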
var preExpressions: Seq[Expression] = Seq()
var selections: Seq[Int] = Seq()

// Get the needed expressions from grouping expressions.
groupingExpressions.foreach(expr => {
val preExpr: Expression = ExpressionConverter
.replaceWithExpressionTransformer(expr, originalInputAttributes)
preExprNodes.add(preExpr.asInstanceOf[ExpressionTransformer].doTransform(args))
val foundExpr = preExpressions.find(e => e.semanticEquals(expr)).orNull
if (foundExpr != null) {
// If found, no need to add it to preExpressions again.
// The selecting index will be found.
selections = selections :+ preExpressions.indexOf(foundExpr)
} else {
// If not found, add this expression into preExpressions.
// A new selecting index will be created.
preExpressions = preExpressions :+ expr.clone()
selections = selections :+ (preExpressions.size - 1)
}
})

// Get the needed expressions from aggregation expressions.
aggregateExpressions.foreach(aggExpr => {
val aggregatFunc = aggExpr.aggregateFunction
aggExpr.mode match {
case Partial =>
aggregatFunc.children.toList.map(childExpr => {
val preExpr: Expression = ExpressionConverter
.replaceWithExpressionTransformer(childExpr, originalInputAttributes)
preExprNodes.add(preExpr.asInstanceOf[ExpressionTransformer].doTransform(args))
val foundExpr = preExpressions.find(e => e.semanticEquals(childExpr)).orNull
if (foundExpr != null) {
selections = selections :+ preExpressions.indexOf(foundExpr)
} else {
preExpressions = preExpressions :+ childExpr.clone()
selections = selections :+ (preExpressions.size - 1)
}
})
case other =>
throw new UnsupportedOperationException(s"$other not supported.")
}
})

// Create the expression nodes needed by Project node.
val preExprNodes = new util.ArrayList[ExpressionNode]()
for (expr <- preExpressions) {
val preExpr: Expression = ExpressionConverter
.replaceWithExpressionTransformer(expr, originalInputAttributes)
preExprNodes.add(preExpr.asInstanceOf[ExpressionTransformer].doTransform(args))
}
val inputRel = if (!validation) {
RelBuilder.makeProjectRel(input, preExprNodes)
} else {
// Use a extension node to send the input types through Substrait plan for validation.
// Use an extension node to send the input types through the Substrait plan for validation.
val inputTypeNodeList = new java.util.ArrayList[TypeNode]()
for (attr <- originalInputAttributes) {
inputTypeNodeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
Expand All @@ -326,34 +353,27 @@ case class HashAggregateExecTransformer(
Any.pack(TypeBuilder.makeStruct(inputTypeNodeList).toProtobuf))
RelBuilder.makeProjectRel(input, preExprNodes, extensionNode)
}
// Handle pure Aggregate.

// Handle the pure Aggregate after Projection. Both grouping and Aggregate expressions are
// selections.
val groupingList = new util.ArrayList[ExpressionNode]()
var colIdx = 0
while (colIdx < groupingExpressions.size) {
val groupingExpr: ExpressionNode = ExpressionBuilder.makeSelection(colIdx)
val groupingExpr: ExpressionNode = ExpressionBuilder.makeSelection(selections(colIdx))
groupingList.add(groupingExpr)
colIdx += 1
}

// Create Aggregation functions.
val aggregateFunctionList = new util.ArrayList[AggregateFunctionNode]()
aggregateExpressions.foreach(aggExpr => {
val aggregatFunc = aggExpr.aggregateFunction
val childrenNodeList = new util.ArrayList[ExpressionNode]()
val childrenNodes = aggExpr.mode match {
case Partial =>
aggregatFunc.children.toList.map(_ => {
val aggExpr = ExpressionBuilder.makeSelection(colIdx)
colIdx += 1
aggExpr
})
case Final =>
aggregatFunc.inputAggBufferAttributes.toList.map(_ => {
val aggExpr = ExpressionBuilder.makeSelection(colIdx)
colIdx += 1
aggExpr
})
case other =>
throw new UnsupportedOperationException(s"$other not supported.")
}
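// Aggregate children are selections as well; colIdx continues from where the
// grouping columns left off, so it indexes the remaining entries of `selections`.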
val childrenNodes = aggregatFunc.children.toList.map(_ => {
val aggExpr = ExpressionBuilder.makeSelection(selections(colIdx))
colIdx += 1
aggExpr
})
for (node <- childrenNodes) {
childrenNodeList.add(node)
}
Expand All @@ -364,6 +384,7 @@ case class HashAggregateExecTransformer(
ConverterUtils.getTypeNode(aggregatFunc.dataType, aggregatFunc.nullable))
aggregateFunctionList.add(aggFunctionNode)
})

RelBuilder.makeAggregateRel(inputRel, groupingList, aggregateFunctionList)
}

Expand Down
@@ -281,5 +281,4 @@ object ConverterUtils extends Logging {
final val ALIAS = "alias"
final val IS_NOT_NULL = "is_not_null"
final val IS_NULL = "is_null"
final val CAST = "cast"
}
@@ -171,13 +171,9 @@ class CastTransformer(
if (!child_node.isInstanceOf[ExpressionNode]) {
throw new UnsupportedOperationException(s"not supported yet.")
}
val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
val functionId = ExpressionBuilder.newScalarFunction(functionMap,
ConverterUtils.makeFuncName(ConverterUtils.CAST, Seq(child.dataType)))
val expressNodes = Lists.newArrayList(child_node.asInstanceOf[ExpressionNode])
val typeNode = ConverterUtils.getTypeNode(dataType, nullable = true)

ExpressionBuilder.makeScalarFunction(functionId, expressNodes, typeNode)
val typeNode = ConverterUtils.getTypeNode(dataType, nullable = true)
ExpressionBuilder.makeCast(typeNode, child_node.asInstanceOf[ExpressionNode])
}
}

@@ -107,6 +107,8 @@ case class TransformGuardRule() extends Rule[SparkPlan] {
case batchScan: BatchScanExec =>
val childTransformer = new BatchScanExecTransformer(batchScan.output, batchScan.scan)
if (childTransformer.doValidate()) {
// If the BatchScan passes validation, all the filters can be pushed down and
// the computing of this Filter is not needed.
return true
}
case _ =>
2 changes: 1 addition & 1 deletion tools/build_velox.sh
@@ -44,7 +44,7 @@ echo "VELOX_SOURCE_DIR=${VELOX_SOURCE_DIR}"
echo "VELOX_INSTALL_DIR=${VELOX_INSTALL_DIR}"
mkdir -p $VELOX_SOURCE_DIR
mkdir -p $VELOX_INSTALL_DIR
git clone https://github.com/rui-mo/velox.git -b velox_for_gazelle_jni $VELOX_SOURCE_DIR
git clone https://github.com/oap-project/velox.git -b main $VELOX_SOURCE_DIR
pushd $VELOX_SOURCE_DIR

scripts/setup-ubuntu.sh
