Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): enable window agg mean pushdown #19807

Merged
merged 1 commit into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion cmd/influxd/launcher/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,13 +953,13 @@ error2","query plan",109,110
}

func TestQueryPushDowns(t *testing.T) {
t.Skip("Not supported yet")
testcases := []struct {
name string
data []string
query string
op string
want string
skip string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unskipped mean tests and left all other skipped for now. There are not tests for monthly durations yet as that improvement has not yet made it to OSS.

}{
{
name: "range last single point start time",
Expand All @@ -979,6 +979,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,tag
,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window last",
Expand Down Expand Up @@ -1018,6 +1019,7 @@ from(bucket: v.bucket)
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:14Z,9,f,m0,k0
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window offset last",
Expand Down Expand Up @@ -1056,6 +1058,7 @@ from(bucket: v.bucket)
,,2,1970-01-01T00:00:11Z,1970-01-01T00:00:14Z,1970-01-01T00:00:13Z,8,f,m0,k0
,,3,1970-01-01T00:00:14Z,1970-01-01T00:00:17Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare last",
Expand Down Expand Up @@ -1090,6 +1093,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,k
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty last",
Expand Down Expand Up @@ -1135,6 +1139,7 @@ from(bucket: v.bucket)
#default,_result,2,1970-01-01T01:00:00Z,1970-01-01T02:00:00Z,,,f,m0,k0
,result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty offset last",
Expand Down Expand Up @@ -1180,6 +1185,7 @@ from(bucket: v.bucket)
#default,_result,2,1970-01-01T01:00:00Z,1970-01-01T02:00:00Z,,,f,m0,k0
,result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window aggregate last",
Expand Down Expand Up @@ -1215,6 +1221,7 @@ from(bucket: v.bucket)
,,0,1969-12-31T23:59:59Z,1970-01-01T00:00:33Z,1970-01-01T00:00:10Z,6,f,m0,k0
,,0,1969-12-31T23:59:59Z,1970-01-01T00:00:33Z,1970-01-01T00:00:20Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window first",
Expand Down Expand Up @@ -1254,6 +1261,7 @@ from(bucket: v.bucket)
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:12Z,5,f,m0,k0
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window first string",
Expand All @@ -1279,6 +1287,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:05Z,1970-01-01T00:00:02Z,c,f,m,a
,,1,1970-01-01T00:00:05Z,1970-01-01T00:00:10Z,1970-01-01T00:00:07Z,h,f,m,a
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare first",
Expand Down Expand Up @@ -1313,6 +1322,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,k
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:05Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty first",
Expand Down Expand Up @@ -1364,6 +1374,7 @@ from(bucket: v.bucket)
#default,_result,3,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,,,f,m0,k0
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window aggregate first",
Expand Down Expand Up @@ -1399,6 +1410,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:02Z,1970-01-01T00:00:00.5Z,0,f,m0,k0
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:02Z,1970-01-01T00:00:01.5Z,1,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window min",
Expand Down Expand Up @@ -1438,6 +1450,7 @@ from(bucket: v.bucket)
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:12Z,5,f,m0,k0
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare min",
Expand Down Expand Up @@ -1472,6 +1485,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,k
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:08Z,0,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty min",
Expand Down Expand Up @@ -1513,6 +1527,7 @@ from(bucket: v.bucket)
#default,_result,3,1970-01-01T00:00:09Z,1970-01-01T00:00:12Z,,,f,m0,k0
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window aggregate min",
Expand All @@ -1538,6 +1553,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:03Z,0,f,m0,k0
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:09Z,0,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window max",
Expand Down Expand Up @@ -1577,6 +1593,7 @@ from(bucket: v.bucket)
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:14Z,9,f,m0,k0
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare max",
Expand Down Expand Up @@ -1611,6 +1628,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,k
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:14Z,9,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty max",
Expand Down Expand Up @@ -1652,6 +1670,7 @@ from(bucket: v.bucket)
#default,_result,3,1970-01-01T00:00:09Z,1970-01-01T00:00:12Z,,,f,m0,k0
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window aggregate max",
Expand All @@ -1677,6 +1696,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:03Z,2,f,m0,k0
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:09Z,6,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window count removes empty series",
Expand All @@ -1700,6 +1720,7 @@ from(bucket: v.bucket)
,_result,0,1970-01-01T00:00:01Z,1970-01-01T00:00:01.5Z,0,f,m,a
,_result,1,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,1,f,m,a
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "count",
Expand Down Expand Up @@ -1737,6 +1758,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:10Z,5,f,m0,k0
,,0,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window offset count",
Expand Down Expand Up @@ -1775,6 +1797,7 @@ from(bucket: v.bucket)
,,2,1970-01-01T00:00:07Z,1970-01-01T00:00:12Z,5,f,m0,k0
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,3,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "count with nulls",
Expand Down Expand Up @@ -1807,6 +1830,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:10Z,0,f,m0,k0
,,0,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare count",
Expand Down Expand Up @@ -1842,6 +1866,7 @@ from(bucket: v.bucket)
,result,table,_value,_field,_measurement,k
,,0,15,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window sum removes empty series",
Expand All @@ -1866,6 +1891,7 @@ from(bucket: v.bucket)
,_result,0,1970-01-01T00:00:01Z,1970-01-01T00:00:01.5Z,,f,m,a
,_result,1,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,3,f,m,a
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "sum",
Expand Down Expand Up @@ -1903,6 +1929,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:10Z,22,f,m0,k0
,,0,1970-01-01T00:00:15Z,35,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window offset sum",
Expand Down Expand Up @@ -1941,6 +1968,7 @@ from(bucket: v.bucket)
,,2,1970-01-01T00:00:07Z,1970-01-01T00:00:12Z,24,f,m0,k0
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,22,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "sum with nulls",
Expand Down Expand Up @@ -1973,6 +2001,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:10Z,,f,m0,k0
,,0,1970-01-01T00:00:15Z,35,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare sum",
Expand Down Expand Up @@ -2008,6 +2037,7 @@ from(bucket: v.bucket)
,result,table,_value,_field,_measurement,k
,,0,67,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare mean",
Expand Down Expand Up @@ -2183,6 +2213,7 @@ from(bucket: v.bucket)
,result,table,_time,_value
,,0,1970-01-01T00:00:00.00Z,0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "group none first",
Expand Down Expand Up @@ -2219,6 +2250,7 @@ from(bucket: v.bucket)
,result,table,_time,_value
,,0,1970-01-01T00:00:00.00Z,0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "group last",
Expand Down Expand Up @@ -2255,6 +2287,7 @@ from(bucket: v.bucket)
,result,table,_time,_value
,,0,1970-01-01T00:00:15.00Z,5
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "group none last",
Expand Down Expand Up @@ -2291,6 +2324,7 @@ from(bucket: v.bucket)
,result,table,_time,_value
,,0,1970-01-01T00:00:15.00Z,5
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "count group none",
Expand Down Expand Up @@ -2327,6 +2361,7 @@ from(bucket: v.bucket)
,result,table,_value
,,0,15
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "count group",
Expand Down Expand Up @@ -2364,6 +2399,7 @@ from(bucket: v.bucket)
,,0,kk0,8
,,1,kk1,7
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "sum group none",
Expand Down Expand Up @@ -2400,6 +2436,7 @@ from(bucket: v.bucket)
,result,table,_value
,,0,67
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "sum group",
Expand Down Expand Up @@ -2437,6 +2474,7 @@ from(bucket: v.bucket)
,,0,kk0,32
,,1,kk1,35
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "min group",
Expand Down Expand Up @@ -2474,6 +2512,7 @@ from(bucket: v.bucket)
,,0,kk0,0
,,1,kk1,1
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "max group",
Expand Down Expand Up @@ -2511,11 +2550,15 @@ from(bucket: v.bucket)
,,0,kk0,9
,,1,kk1,8
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
if tc.skip != "" {
t.Skip(tc.skip)
}
l := launcher.RunTestLauncherOrFail(t, ctx, mock.NewFlagger(map[feature.Flag]interface{}{
feature.PushDownWindowAggregateMean(): true,
feature.PushDownGroupAggregateMinMax(): true,
Expand Down
39 changes: 7 additions & 32 deletions mock/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
)

type StorageReader struct {
ReadFilterFn func(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadGroupFn func(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error)
CloseFn func()
ReadFilterFn func(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadGroupFn func(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error)
CloseFn func()
}

func (s *StorageReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) {
Expand Down Expand Up @@ -40,32 +41,6 @@ func (s *StorageReader) Close() {
}
}

type GroupStoreReader struct {
*StorageReader
GroupCapabilityFn func(ctx context.Context) query.GroupCapability
}

func (s *GroupStoreReader) GetGroupCapability(ctx context.Context) query.GroupCapability {
if s.GroupCapabilityFn != nil {
return s.GroupCapabilityFn(ctx)
}
return nil
}

type WindowAggregateStoreReader struct {
*StorageReader
GetWindowAggregateCapabilityFn func(ctx context.Context) query.WindowAggregateCapability
ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error)
}

func (s *WindowAggregateStoreReader) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability {
// Use the function if it exists.
if s.GetWindowAggregateCapabilityFn != nil {
return s.GetWindowAggregateCapabilityFn(ctx)
}
return nil
}

func (s *WindowAggregateStoreReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (s *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
return s.ReadWindowAggregateFn(ctx, spec, alloc)
}
Loading