diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt index 597f2c1611d3..3b85a91330cc 100644 --- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt +++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt @@ -2857,3 +2857,474 @@ SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts, FRA [200.0, 50.0] 50 50 GRC [80.0, 30.0] 30 30 TUR [100.0, 75.0] 75 75 + +query ITIPTR +SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate +FROM sales_global AS s +JOIN sales_global AS e + ON s.currency = e.currency AND + s.ts >= e.ts +GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency +ORDER BY s.sn +---- +0 GRC 0 2022-01-01T06:00:00 EUR 30 +1 FRA 1 2022-01-01T08:00:00 EUR 50 +1 TUR 2 2022-01-01T11:30:00 TRY 75 +1 FRA 3 2022-01-02T12:00:00 EUR 200 +0 GRC 4 2022-01-03T10:00:00 EUR 80 +1 TUR 4 2022-01-03T10:00:00 TRY 100 + +# create a table for testing +statement ok +CREATE TABLE sales_global_with_pk (zip_code INT, + country VARCHAR(3), + sn INT, + ts TIMESTAMP, + currency VARCHAR(3), + amount FLOAT, + primary key(sn) + ) as VALUES + (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0), + (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0), + (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0), + (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0), + (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0) + +# create a table for testing, where primary key is composite +statement ok +CREATE TABLE sales_global_with_composite_pk (zip_code INT, + country VARCHAR(3), + sn INT, + ts TIMESTAMP, + currency VARCHAR(3), + amount FLOAT, + primary key(sn, ts) + ) as VALUES + (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0), + (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0), + (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0), + (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0), + (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0) + +# create a table for testing, where sn is unique key +statement ok +CREATE TABLE sales_global_with_unique (zip_code INT, + country VARCHAR(3), + sn INT, + ts TIMESTAMP, + currency VARCHAR(3), + amount FLOAT, + unique(sn) + ) as VALUES + (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0), + (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0), + (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0), + (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0), + (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0), + (1, 'TUR', NULL, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0) + +# when group by contains primary key expression +# we can use all the expressions in the table during selection +# (not just group by expressions + aggregation result) +query TT +EXPLAIN SELECT s.sn, s.amount, 2*s.sn + FROM sales_global_with_pk AS s + GROUP BY sn + ORDER BY sn +---- +logical_plan +Sort: s.sn ASC NULLS LAST +--Projection: s.sn, s.amount, Int64(2) * CAST(s.sn AS Int64) +----Aggregate: groupBy=[[s.sn, s.amount]], aggr=[[]] +------SubqueryAlias: s +--------TableScan: sales_global_with_pk projection=[sn, amount] +physical_plan +SortPreservingMergeExec: [sn@0 ASC NULLS LAST] +--SortExec: expr=[sn@0 ASC NULLS LAST] +----ProjectionExec: expr=[sn@0 as sn, amount@1 as amount, 2 * CAST(sn@0 AS Int64) as Int64(2) * s.sn] +------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8 +------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[] +--------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] + +query IRI +SELECT s.sn, s.amount, 2*s.sn + FROM sales_global_with_pk AS s + GROUP BY sn + ORDER BY sn +---- +0 30 0 +1 50 2 +2 75 4 +3 200 6 +4 100 8 + +# Join should propagate primary key successfully +query TT +EXPLAIN SELECT r.sn, SUM(l.amount), r.amount + FROM sales_global_with_pk AS l + JOIN sales_global_with_pk AS r + ON l.sn >= r.sn + GROUP BY r.sn + ORDER BY r.sn +---- +logical_plan +Sort: r.sn ASC NULLS LAST +--Projection: r.sn, SUM(l.amount), r.amount +----Aggregate: groupBy=[[r.sn, r.amount]], aggr=[[SUM(l.amount)]] +------Projection: l.amount, r.sn, r.amount +--------Inner Join: Filter: l.sn >= r.sn +----------SubqueryAlias: l +------------TableScan: sales_global_with_pk projection=[sn, amount] +----------SubqueryAlias: r +------------TableScan: sales_global_with_pk projection=[sn, amount] +physical_plan +SortPreservingMergeExec: [sn@0 ASC NULLS LAST] +--SortExec: expr=[sn@0 ASC NULLS LAST] +----ProjectionExec: expr=[sn@0 as sn, SUM(l.amount)@2 as SUM(l.amount), amount@1 as amount] +------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[SUM(l.amount)] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8 +------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as amount], aggr=[SUM(l.amount)] +--------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, amount@3 as amount] +----------------NestedLoopJoinExec: join_type=Inner, filter=sn@0 >= sn@1 +------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +------------------CoalescePartitionsExec +--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] + +query IRR +SELECT r.sn, SUM(l.amount), r.amount + FROM sales_global_with_pk AS l + JOIN sales_global_with_pk AS r + ON l.sn >= r.sn + GROUP BY r.sn + ORDER BY r.sn +---- +0 455 30 +1 425 50 +2 375 75 +3 300 200 +4 100 100 + +# when primary key consists of composite columns +# to associate it with other fields, aggregate should contain all the composite columns +query IRR +SELECT r.sn, SUM(l.amount), r.amount + FROM sales_global_with_composite_pk AS l + JOIN sales_global_with_composite_pk AS r + ON l.sn >= r.sn + GROUP BY r.sn, r.ts + ORDER BY r.sn +---- +0 455 30 +1 425 50 +2 375 75 +3 300 200 +4 100 100 + +# when primary key consists of composite columns +# to associate it with other fields, aggregate should contain all the composite columns +# if any of the composite column is missing, we cannot use associated indices, inside select expression +# below query should fail +statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.amount could not be resolved from available columns: r.sn, SUM\(l.amount\) +SELECT r.sn, SUM(l.amount), r.amount + FROM sales_global_with_composite_pk AS l + JOIN sales_global_with_composite_pk AS r + ON l.sn >= r.sn + GROUP BY r.sn + ORDER BY r.sn + +# left join should propagate right side constraint, +# if right side is a primary key (unique and doesn't contain null) +query IRR +SELECT r.sn, r.amount, SUM(r.amount) + FROM (SELECT * + FROM sales_global_with_pk as l + LEFT JOIN sales_global_with_pk as r + ON l.amount >= r.amount + 10) + GROUP BY r.sn +ORDER BY r.sn +---- +0 30 120 +1 50 150 +2 75 150 +4 100 100 +NULL NULL NULL + +# left join shouldn't propagate right side constraint, +# if right side is a unique key (unique and can contain null) +# Please note that, above query and this one is same except the constraint in the table. +statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.amount could not be resolved from available columns: r.sn, SUM\(r.amount\) +SELECT r.sn, r.amount, SUM(r.amount) + FROM (SELECT * + FROM sales_global_with_unique as l + LEFT JOIN sales_global_with_unique as r + ON l.amount >= r.amount + 10) + GROUP BY r.sn +ORDER BY r.sn + +# left semi join should propagate constraint of left side as is. +query IRR +SELECT l.sn, l.amount, SUM(l.amount) + FROM (SELECT * + FROM sales_global_with_unique as l + LEFT SEMI JOIN sales_global_with_unique as r + ON l.amount >= r.amount + 10) + GROUP BY l.sn +ORDER BY l.sn +---- +1 50 50 +2 75 75 +3 200 200 +4 100 100 +NULL 100 100 + +# Similarly, left anti join should propagate constraint of left side as is. +query IRR +SELECT l.sn, l.amount, SUM(l.amount) + FROM (SELECT * + FROM sales_global_with_unique as l + LEFT ANTI JOIN sales_global_with_unique as r + ON l.amount >= r.amount + 10) + GROUP BY l.sn +ORDER BY l.sn +---- +0 30 30 + + +# primary key should be aware from which columns it is associated +statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, SUM\(l.amount\) +SELECT l.sn, r.sn, SUM(l.amount), r.amount + FROM sales_global_with_pk AS l + JOIN sales_global_with_pk AS r + ON l.sn >= r.sn + GROUP BY l.sn + ORDER BY l.sn + +# window should propagate primary key successfully +query TT +EXPLAIN SELECT * + FROM(SELECT *, SUM(l.amount) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum_amount + FROM sales_global_with_pk AS l + ) as l + GROUP BY l.sn + ORDER BY l.sn +---- +logical_plan +Sort: l.sn ASC NULLS LAST +--Projection: l.zip_code, l.country, l.sn, l.ts, l.currency, l.amount, l.sum_amount +----Aggregate: groupBy=[[l.sn, l.zip_code, l.country, l.ts, l.currency, l.amount, l.sum_amount]], aggr=[[]] +------SubqueryAlias: l +--------Projection: l.zip_code, l.country, l.sn, l.ts, l.currency, l.amount, SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS sum_amount +----------WindowAggr: windowExpr=[[SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +------------SubqueryAlias: l +--------------TableScan: sales_global_with_pk projection=[zip_code, country, sn, ts, currency, amount] +physical_plan +SortPreservingMergeExec: [sn@2 ASC NULLS LAST] +--SortExec: expr=[sn@2 ASC NULLS LAST] +----ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount] +------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, zip_code@1 as zip_code, country@2 as country, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount], aggr=[] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([sn@0, zip_code@1, country@2, ts@3, currency@4, amount@5, sum_amount@6], 8), input_partitions=1 +------------AggregateExec: mode=Partial, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount], aggr=[] +--------------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@6 as sum_amount] +----------------BoundedWindowAggExec: wdw=[SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] +------------------CoalescePartitionsExec +--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] + +query ITIPTRR +SELECT * + FROM(SELECT *, SUM(l.amount) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum_amount + FROM sales_global_with_pk AS l + ) as l + GROUP BY l.sn + ORDER BY l.sn +---- +0 GRC 0 2022-01-01T06:00:00 EUR 30 80 +1 FRA 1 2022-01-01T08:00:00 EUR 50 155 +1 TUR 2 2022-01-01T11:30:00 TRY 75 325 +1 FRA 3 2022-01-02T12:00:00 EUR 200 375 +1 TUR 4 2022-01-03T10:00:00 TRY 100 300 + +# join should propagate primary key correctly +query IRP +SELECT l.sn, SUM(l.amount), l.ts +FROM + (SELECT * + FROM sales_global_with_pk AS l + JOIN sales_global_with_pk AS r ON l.sn >= r.sn) +GROUP BY l.sn +ORDER BY l.sn +---- +0 30 2022-01-01T06:00:00 +1 100 2022-01-01T08:00:00 +2 225 2022-01-01T11:30:00 +3 800 2022-01-02T12:00:00 +4 500 2022-01-03T10:00:00 + +# Projection propagates primary keys correctly +# (we can use r.ts at the final projection, because it +# is associated with primary key r.sn) +query IRP +SELECT r.sn, SUM(r.amount), r.ts +FROM + (SELECT r.ts, r.sn, r.amount + FROM + (SELECT * + FROM sales_global_with_pk AS l + JOIN sales_global_with_pk AS r ON l.sn >= r.sn)) +GROUP BY r.sn +ORDER BY r.sn +---- +0 150 2022-01-01T06:00:00 +1 200 2022-01-01T08:00:00 +2 225 2022-01-01T11:30:00 +3 400 2022-01-02T12:00:00 +4 100 2022-01-03T10:00:00 + +# after join, new window expressions shouldn't be associated with primary keys +statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression rn1 could not be resolved from available columns: r.sn, SUM\(r.amount\) +SELECT r.sn, SUM(r.amount), rn1 +FROM + (SELECT r.ts, r.sn, r.amount, + ROW_NUMBER() OVER() AS rn1 + FROM + (SELECT * + FROM sales_global_with_pk AS l + JOIN sales_global_with_pk AS r ON l.sn >= r.sn)) +GROUP BY r.sn + +# aggregate should propagate primary key successfully +query IPR +SELECT sn, ts, sum1 +FROM ( + SELECT ts, sn, SUM(amount) as sum1 + FROM sales_global_with_pk + GROUP BY sn) +GROUP BY sn +ORDER BY sn +---- +0 2022-01-01T06:00:00 30 +1 2022-01-01T08:00:00 50 +2 2022-01-01T11:30:00 75 +3 2022-01-02T12:00:00 200 +4 2022-01-03T10:00:00 100 + +# aggregate should be able to introduce functional dependence +# (when group by contains single expression, group by expression +# becomes determinant, after aggregation; since we are sure that +# it will consist of unique values.) +# please note that ts is not primary key, still +# we can use sum1, after outer aggregation because +# after inner aggregation, ts becomes determinant +# of functional dependence. +query PR +SELECT ts, sum1 +FROM ( + SELECT ts, SUM(amount) as sum1 + FROM sales_global_with_pk + GROUP BY ts) +GROUP BY ts +ORDER BY ts +---- +2022-01-01T06:00:00 30 +2022-01-01T08:00:00 50 +2022-01-01T11:30:00 75 +2022-01-02T12:00:00 200 +2022-01-03T10:00:00 100 + +# aggregate should update its functional dependence +# mode, if it is guaranteed that, after aggregation +# group by expressions will be unique. +query IRI +SELECT * +FROM ( + SELECT *, ROW_NUMBER() OVER(ORDER BY l.sn) AS rn1 + FROM ( + SELECT l.sn, SUM(l.amount) + FROM ( + SELECT l.sn, l.amount, SUM(l.amount) as sum1 + FROM + (SELECT * + FROM sales_global_with_pk AS l + JOIN sales_global_with_pk AS r ON l.sn >= r.sn) + GROUP BY l.sn) + GROUP BY l.sn) + ) +GROUP BY l.sn +ORDER BY l.sn +---- +0 30 1 +1 50 2 +2 75 3 +3 200 4 +4 100 5 + +# create a table +statement ok +CREATE TABLE FOO (x int, y int) AS VALUES (1, 2), (2, 3), (1, 3); + +# make sure that query runs in multi partitions +statement ok +set datafusion.execution.target_partitions = 8; + +query I +SELECT LAST_VALUE(x) +FROM FOO; +---- +1 + +query II +SELECT x, LAST_VALUE(x) +FROM FOO +GROUP BY x +ORDER BY x; +---- +1 1 +2 2 + +query II +SELECT y, LAST_VALUE(x) +FROM FOO +GROUP BY y +ORDER BY y; +---- +2 1 +3 1 + +# plan of the query above should contain partial +# and final aggregation stages +query TT +EXPLAIN SELECT LAST_VALUE(x) + FROM FOO; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(foo.x)]] +--TableScan: foo projection=[x] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(foo.x)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(foo.x)] +------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] + +query I +SELECT FIRST_VALUE(x) +FROM FOO; +---- +1 + +# similarly plan of the above query should +# contain partial and final aggregation stages. +query TT +EXPLAIN SELECT FIRST_VALUE(x) + FROM FOO; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(foo.x)]] +--TableScan: foo projection=[x] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(foo.x)] +------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index f322419a7bdc..656f30a13504 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -202,7 +202,7 @@ impl Accumulator for FirstValueAccumulator { let is_set_flags = &states[last_idx]; let flags = is_set_flags.as_boolean(); let mut filtered_first_vals = vec![]; - for state in states.iter().take(last_idx - 1) { + for state in states.iter().take(last_idx) { filtered_first_vals.push(compute::filter(state, flags)?) } self.update_batch(&filtered_first_vals) @@ -387,7 +387,7 @@ impl Accumulator for LastValueAccumulator { let is_set_flags = &states[last_idx]; let flags = is_set_flags.as_boolean(); let mut filtered_first_vals = vec![]; - for state in states.iter().take(last_idx - 1) { + for state in states.iter().take(last_idx) { filtered_first_vals.push(compute::filter(state, flags)?) } self.update_batch(&filtered_first_vals)