Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33278][SQL][FOLLOWUP] Improve OptimizeWindowFunctions to avoid transforming first to nth_value. #30419

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4a6f903
Reuse completeNextStageWithFetchFailure
beliefer Jun 19, 2020
96456e2
Merge remote-tracking branch 'upstream/master'
beliefer Jul 1, 2020
4314005
Merge remote-tracking branch 'upstream/master'
beliefer Jul 3, 2020
d6af4a7
Merge remote-tracking branch 'upstream/master'
beliefer Jul 9, 2020
f69094f
Merge remote-tracking branch 'upstream/master'
beliefer Jul 16, 2020
b86a42d
Merge remote-tracking branch 'upstream/master'
beliefer Jul 25, 2020
2ac5159
Merge branch 'master' of github.com:beliefer/spark
beliefer Jul 25, 2020
9021d6c
Merge remote-tracking branch 'upstream/master'
beliefer Jul 28, 2020
74a2ef4
Merge branch 'master' of github.com:beliefer/spark
beliefer Jul 28, 2020
9828158
Merge remote-tracking branch 'upstream/master'
beliefer Jul 31, 2020
9cd1aaf
Merge remote-tracking branch 'upstream/master'
beliefer Aug 5, 2020
abfcbb9
Merge remote-tracking branch 'upstream/master'
beliefer Aug 26, 2020
07c6c81
Merge remote-tracking branch 'upstream/master'
beliefer Sep 1, 2020
580130b
Merge remote-tracking branch 'upstream/master'
beliefer Sep 2, 2020
3712808
Merge branch 'master' of github.com:beliefer/spark
beliefer Sep 11, 2020
6107413
Merge remote-tracking branch 'upstream/master'
beliefer Sep 11, 2020
4b799b4
Merge remote-tracking branch 'upstream/master'
beliefer Sep 14, 2020
ee0ecbf
Merge remote-tracking branch 'upstream/master'
beliefer Sep 18, 2020
596bc61
Merge remote-tracking branch 'upstream/master'
beliefer Sep 24, 2020
0164e2f
Merge remote-tracking branch 'upstream/master'
beliefer Sep 27, 2020
90b79fc
Merge remote-tracking branch 'upstream/master'
beliefer Sep 29, 2020
2cef3a9
Merge remote-tracking branch 'upstream/master'
beliefer Oct 13, 2020
c26b64f
Merge remote-tracking branch 'upstream/master'
beliefer Oct 19, 2020
2e02cd2
Merge remote-tracking branch 'upstream/master'
beliefer Oct 22, 2020
a6d0741
Merge remote-tracking branch 'upstream/master'
beliefer Oct 28, 2020
82e5b2c
Merge remote-tracking branch 'upstream/master'
beliefer Nov 4, 2020
70bbf5d
Merge remote-tracking branch 'upstream/master'
beliefer Nov 6, 2020
126a51e
Merge remote-tracking branch 'upstream/master'
beliefer Nov 13, 2020
f2ceacd
Merge remote-tracking branch 'upstream/master'
beliefer Nov 19, 2020
6923dd1
improve OptimizeWindowFunctions.
beliefer Nov 19, 2020
f7d7e7f
improve OptimizeWindowFunctions.
beliefer Nov 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -811,9 +811,12 @@ object CollapseRepartition extends Rule[LogicalPlan] {
*/
// Rule that rewrites a `First` aggregate used as a window function into
// `NthValue(child, Literal(1), ignoreNulls)` when the guard on the window spec holds.
// NOTE(review): this text is a flattened diff hunk (-811,9 +811,12) with the +/- markers lost;
// the old and the new `case` clause both appear below. Confirm against the real file.
object OptimizeWindowFunctions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// NOTE(review): the next three lines look like the REMOVED side of the hunk. This guard casts
// the frame with asInstanceOf and only checks for an ordered RowFrame, not the frame bounds.
case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), spec)
if spec.orderSpec.nonEmpty &&
spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame].frameType == RowFrame =>
// NOTE(review): the next six lines look like the ADDED side. The frame is matched by type in
// the pattern (no cast), and the guard additionally requires the lower bound to be
// UnboundedPreceding and the upper bound to be UnboundedFollowing or CurrentRow.
case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _),
WindowSpecDefinition(_, orderSpec, frameSpecification: SpecifiedWindowFrame))
if orderSpec.nonEmpty && frameSpecification.frameType == RowFrame &&
frameSpecification.lower == UnboundedPreceding &&
(frameSpecification.upper == UnboundedFollowing ||
frameSpecification.upper == CurrentRow) =>
// Keep the window spec as is and swap only the window function.
we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class OptimizeWindowFunctionsSuite extends PlanTest {
val b = testRelation.output(1)
val c = testRelation.output(2)

test("replace first(col) by nth_value(col, 1)") {
test("replace first by nth_value if frame is UNBOUNDED PRECEDING AND CURRENT ROW") {
val inputPlan = testRelation.select(
WindowExpression(
First(a, false).toAggregateExpression(),
Expand All @@ -52,7 +52,34 @@ class OptimizeWindowFunctionsSuite extends PlanTest {
assert(optimized == correctAnswer)
}

test("can't replace first(col) by nth_value(col, 1) if the window frame type is range") {
test("replace first by nth_value if frame is UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING") {
  // The input and the expected plan share one window spec: partition by b, order by c
  // ascending, over a ROWS frame from UnboundedPreceding to UnboundedFollowing.
  val wholePartitionFrame =
    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
  val windowSpec = WindowSpecDefinition(b :: Nil, c.asc :: Nil, wholePartitionFrame)

  val inputPlan = testRelation.select(
    WindowExpression(First(a, false).toAggregateExpression(), windowSpec))
  // Expected result: the same projection with First swapped for NthValue(a, 1).
  val correctAnswer = testRelation.select(
    WindowExpression(NthValue(a, Literal(1), false), windowSpec))

  assert(Optimize.execute(inputPlan) == correctAnswer)
}

test("can't replace first by nth_value if frame is not suitable") {
  // ROWS frame whose second argument is Literal(1) instead of UnboundedPreceding.
  val boundedFrame = SpecifiedWindowFrame(RowFrame, Literal(1), CurrentRow)
  val firstOverBoundedFrame = WindowExpression(
    First(a, false).toAggregateExpression(),
    WindowSpecDefinition(b :: Nil, c.asc :: Nil, boundedFrame))
  val inputPlan = testRelation.select(firstOverBoundedFrame)

  // The optimized plan is expected to equal the input plan.
  assert(Optimize.execute(inputPlan) == inputPlan)
}

test("can't replace first by nth_value if the window frame type is range") {
val inputPlan = testRelation.select(
WindowExpression(
First(a, false).toAggregateExpression(),
Expand All @@ -63,7 +90,7 @@ class OptimizeWindowFunctionsSuite extends PlanTest {
assert(optimized == inputPlan)
}

test("can't replace first(col) by nth_value(col, 1) if the window frame isn't ordered") {
test("can't replace first by nth_value if the window frame isn't ordered") {
val inputPlan = testRelation.select(
WindowExpression(
First(a, false).toAggregateExpression(),
Expand Down