From 4a6f903897d28a3038918997e692410259a90ae3 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Fri, 19 Jun 2020 10:36:52 +0800
Subject: [PATCH 1/3] Reuse completeNextStageWithFetchFailure

---
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9d412f2dba3ce..762b14e170fcc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1796,9 +1796,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     // lets say there is a fetch failure in this task set, which makes us go back and
     // run stage 0, attempt 1
-    complete(taskSets(1), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(1, 0, shuffleDep1)
     scheduler.resubmitFailedStages()
 
     // stage 0, attempt 1 should have the properties of job2
@@ -1872,9 +1870,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // have the second stage complete normally
     completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC"))
     // fail the third stage because hostA went down
-    complete(taskSets(2), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
     // blockManagerMaster.removeExecutor("hostA-exec")
     // have DAGScheduler try again
@@ -1900,9 +1896,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // complete stage 1
     completeShuffleMapStageSuccessfully(1, 0, 1)
    // pretend stage 2 failed because hostA went down
-    complete(taskSets(2), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
     // blockManagerMaster.removeExecutor("hostA-exec")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
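
The three removed blocks above were identical except for the shuffle dependency
involved, which is what makes the shared helper reusable. For readers without the
suite open, a minimal sketch of what completeNextStageWithFetchFailure plausibly
looks like, reconstructed from the removed code and the call sites; the exact body
in DAGSchedulerSuite may differ, and taskSets, checkStageId, complete, and
makeBlockManagerId are assumed to be existing members of the suite:

    // Sketch (not verbatim suite code): fail every task of the next stage
    // attempt with a FetchFailed against hostA, forcing the DAGScheduler to
    // resubmit the parent map stage.
    private def completeNextStageWithFetchFailure(
        stageId: Int,
        stageAttemptId: Int,
        shuffleDep: ShuffleDependency[_, _, _]): Unit = {
      val stageAttempt = taskSets.last
      checkStageId(stageId, stageAttemptId, stageAttempt)
      complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (_, idx) =>
        (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, idx,
          "ignored"), null)
      }.toSeq)
    }
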
From 6923dd132dd18e24c660c2243b34ef3b4d008b10 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Thu, 19 Nov 2020 12:03:59 +0800
Subject: [PATCH 2/3] Improve OptimizeWindowFunctions: only rewrite suitable frames

---
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 86c46e072c887..9e1bc9786a57a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -811,9 +811,12 @@ object CollapseRepartition extends Rule[LogicalPlan] {
  */
 object OptimizeWindowFunctions extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
-    case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _), spec)
-        if spec.orderSpec.nonEmpty &&
-          spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame].frameType == RowFrame =>
+    case we @ WindowExpression(AggregateExpression(first: First, _, _, _, _),
+        WindowSpecDefinition(_, orderSpec, frameSpecification: SpecifiedWindowFrame))
+        if orderSpec.nonEmpty && frameSpecification.frameType == RowFrame &&
+          frameSpecification.lower == UnboundedPreceding &&
+          (frameSpecification.upper == UnboundedFollowing ||
+            frameSpecification.upper == CurrentRow) =>
       we.copy(windowFunction = NthValue(first.child, Literal(1), first.ignoreNulls))
   }
 }
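
The tightened guard is deliberately narrow: first(col) only coincides with
nth_value(col, 1) when the partition's first row is inside the frame of every
output row, which for a ROWS frame means a lower bound of UNBOUNDED PRECEDING
(the upper bound may be CURRENT ROW or UNBOUNDED FOLLOWING). A minimal
before/after sketch using the same catalyst constructors as the tests in the
next patch, where a, b, c stand for resolved attributes as in the suite:

    // Frame for which the rewrite is safe: every frame starts at the
    // partition's first row, so first(a) == nth_value(a, 1) on every row.
    val spec = WindowSpecDefinition(b :: Nil, c.asc :: Nil,
      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))

    val before = WindowExpression(First(a, false).toAggregateExpression(), spec)
    val after  = WindowExpression(NthValue(a, Literal(1), false), spec)
    // OptimizeWindowFunctions turns `before` into `after`; NthValue is an
    // offset window function, which is cheaper than running the First aggregate.
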
From f7d7e7fdcf2765afd80da85fa610d6e25310594b Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Thu, 19 Nov 2020 12:53:33 +0800
Subject: [PATCH 3/3] Add tests for OptimizeWindowFunctions

---
 .../OptimizeWindowFunctionsSuite.scala | 33 +++++++++++++++++--
 1 file changed, 30 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala
index 389aaeafe655f..cf850bbe21ce6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeWindowFunctionsSuite.scala
@@ -36,7 +36,7 @@ class OptimizeWindowFunctionsSuite extends PlanTest {
   val b = testRelation.output(1)
   val c = testRelation.output(2)
 
-  test("replace first(col) by nth_value(col, 1)") {
+  test("replace first by nth_value if frame is UNBOUNDED PRECEDING AND CURRENT ROW") {
     val inputPlan = testRelation.select(
       WindowExpression(
         First(a, false).toAggregateExpression(),
@@ -52,7 +52,34 @@ class OptimizeWindowFunctionsSuite extends PlanTest {
     assert(optimized == correctAnswer)
   }
 
-  test("can't replace first(col) by nth_value(col, 1) if the window frame type is range") {
+  test("replace first by nth_value if frame is UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING") {
+    val inputPlan = testRelation.select(
+      WindowExpression(
+        First(a, false).toAggregateExpression(),
+        WindowSpecDefinition(b :: Nil, c.asc :: Nil,
+          SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing))))
+    val correctAnswer = testRelation.select(
+      WindowExpression(
+        NthValue(a, Literal(1), false),
+        WindowSpecDefinition(b :: Nil, c.asc :: Nil,
+          SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing))))
+
+    val optimized = Optimize.execute(inputPlan)
+    assert(optimized == correctAnswer)
+  }
+
+  test("can't replace first by nth_value if frame is not suitable") {
+    val inputPlan = testRelation.select(
+      WindowExpression(
+        First(a, false).toAggregateExpression(),
+        WindowSpecDefinition(b :: Nil, c.asc :: Nil,
+          SpecifiedWindowFrame(RowFrame, Literal(1), CurrentRow))))
+
+    val optimized = Optimize.execute(inputPlan)
+    assert(optimized == inputPlan)
+  }
+
+  test("can't replace first by nth_value if the window frame type is range") {
     val inputPlan = testRelation.select(
       WindowExpression(
         First(a, false).toAggregateExpression(),
@@ -63,7 +90,7 @@ class OptimizeWindowFunctionsSuite extends PlanTest {
     assert(optimized == inputPlan)
   }
 
-  test("can't replace first(col) by nth_value(col, 1) if the window frame isn't ordered") {
+  test("can't replace first by nth_value if the window frame isn't ordered") {
     val inputPlan = testRelation.select(
       WindowExpression(
         First(a, false).toAggregateExpression(),
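
A usage-level illustration of why the negative tests must hold: once the lower
bound moves with the current row, first returns the first row of the frame, not
of the partition, so rewriting it to nth_value(col, 1) would change results. A
sketch against the DataFrame API with made-up data; a SparkSession is assumed in
scope as spark:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.first
    import spark.implicits._

    val df = Seq(("x", 1, "a"), ("x", 2, "b"), ("x", 3, "c")).toDF("b", "c", "a")

    // Safe frame: even for the last row, the frame still contains the
    // partition's first row, so first("a") is "a" on every row.
    val safe = Window.partitionBy("b").orderBy("c")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    // Unsafe frame: for the last row the frame holds only the two trailing
    // rows, so first("a") is "b" there, not what nth_value(a, 1) over the
    // whole partition would return.
    val sliding = Window.partitionBy("b").orderBy("c")
      .rowsBetween(-1, Window.currentRow)

    df.select(first("a").over(safe), first("a").over(sliding)).show()
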