[SPARK-37483][SQL] Support push down top N to JDBC data source V2 #34738
Conversation
```scala
      case _ =>
        child transform {
          case sort @ Sort(order, _, ScanOperation(_, filter, sHolder: ScanBuilderHolder))
              if filter.length == 0 =>
```
How about using `filter.isEmpty`?
```java
/**
 * Pushes down top N to the data source.
 */
boolean pushTopN(SortValue[] orders, int limit);
```
It is a little strange that `pushTopN` returns a `Boolean`. How about adding two methods, `pushTopN` and `pushedTopN`? That way the responsibilities of each method are cleaner. FYI
Or probably we should have a new interface `SupportsPushDownTopN`, since Spark can push down either limit or top N, but not both.
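For reference, a minimal sketch of the shape such a dedicated interface could take, mirroring the existing `SupportsPushDownLimit` (written in Scala here for brevity; the names follow the suggestion above, not necessarily the final API):

```scala
import org.apache.spark.sql.connector.expressions.SortOrder
import org.apache.spark.sql.connector.read.ScanBuilder

// Mix-in for scan builders that can apply an entire ORDER BY ... LIMIT n.
trait SupportsPushDownTopN extends ScanBuilder {
  // Returns true if the source will apply both the sort and the limit;
  // false means Spark keeps its own Sort and Limit operators.
  def pushTopN(orders: Array[SortOrder], limit: Int): Boolean
}
```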
```scala
      sHolder.pushedLimit = Some(limitValue)
      sHolder.sortValues = orders
    }
    sort
```
The limit with sort has been pushed down. Can the sort node be removed?
Some databases support partition sort.
Yeah, I think we can remove the sort node from the query plan.
cc @huaxingao FYI.
cc @cloud-fan
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.read;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.SortValue;
```
`SortValue` is `private[sql]`; we shouldn't expose it in the public APIs. We should use `SortOrder` instead.
ping @cloud-fan
```diff
@@ -149,6 +149,7 @@ case class RowDataSourceScanExec(
       Map("PushedAggregates" -> seqToString(v.aggregateExpressions),
         "PushedGroupByColumns" -> seqToString(v.groupByColumns))} ++
     pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value") ++
+    Map("PushedSortOrders" -> seqToString(pushedDownOperators.sortValues)) ++
```
Can we use `pushedTopN` and display both the sort orders and limit N here?
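A sketch of what that could look like, assuming the surrounding `RowDataSourceScanExec` metadata code from the diff above (`seqToString` and `pushedDownOperators` come from that class; the entry names are suggestions, not final):

```scala
// Emit one PushedTopN entry carrying both the sort orders and the limit,
// falling back to a plain PushedLimit entry when no sort was pushed.
val topNOrLimitInfo = if (pushedDownOperators.sortValues.nonEmpty) {
  val pushedTopN = s"ORDER BY ${seqToString(pushedDownOperators.sortValues)}" +
    s" LIMIT ${pushedDownOperators.limit.get}"
  Some("PushedTopN" -> pushedTopN)
} else {
  pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value")
}
```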
```diff
@@ -25,4 +26,5 @@ import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 case class PushedDownOperators(
     aggregation: Option[Aggregation],
     sample: Option[TableSampleInfo],
-    limit: Option[Int])
+    limit: Option[Int],
+    sortValues: Seq[SortOrder])
```
Let's add an assert that `sortValues` can only be present if `limit` is present.
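A sketch of that invariant on the case class from the diff above (the message wording is an assumption):

```scala
case class PushedDownOperators(
    aggregation: Option[Aggregation],
    sample: Option[TableSampleInfo],
    limit: Option[Int],
    sortValues: Seq[SortOrder]) {
  // Top N pushdown is ORDER BY + LIMIT together, so pushed sort orders
  // without a pushed limit would be a bug in the pushdown rule.
  assert(limit.isDefined || sortValues.isEmpty,
    "sortValues can only be present if limit is present")
}
```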
```scala
        val limitPushed = PushDownUtils.pushLimit(sHolder.builder, limitValue)
        if (limitPushed) {
          sHolder.pushedLimit = Some(limitValue)
        }
        globalLimit
      case _ => globalLimit
    case _ =>
      child transform {
```
This looks scary: we don't know what's between the limit and the sort, so it's dangerous to push down top N here. I think we need to do an exact match: `case Sort(..., ScanOperation...)`.
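A sketch of the exact-match shape, with the surrounding rule's helpers (`PushDownUtils`, `ScanBuilderHolder`, `translateSortOrders`) assumed from the rest of this PR: top N is pushed only when the `Sort` sits directly on top of the scan, so no unknown operator can hide between the limit and the sort.

```scala
def pushDownLimit(plan: LogicalPlan, limit: Int): LogicalPlan = plan match {
  // Limit directly over a scan: push LIMIT n alone.
  case operation @ ScanOperation(_, Nil, sHolder: ScanBuilderHolder) =>
    if (PushDownUtils.pushLimit(sHolder.builder, limit)) {
      sHolder.pushedLimit = Some(limit)
    }
    operation
  // Limit directly over Sort over a scan: push ORDER BY ... LIMIT n as top N.
  case sort @ Sort(order, _, ScanOperation(_, Nil, sHolder: ScanBuilderHolder)) =>
    val orders = DataSourceStrategy.translateSortOrders(order)
    if (orders.length == order.length &&
        PushDownUtils.pushTopN(sHolder.builder, orders.toArray, limit)) {
      sHolder.pushedLimit = Some(limit)
      sHolder.sortValues = orders
      sort.child // the source sorts now, so the Sort node can be dropped
    } else {
      sort
    }
  // Anything else between the limit and the scan: give up.
  case other => other
}
```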
```diff
@@ -370,6 +370,8 @@ abstract class JdbcDialect extends Serializable with Logging{
    */
   def supportsLimit(): Boolean = true
+
+  def supportsTopN(): Boolean = true
```
Is there any database that does not support limit and sort?
I guess no such database exists.
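If such a database did exist, the opt-out would be a one-line override of the proposed hook. A hypothetical dialect for illustration only (the name and URL prefix are made up, and the hook itself was later deemed unnecessary):

```scala
import org.apache.spark.sql.jdbc.JdbcDialect

private case object NoTopNDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:notopn")
  // This source can push LIMIT, but not ORDER BY ... LIMIT as one clause.
  override def supportsTopN(): Boolean = false
}
```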
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownTopN.java
```scala
        }
        val newChild = pushDownLimit(child, limitValue)
        val newLocalLimit = globalLimit.child.asInstanceOf[LocalLimit].withNewChildren(Seq(newChild))
        globalLimit
```
Suggested change:
```diff
-        globalLimit
+        globalLimit.copy(child = newLocalLimit)
```
Shall we also update the test to make sure the Sort operator is removed from the query plan?
OK
```diff
@@ -43,6 +44,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     .set("spark.sql.catalog.h2.driver", "org.h2.Driver")
     .set("spark.sql.catalog.h2.pushDownAggregate", "true")
     .set("spark.sql.catalog.h2.pushDownLimit", "true")
+    .set("spark.sql.catalog.h2.pushDownTopN", "true")
```
this is not needed now.
```scala
    checkAnswer(df5, Seq(Row(1, "cathy", 9000.00, 1200.0), Row(1, "amy", 10000.00, 1000.0)))

    val df6 = spark.read.table("h2.test.employee")
      .where($"dept" === 1).limit(1)
```
This has been covered by the limit pushdown tests already.
```scala
    checkAnswer(df7, Seq(Row(10000.00, 1000.0, "amy")))
  }

  private def checkSortRemoved(df: DataFrame, removed: Boolean = true): Unit = {
```
Can we do this in `checkPushedLimit`? If `sortValues` is non-empty, we check that the sort has been removed from the query plan.
OK
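For reference, a sketch of the helper being folded in, assuming `JDBCV2Suite`'s context: collect any logical `Sort` left in the optimized plan and assert on its absence.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.Sort

private def checkSortRemoved(df: DataFrame, removed: Boolean = true): Unit = {
  val sorts = df.queryExecution.optimizedPlan.collect {
    case s: Sort => s
  }
  if (removed) assert(sorts.isEmpty) else assert(sorts.nonEmpty)
}
```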
retest this please
thanks, merging to master!
```diff
@@ -726,6 +726,21 @@ object DataSourceStrategy
     }
   }
 
+  protected[sql] def translateSortOrders(sortOrders: Seq[SortOrder]): Seq[SortOrderV2] = {
+    sortOrders.map {
```
Hi, all. This broke Scala 2.13 compilation.
```text
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:730:20: match may not be exhaustive.
[error] It would fail on the following input: SortOrder(_, _, _, _)
[error]     sortOrders.map {
[error]                ^
[warn] 24 warnings found
[error] one error found
[error] (sql / Compile / compileIncremental) Compilation failed
[error] Total time: 267 s (04:27), completed Dec 15, 2021 5:57:25 AM
```
Could you make a follow-up to recover Scala 2.13, @beliefer?
```diff
@@ -726,6 +726,21 @@ object DataSourceStrategy
     }
   }
 
+  protected[sql] def translateSortOrders(sortOrders: Seq[SortOrder]): Seq[SortOrderV2] = {
+    sortOrders.map {
+      case SortOrder(PushableColumnWithoutNestedColumn(name), directionV1, nullOrderingV1, _) =>
```
In addition, this will cause a `scala.MatchError` in Scala 2.12. We need a new test case that does not match this pattern, @beliefer.
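One way to fix both problems, sketched against the diff above (`toV2SortOrder` is a hypothetical helper standing in for the direction/null-ordering translation): make the translation a total function that returns `None` for unsupported sort expressions, so Scala 2.13 sees an exhaustive match and Scala 2.12 can no longer hit `scala.MatchError`.

```scala
protected[sql] def translateSortOrders(sortOrders: Seq[SortOrder]): Seq[SortOrderV2] = {
  def translate(sortOrder: SortOrder): Option[SortOrderV2] = sortOrder match {
    case SortOrder(PushableColumnWithoutNestedColumn(name), directionV1, nullOrderingV1, _) =>
      Some(toV2SortOrder(name, directionV1, nullOrderingV1)) // hypothetical helper
    case _ =>
      None // e.g. sorting by an expression rather than a plain column: skip pushdown
  }
  sortOrders.flatMap(translate)
}
```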
I'm gonna revert this because it did not pass the tests.
Thank you for the decision, @HyukjinKwon!
Could I open another PR to finish this issue? @dongjoon-hyun @HyukjinKwon cc @cloud-fan
Yeah, sure. Please go ahead. Thanks @beliefer.
Thank you.
@beliefer your GitHub Actions environment seems broken: I can hardly see a successful run across your various PRs, and I have to rely on the Jenkins results. Maybe let's take this chance to fix it. Please ping us in your re-submitted PR, and we can help you investigate if GitHub Actions keeps failing for unknown reasons.
@cloud-fan Thank you for helping me fix the GitHub Actions environment issue.
What changes were proposed in this pull request?
Currently, Spark supports pushing down limit to the data source.
However, in practice a limit usually comes with an order by, because limit and order by are more valuable together. On the other hand, pushing down top N (the same as ORDER BY ... LIMIT N) hands data to Spark's sort that is already in rough order, so the Spark-side sort may gain some performance improvement.
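For example, a query like the following (a usage sketch based on this PR's `JDBCV2Suite` setup, assuming a session with the `h2` catalog configured as in the tests) can now send its sort and limit to the JDBC source as a single top-N query:

```scala
import spark.implicits._

// ORDER BY salary DESC LIMIT 1 is pushed to the JDBC source as top N,
// instead of Spark fetching everything and sorting it itself.
val df = spark.read
  .table("h2.test.employee")
  .sort($"salary".desc)
  .limit(1)
df.show()
```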
Why are the changes needed?
Does this PR introduce any user-facing change?
'No'. It only changes the physical execution.
How was this patch tested?
New tests.