Skip to content

Commit 31d4ea5

Browse files
committed
Update code
1 parent 5a2f8b8 commit 31d4ea5

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

+9-5
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,15 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
257257
case s @ Sort(order, _, operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder))
258258
if filter.isEmpty =>
259259
val orders = DataSourceStrategy.translateSortOrders(order)
260-
val topNPushed = PushDownUtils.pushTopN(sHolder.builder, orders.toArray, limit)
261-
if (topNPushed) {
262-
sHolder.pushedLimit = Some(limit)
263-
sHolder.sortValues = orders
264-
operation
260+
if (orders.length == order.length) {
261+
val topNPushed = PushDownUtils.pushTopN(sHolder.builder, orders.toArray, limit)
262+
if (topNPushed) {
263+
sHolder.pushedLimit = Some(limit)
264+
sHolder.sortValues = orders
265+
operation
266+
} else {
267+
s
268+
}
265269
} else {
266270
s
267271
}

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala

+7
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
227227
// LIMIT is pushed down only if all the filters are pushed down
228228
checkPushedLimit(df7)
229229
checkAnswer(df7, Seq(Row(10000.00, 1000.0, "amy")))
230+
231+
val df8 = spark.read
232+
.table("h2.test.employee")
233+
.sort(sub($"NAME"))
234+
.limit(1)
235+
checkPushedLimit(df8)
236+
checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0)))
230237
}
231238

232239
private def createSortValues(

0 commit comments

Comments (0)