Skip to content

Commit 3520742

Browse files
committed
Update stream slt file. (#236)
* Update stream slt file. * Update tests, and comments * Add new test
1 parent 6dbb7fc commit 3520742

File tree

2 files changed

+145
-20
lines changed

2 files changed

+145
-20
lines changed

datafusion/core/src/datasource/stream.rs

-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ impl TableProviderFactory for StreamTableFactory {
6666
.with_header(cmd.has_header)
6767
.with_batch_size(state.config().batch_size())
6868
.with_constraints(cmd.constraints.clone());
69-
7069
Ok(Arc::new(StreamTable(Arc::new(config))))
7170
}
7271
}

datafusion/sqllogictest/test_files/stream.slt

+145-19
Original file line numberDiff line numberDiff line change
@@ -2,40 +2,45 @@
22
# This file does not contain any Apache Software Foundation copyrighted code.
33

44

5-
# Once the PRIMARY KEY is supported, "sn" INTEGER PRIMARY KEY, will be replaced, WITH ORDER (sn ASC) line will be deleted.
5+
# Please note that sn is PRIMARY KEY for this table.
66
statement ok
77
CREATE UNBOUNDED EXTERNAL TABLE sales_us (
8-
"ts" TIMESTAMP,
9-
"sn" INTEGER,
10-
"amount" INTEGER,
11-
"currency" VARCHAR NOT NULL
8+
ts TIMESTAMP,
9+
sn INTEGER,
10+
amount INTEGER,
11+
currency VARCHAR NOT NULL,
12+
primary key(sn)
1213
)
1314
STORED AS CSV
1415
WITH HEADER ROW
1516
WITH ORDER (ts ASC)
1617
WITH ORDER (sn ASC)
1718
LOCATION '../core/tests/data/sales_us.csv';
1819

20+
# Please note that sn is PRIMARY KEY for this table.
1921
statement ok
2022
CREATE UNBOUNDED EXTERNAL TABLE sales_global (
21-
"ts" TIMESTAMP,
22-
"sn" INTEGER,
23-
"amount" INTEGER,
24-
"currency" VARCHAR NOT NULL
23+
ts TIMESTAMP,
24+
sn INTEGER,
25+
amount INTEGER,
26+
currency VARCHAR NOT NULL,
27+
primary key(sn)
2528
)
2629
STORED AS CSV
2730
WITH HEADER ROW
2831
WITH ORDER (ts ASC)
2932
WITH ORDER (sn ASC)
3033
LOCATION '../core/tests/data/sales_global.csv';
3134

35+
# Please note that sn is PRIMARY KEY for this table.
3236
statement ok
3337
CREATE UNBOUNDED EXTERNAL TABLE exchange_rates (
34-
"ts" TIMESTAMP,
35-
"sn" INTEGER,
36-
"currency_from" VARCHAR NOT NULL,
37-
"currency_to" VARCHAR NOT NULL,
38-
"rate" FLOAT
38+
ts TIMESTAMP,
39+
sn INTEGER,
40+
currency_from VARCHAR NOT NULL,
41+
currency_to VARCHAR NOT NULL,
42+
rate FLOAT,
43+
primary key(sn)
3944
)
4045
STORED AS CSV
4146
WITH HEADER ROW
@@ -52,6 +57,20 @@ INTO annotated_sales_us
5257
FROM sales_us AS s
5358
ORDER BY sn
5459

60+
query PIITI
61+
SELECT *
62+
FROM annotated_sales_us
63+
ORDER BY sn ASC
64+
LIMIT 5;
65+
----
66+
2000-01-01T00:00:00 0 83 EUR 83
67+
2000-01-01T00:00:15 1 40 TRY 123
68+
2000-01-01T00:00:30 2 73 TRY 196
69+
2000-01-01T00:00:45 3 54 EUR 250
70+
2000-01-01T00:01:00 4 31 EUR 281
71+
72+
# This query is the same as the one above, except that the result is not written to a sink.
73+
# Physical plan of the query below shouldn't contain any SortExec.
5574
query TT
5675
EXPLAIN SELECT s.*, SUM(amount) OVER (ORDER BY sn)
5776
FROM sales_us AS s
@@ -79,14 +98,16 @@ LIMIT 5
7998
statement ok
8099
drop table annotated_sales_us;
81100

82-
101+
# Use other syntax to define window clause
83102
statement ok
84103
SELECT s.*, SUM(amount) OVER running_window
85104
INTO annotated_sales_us
86105
FROM sales_us AS s
87106
WINDOW running_window AS (ORDER BY sn)
88107
ORDER BY sn
89108

109+
# This query is the same as the one above, except that the result is not written to a sink.
110+
# Physical plan of the query below shouldn't contain SortExec.
90111
query TT
91112
EXPLAIN SELECT s.*, SUM(amount) OVER running_window
92113
FROM sales_us AS s
@@ -123,6 +144,8 @@ FROM sales_us AS s
123144
WINDOW sliding_window AS (ORDER BY sn ROWS 100 PRECEDING)
124145
ORDER BY sn
125146

147+
# This query is the same as the one above, except that the result is not written to a sink.
148+
# Physical plan of the query below shouldn't contain SortExec.
126149
query TT
127150
EXPLAIN SELECT s.*, AVG(amount) OVER sliding_window
128151
FROM sales_us AS s
@@ -151,9 +174,40 @@ LIMIT 5
151174
statement ok
152175
drop table annotated_sales_us;
153176

154-
# WINDOW sliding_window AS (ORDER BY ts RANGE INTERVAL '10' MINUTE PRECEDING) is not supported.
155-
# It also gives an error in Postgre
177+
# Make sure timestamp arithmetic is supported
178+
statement ok
179+
SELECT s.*, AVG(amount) OVER sliding_window
180+
INTO annotated_sales_us
181+
FROM sales_us AS s
182+
WINDOW sliding_window AS (ORDER BY ts RANGE BETWEEN INTERVAL '17' SECOND PRECEDING AND INTERVAL '20' SECOND FOLLOWING)
183+
ORDER BY sn
184+
185+
# This query is the same as the one above, except that the result is not written to a sink.
186+
# Physical plan of the query below shouldn't contain SortExec.
187+
query TT
188+
EXPLAIN SELECT s.*, AVG(amount) OVER sliding_window
189+
FROM sales_us AS s
190+
WINDOW sliding_window AS (ORDER BY ts RANGE BETWEEN INTERVAL '17' SECOND PRECEDING AND INTERVAL '20' SECOND FOLLOWING)
191+
ORDER BY sn
192+
----
193+
logical_plan
194+
Sort: s.sn ASC NULLS LAST
195+
--WindowAggr: windowExpr=[[AVG(CAST(s.amount AS Float64)) ORDER BY [s.ts ASC NULLS LAST] RANGE BETWEEN 17000000000 PRECEDING AND 20000000000 FOLLOWING AS AVG(s.amount) ORDER BY [s.ts ASC NULLS LAST] RANGE BETWEEN 17 SECOND PRECEDING AND 20 SECOND FOLLOWING]]
196+
----SubqueryAlias: s
197+
------TableScan: sales_us projection=[ts, sn, amount, currency]
198+
physical_plan
199+
BoundedWindowAggExec: wdw=[AVG(s.amount) ORDER BY [s.ts ASC NULLS LAST] RANGE BETWEEN 17 SECOND PRECEDING AND 20 SECOND FOLLOWING: Ok(Field { name: "AVG(s.amount) ORDER BY [s.ts ASC NULLS LAST] RANGE BETWEEN 17 SECOND PRECEDING AND 20 SECOND FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(IntervalMonthDayNano("17000000000")), end_bound: Following(IntervalMonthDayNano("20000000000")) }], mode=[Sorted]
200+
--StreamingTableExec: partition_sizes=1, projection=[ts, sn, amount, currency], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
156201

202+
query PIITR
203+
SELECT * from annotated_sales_us
204+
LIMIT 5
205+
----
206+
2000-01-01T00:00:00 0 83 EUR 61.5
207+
2000-01-01T00:00:15 1 40 TRY 65.333333333333
208+
2000-01-01T00:00:30 2 73 TRY 55.666666666667
209+
2000-01-01T00:00:45 3 54 EUR 52.666666666667
210+
2000-01-01T00:01:00 4 31 EUR 53.666666666667
157211

158212
# Merging Timelines
159213

@@ -220,8 +274,67 @@ physical_plan StreamingTableExec: partition_sizes=1, projection=[ts, amount, cur
220274

221275

222276
# Joining Streams
223-
224-
# (ARRAY_AGG(e.rate ORDER BY e.sn DESC))[1] breaks the pipeline
277+
278+
# last value without requirement
279+
statement ok
280+
SELECT s.*, s.amount * LAST_VALUE(e.rate) AS amount_usd
281+
INTO sales_global_converted
282+
FROM sales_global AS s
283+
JOIN exchange_rates AS e
284+
ON s.currency = e.currency_from AND
285+
e.currency_to = 'USD' AND
286+
s.ts >= e.ts
287+
GROUP BY s.sn
288+
ORDER BY s.sn
289+
290+
# This query is the version of the query above without sink.
291+
query TT
292+
EXPLAIN SELECT s.*, s.amount * LAST_VALUE(e.rate) AS amount_usd
293+
FROM sales_global AS s
294+
JOIN exchange_rates AS e
295+
ON s.currency = e.currency_from AND
296+
e.currency_to = 'USD' AND
297+
s.ts >= e.ts
298+
GROUP BY s.sn
299+
ORDER BY s.sn
300+
----
301+
logical_plan
302+
Sort: s.sn ASC NULLS LAST
303+
--Projection: s.ts, s.sn, s.amount, s.currency, CAST(s.amount AS Float32) * LAST_VALUE(e.rate) AS amount_usd
304+
----Aggregate: groupBy=[[s.sn, s.ts, s.amount, s.currency]], aggr=[[LAST_VALUE(e.rate)]]
305+
------Projection: s.ts, s.sn, s.amount, s.currency, e.rate
306+
--------Inner Join: s.currency = e.currency_from Filter: s.ts >= e.ts
307+
----------SubqueryAlias: s
308+
------------TableScan: sales_global projection=[ts, sn, amount, currency]
309+
----------SubqueryAlias: e
310+
------------Projection: exchange_rates.ts, exchange_rates.currency_from, exchange_rates.rate
311+
--------------Filter: exchange_rates.currency_to = Utf8("USD")
312+
----------------TableScan: exchange_rates projection=[ts, currency_from, currency_to, rate]
313+
physical_plan
314+
SortPreservingMergeExec: [ts@0 ASC NULLS LAST,sn@1 ASC NULLS LAST]
315+
--ProjectionExec: expr=[ts@1 as ts, sn@0 as sn, amount@2 as amount, currency@3 as currency, CAST(amount@2 AS Float32) * LAST_VALUE(e.rate)@4 as amount_usd]
316+
----AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, ts@1 as ts, amount@2 as amount, currency@3 as currency], aggr=[LAST_VALUE(e.rate)], ordering_mode=PartiallySorted([0, 1])
317+
------CoalesceBatchesExec: target_batch_size=8192
318+
--------SortPreservingRepartitionExec: partitioning=Hash([sn@0, ts@1, amount@2, currency@3], 4), input_partitions=4, sort_exprs=ts@1 ASC NULLS LAST,sn@0 ASC NULLS LAST
319+
----------AggregateExec: mode=Partial, gby=[sn@1 as sn, ts@0 as ts, amount@2 as amount, currency@3 as currency], aggr=[LAST_VALUE(e.rate)], ordering_mode=PartiallySorted([0, 1])
320+
------------ProjectionExec: expr=[ts@3 as ts, sn@4 as sn, amount@5 as amount, currency@6 as currency, rate@2 as rate]
321+
--------------PartitionedHashJoinExec: join_type=Inner, on=[(currency_from@1, currency@3)], filter=ts@0 >= ts@1
322+
----------------CoalesceBatchesExec: target_batch_size=8192
323+
------------------SortPreservingRepartitionExec: partitioning=Hash([currency_from@1], 4), input_partitions=4, sort_exprs=ts@0 ASC NULLS LAST
324+
--------------------ProjectionExec: expr=[ts@0 as ts, currency_from@1 as currency_from, rate@3 as rate]
325+
----------------------CoalesceBatchesExec: target_batch_size=8192
326+
------------------------FilterExec: currency_to@2 = USD
327+
--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
328+
----------------------------StreamingTableExec: partition_sizes=1, projection=[ts, currency_from, currency_to, rate], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
329+
----------------CoalesceBatchesExec: target_batch_size=8192
330+
------------------SortPreservingRepartitionExec: partitioning=Hash([currency@3], 4), input_partitions=4, sort_exprs=ts@0 ASC NULLS LAST
331+
--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
332+
----------------------StreamingTableExec: partition_sizes=1, projection=[ts, sn, amount, currency], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
333+
334+
# TODO: For the following query to run successfully, we have to rewrite the expression
335+
# (ARRAY_AGG(e.rate ORDER BY e.sn DESC))[1] as LAST_VALUE(e.rate ORDER BY e.sn ASC)
336+
# # (ARRAY_AGG(e.rate ORDER BY e.sn DESC))[1] breaks the pipeline
337+
# statement ok
225338
# SELECT s.*, s.amount * (ARRAY_AGG(e.rate ORDER BY e.sn DESC))[1] AS amount_usd
226339
# INTO sales_global_converted
227340
# FROM sales_global AS s
@@ -232,6 +345,19 @@ physical_plan StreamingTableExec: partition_sizes=1, projection=[ts, amount, cur
232345
# GROUP BY s.sn
233346
# ORDER BY s.sn
234347

348+
# # TODO: Once we have partial sorting, the query below should execute successfully.
349+
# # last value with requirement
350+
# statement ok
351+
# SELECT s.*, s.amount * LAST_VALUE(e.rate ORDER BY e.sn DESC) AS amount_usd
352+
# INTO sales_global_converted2
353+
# FROM sales_global AS s
354+
# JOIN exchange_rates AS e
355+
# ON s.currency = e.currency_from AND
356+
# e.currency_to = 'USD' AND
357+
# s.ts >= e.ts
358+
# GROUP BY s.sn
359+
# ORDER BY s.sn
360+
235361
# A temporary table until Order Equivalence problems resolved. The ProjectionExec does not
236362
# yield the output ordering correct in usual tables. (sn-ts order determines which one is under order equivalence.)
237363

0 commit comments

Comments
 (0)