diff --git a/cmd/influxd/launcher/query_test.go b/cmd/influxd/launcher/query_test.go index 3fa526ed4d0..3b0369e4cd0 100644 --- a/cmd/influxd/launcher/query_test.go +++ b/cmd/influxd/launcher/query_test.go @@ -953,13 +953,13 @@ error2","query plan",109,110 } func TestQueryPushDowns(t *testing.T) { - t.Skip("Not supported yet") testcases := []struct { name string data []string query string op string want string + skip string }{ { name: "range last single point start time", @@ -979,6 +979,7 @@ from(bucket: v.bucket) ,result,table,_start,_stop,_time,_value,_field,_measurement,tag ,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window last", @@ -1018,6 +1019,7 @@ from(bucket: v.bucket) ,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:14Z,9,f,m0,k0 ,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window offset last", @@ -1056,6 +1058,7 @@ from(bucket: v.bucket) ,,2,1970-01-01T00:00:11Z,1970-01-01T00:00:14Z,1970-01-01T00:00:13Z,8,f,m0,k0 ,,3,1970-01-01T00:00:14Z,1970-01-01T00:00:17Z,1970-01-01T00:00:15Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "bare last", @@ -1090,6 +1093,7 @@ from(bucket: v.bucket) ,result,table,_start,_stop,_time,_value,_field,_measurement,k ,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:15Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window empty last", @@ -1135,6 +1139,7 @@ from(bucket: v.bucket) #default,_result,2,1970-01-01T01:00:00Z,1970-01-01T02:00:00Z,,,f,m0,k0 ,result,table,_start,_stop,_time,_value,_field,_measurement,k `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window empty offset last", @@ -1180,6 +1185,7 @@ from(bucket: v.bucket) #default,_result,2,1970-01-01T01:00:00Z,1970-01-01T02:00:00Z,,,f,m0,k0 ,result,table,_start,_stop,_time,_value,_field,_measurement,k `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window aggregate last", @@ -1215,6 +1221,7 @@ from(bucket: v.bucket) ,,0,1969-12-31T23:59:59Z,1970-01-01T00:00:33Z,1970-01-01T00:00:10Z,6,f,m0,k0 ,,0,1969-12-31T23:59:59Z,1970-01-01T00:00:33Z,1970-01-01T00:00:20Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window first", @@ -1254,6 +1261,7 @@ from(bucket: v.bucket) ,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:12Z,5,f,m0,k0 ,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window first string", @@ -1279,6 +1287,7 @@ from(bucket: v.bucket) ,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:05Z,1970-01-01T00:00:02Z,c,f,m,a ,,1,1970-01-01T00:00:05Z,1970-01-01T00:00:10Z,1970-01-01T00:00:07Z,h,f,m,a `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "bare first", @@ -1313,6 +1322,7 @@ from(bucket: v.bucket) ,result,table,_start,_stop,_time,_value,_field,_measurement,k ,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:05Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window empty first", @@ -1364,6 +1374,7 @@ from(bucket: v.bucket) #default,_result,3,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,,,f,m0,k0 ,_result,table,_start,_stop,_time,_value,_field,_measurement,k `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window aggregate first", @@ -1399,6 +1410,7 @@ from(bucket: v.bucket) ,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:02Z,1970-01-01T00:00:00.5Z,0,f,m0,k0 ,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:02Z,1970-01-01T00:00:01.5Z,1,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window min", @@ -1438,6 +1450,7 @@ from(bucket: v.bucket) ,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:12Z,5,f,m0,k0 ,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "bare min", @@ -1472,6 +1485,7 @@ from(bucket: v.bucket) ,result,table,_start,_stop,_time,_value,_field,_measurement,k ,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:08Z,0,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window empty min", @@ -1513,6 +1527,7 @@ from(bucket: v.bucket) #default,_result,3,1970-01-01T00:00:09Z,1970-01-01T00:00:12Z,,,f,m0,k0 ,_result,table,_start,_stop,_time,_value,_field,_measurement,k `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window aggregate min", @@ -1538,6 +1553,7 @@ from(bucket: v.bucket) ,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:03Z,0,f,m0,k0 ,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:09Z,0,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window max", @@ -1577,6 +1593,7 @@ from(bucket: v.bucket) ,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:14Z,9,f,m0,k0 ,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "bare max", @@ -1611,6 +1628,7 @@ from(bucket: v.bucket) ,result,table,_start,_stop,_time,_value,_field,_measurement,k ,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:14Z,9,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window empty max", @@ -1652,6 +1670,7 @@ from(bucket: v.bucket) #default,_result,3,1970-01-01T00:00:09Z,1970-01-01T00:00:12Z,,,f,m0,k0 ,_result,table,_start,_stop,_time,_value,_field,_measurement,k `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window aggregate max", @@ -1677,6 +1696,7 @@ from(bucket: v.bucket) ,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:03Z,2,f,m0,k0 ,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:09Z,6,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window count removes empty series", @@ -1700,6 +1720,7 @@ from(bucket: v.bucket) ,_result,0,1970-01-01T00:00:01Z,1970-01-01T00:00:01.5Z,0,f,m,a ,_result,1,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,1,f,m,a `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "count", @@ -1737,6 +1758,7 @@ from(bucket: v.bucket) ,,0,1970-01-01T00:00:10Z,5,f,m0,k0 ,,0,1970-01-01T00:00:15Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window offset count", @@ -1775,6 +1797,7 @@ from(bucket: v.bucket) ,,2,1970-01-01T00:00:07Z,1970-01-01T00:00:12Z,5,f,m0,k0 ,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,3,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "count with nulls", @@ -1807,6 +1830,7 @@ from(bucket: v.bucket) ,,0,1970-01-01T00:00:10Z,0,f,m0,k0 ,,0,1970-01-01T00:00:15Z,5,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "bare count", @@ -1842,6 +1866,7 @@ from(bucket: v.bucket) ,result,table,_value,_field,_measurement,k ,,0,15,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window sum removes empty series", @@ -1866,6 +1891,7 @@ from(bucket: v.bucket) ,_result,0,1970-01-01T00:00:01Z,1970-01-01T00:00:01.5Z,,f,m,a ,_result,1,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,3,f,m,a `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "sum", @@ -1903,6 +1929,7 @@ from(bucket: v.bucket) ,,0,1970-01-01T00:00:10Z,22,f,m0,k0 ,,0,1970-01-01T00:00:15Z,35,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "window offset sum", @@ -1941,6 +1968,7 @@ from(bucket: v.bucket) ,,2,1970-01-01T00:00:07Z,1970-01-01T00:00:12Z,24,f,m0,k0 ,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,22,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "sum with nulls", @@ -1973,6 +2001,7 @@ from(bucket: v.bucket) ,,0,1970-01-01T00:00:10Z,,f,m0,k0 ,,0,1970-01-01T00:00:15Z,35,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "bare sum", @@ -2008,6 +2037,7 @@ from(bucket: v.bucket) ,result,table,_value,_field,_measurement,k ,,0,67,f,m0,k0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "bare mean", @@ -2183,6 +2213,7 @@ from(bucket: v.bucket) ,result,table,_time,_value ,,0,1970-01-01T00:00:00.00Z,0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "group none first", @@ -2219,6 +2250,7 @@ from(bucket: v.bucket) ,result,table,_time,_value ,,0,1970-01-01T00:00:00.00Z,0 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "group last", @@ -2255,6 +2287,7 @@ from(bucket: v.bucket) ,result,table,_time,_value ,,0,1970-01-01T00:00:15.00Z,5 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "group none last", @@ -2291,6 +2324,7 @@ from(bucket: v.bucket) ,result,table,_time,_value ,,0,1970-01-01T00:00:15.00Z,5 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "count group none", @@ -2327,6 +2361,7 @@ from(bucket: v.bucket) ,result,table,_value ,,0,15 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "count group", @@ -2364,6 +2399,7 @@ from(bucket: v.bucket) ,,0,kk0,8 ,,1,kk1,7 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "sum group none", @@ -2400,6 +2436,7 @@ from(bucket: v.bucket) ,result,table,_value ,,0,67 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "sum group", @@ -2437,6 +2474,7 @@ from(bucket: v.bucket) ,,0,kk0,32 ,,1,kk1,35 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "min group", @@ -2474,6 +2512,7 @@ from(bucket: v.bucket) ,,0,kk0,0 ,,1,kk1,1 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, { name: "max group", @@ -2511,11 +2550,15 @@ from(bucket: v.bucket) ,,0,kk0,9 ,,1,kk1,8 `, + skip: "https://github.com/influxdata/idpe/issues/8828", }, } for _, tc := range testcases { tc := tc t.Run(tc.name, func(t *testing.T) { + if tc.skip != "" { + t.Skip(tc.skip) + } l := launcher.RunTestLauncherOrFail(t, ctx, mock.NewFlagger(map[feature.Flag]interface{}{ feature.PushDownWindowAggregateMean(): true, feature.PushDownGroupAggregateMinMax(): true, diff --git a/mock/reader.go b/mock/reader.go index 74696a0ae3f..133c7b7fe32 100644 --- a/mock/reader.go +++ b/mock/reader.go @@ -8,11 +8,12 @@ import ( ) type StorageReader struct { - ReadFilterFn func(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) - ReadGroupFn func(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error) - ReadTagKeysFn func(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) - ReadTagValuesFn func(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) - CloseFn func() + ReadFilterFn func(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) + ReadGroupFn func(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error) + ReadTagKeysFn func(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) + ReadTagValuesFn func(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) + ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) + CloseFn func() } func (s *StorageReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) { @@ -40,32 +41,6 @@ func (s *StorageReader) Close() { } } -type GroupStoreReader struct { - *StorageReader - GroupCapabilityFn func(ctx context.Context) query.GroupCapability -} - -func (s *GroupStoreReader) GetGroupCapability(ctx context.Context) query.GroupCapability { - if s.GroupCapabilityFn != nil { - return s.GroupCapabilityFn(ctx) - } - return nil -} - -type WindowAggregateStoreReader struct { - *StorageReader - GetWindowAggregateCapabilityFn func(ctx context.Context) query.WindowAggregateCapability - ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) -} - -func (s *WindowAggregateStoreReader) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability { - // Use the function if it exists. - if s.GetWindowAggregateCapabilityFn != nil { - return s.GetWindowAggregateCapabilityFn(ctx) - } - return nil -} - -func (s *WindowAggregateStoreReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) { +func (s *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) { return s.ReadWindowAggregateFn(ctx, spec, alloc) } diff --git a/query/stdlib/influxdata/influxdb/rules.go b/query/stdlib/influxdata/influxdb/rules.go index 7d3a60c30e9..f16d436cf3b 100644 --- a/query/stdlib/influxdata/influxdb/rules.go +++ b/query/stdlib/influxdata/influxdb/rules.go @@ -680,69 +680,19 @@ func (rule PushDownWindowAggregateRule) Pattern() plan.Pattern { } func canPushWindowedAggregate(ctx context.Context, fnNode plan.Node) bool { - caps, ok := capabilities(ctx) - if !ok { - return false - } // Check the aggregate function spec. Require the operation on _value // and check the feature flag associated with the aggregate function. switch fnNode.Kind() { - case universe.MinKind: - if !caps.HaveMin() { - return false - } - minSpec := fnNode.ProcedureSpec().(*universe.MinProcedureSpec) - if minSpec.Column != execute.DefaultValueColLabel { - return false - } - case universe.MaxKind: - if !caps.HaveMax() { - return false - } - maxSpec := fnNode.ProcedureSpec().(*universe.MaxProcedureSpec) - if maxSpec.Column != execute.DefaultValueColLabel { - return false - } case universe.MeanKind: - if !feature.PushDownWindowAggregateMean().Enabled(ctx) || !caps.HaveMean() { + if !feature.PushDownWindowAggregateMean().Enabled(ctx) { return false } meanSpec := fnNode.ProcedureSpec().(*universe.MeanProcedureSpec) if len(meanSpec.Columns) != 1 || meanSpec.Columns[0] != execute.DefaultValueColLabel { return false } - case universe.CountKind: - if !caps.HaveCount() { - return false - } - countSpec := fnNode.ProcedureSpec().(*universe.CountProcedureSpec) - if len(countSpec.Columns) != 1 || countSpec.Columns[0] != execute.DefaultValueColLabel { - return false - } - case universe.SumKind: - if !caps.HaveSum() { - return false - } - sumSpec := fnNode.ProcedureSpec().(*universe.SumProcedureSpec) - if len(sumSpec.Columns) != 1 || sumSpec.Columns[0] != execute.DefaultValueColLabel { - return false - } - case universe.FirstKind: - if !caps.HaveFirst() { - return false - } - firstSpec := fnNode.ProcedureSpec().(*universe.FirstProcedureSpec) - if firstSpec.Column != execute.DefaultValueColLabel { - return false - } - case universe.LastKind: - if !caps.HaveLast() { - return false - } - lastSpec := fnNode.ProcedureSpec().(*universe.LastProcedureSpec) - if lastSpec.Column != execute.DefaultValueColLabel { - return false - } + default: + return false } return true } @@ -769,16 +719,6 @@ func isPushableWindow(windowSpec *universe.WindowProcedureSpec) bool { windowSpec.StopColumn == "_stop" } -func capabilities(ctx context.Context) (query.WindowAggregateCapability, bool) { - reader := GetStorageDependencies(ctx).FromDeps.Reader - windowAggregateReader, ok := reader.(query.WindowAggregateReader) - if !ok { - return nil, false - } - caps := windowAggregateReader.GetWindowAggregateCapability(ctx) - return caps, caps != nil -} - func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) { fnNode := pn if !canPushWindowedAggregate(ctx, fnNode) { @@ -794,10 +734,6 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p return pn, false, nil } - if caps, ok := capabilities(ctx); !ok || windowSpec.Window.Offset.IsPositive() && !caps.HaveOffset() { - return pn, false, nil - } - // Rule passes. return plan.CreatePhysicalNode("ReadWindowAggregate", &ReadWindowAggregatePhysSpec{ ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec), @@ -946,10 +882,6 @@ func (p GroupWindowAggregateTransposeRule) Rewrite(ctx context.Context, pn plan. return pn, false, nil } - if caps, ok := capabilities(ctx); !ok || windowSpec.Window.Offset.IsPositive() && !caps.HaveOffset() { - return pn, false, nil - } - fromNode := windowNode.Predecessors()[0] fromSpec := fromNode.ProcedureSpec().(*ReadGroupPhysSpec) diff --git a/query/stdlib/influxdata/influxdb/rules_test.go b/query/stdlib/influxdata/influxdb/rules_test.go index e4c3d05e1da..18ebc2a2afb 100644 --- a/query/stdlib/influxdata/influxdb/rules_test.go +++ b/query/stdlib/influxdata/influxdb/rules_test.go @@ -37,10 +37,6 @@ func (caps mockReaderCaps) GetGroupCapability(ctx context.Context) query.GroupCa return caps.GroupCapabilities } -func (caps mockReaderCaps) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability { - return mockWAC{Have: caps.Have} -} - func (caps mockReaderCaps) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) { return nil, nil } @@ -56,26 +52,41 @@ func (c mockGroupCapability) HaveLast() bool { return c.last } func (c mockGroupCapability) HaveMin() bool { return c.min } func (c mockGroupCapability) HaveMax() bool { return c.max } -// Mock Window Aggregate Capability -type mockWAC struct { - Have bool -} - -func (m mockWAC) HaveMin() bool { return m.Have } -func (m mockWAC) HaveMax() bool { return m.Have } -func (m mockWAC) HaveMean() bool { return m.Have } -func (m mockWAC) HaveCount() bool { return m.Have } -func (m mockWAC) HaveSum() bool { return m.Have } -func (m mockWAC) HaveFirst() bool { return m.Have } -func (m mockWAC) HaveLast() bool { return m.Have } -func (m mockWAC) HaveOffset() bool { return m.Have } - func fluxTime(t int64) flux.Time { return flux.Time{ Absolute: time.Unix(0, t).UTC(), } } +var skipTests = map[string]string{ + "push down sum": "unskip once sum is ported", + "push down first": "unskip once first is ported", + "push down last": "unskip once last is ported", + "push down count": "unskip once count is ported", + "WithSuccessor": "unskip once min is ported", + "WindowPositiveOffset": "unskip once last is ported", + "SimplePassMax": "unskip once max is ported", + "SimplePassMin": "unskip once min is ported", + "SimplePassFirst": "unskip once first is ported", + "SimplePassLast": "unskip once last is ported", + "GroupByStartPassMin": "unskip once min is ported", + "GroupByHostPassMin": "unskip once min is ported", + "SimplePassCount": "unskip once count is ported", + "SimplePassSum": "unskip once sum is ported", + "PositiveOffset": "unskip once min is ported", + "CreateEmptyPassMin": "unskip once min is ported", + "AggregateWindowCountInvalidClosingWindowMultiple": "unskip once count is ported", + "AggregateWindowCountMultipleMatches": "unskip once count is ported", + "AggregateWindowCountInvalidDuplicateAs": "unskip once count is ported", + "AggregateWindowCountInvalidClosingWindow": "unskip once count is ported", + "AggregateWindowCountInvalidDuplicateColumn": "unskip once count is ported", + "AggregateWindowCountWrongSchemaMutator": "unskip once count is ported", + "AggregateWindowCount": "unskip once count is ported", + "AggregateWindowCount#01": "unskip once count is ported", + "AggregateWindowCountCreateEmpty": "unskip once count is ported", + "AggregateWindowCountInvalidClosingWindowCreateEmpty": "unskip once count is ported", +} + func TestPushDownRangeRule(t *testing.T) { fromSpec := influxdb.FromStorageProcedureSpec{ Bucket: influxdb.NameOrID{Name: "my-bucket"}, @@ -1941,6 +1952,9 @@ func TestPushDownWindowAggregateRule(t *testing.T) { for _, tc := range tests { tc := tc t.Run(tc.Name, func(t *testing.T) { + if _, ok := skipTests[tc.Name]; ok { + t.Skip(skipTests[tc.Name]) + } t.Parallel() plantest.PhysicalRuleTestHelper(t, &tc) }) @@ -2491,6 +2505,9 @@ func TestTransposeGroupToWindowAggregateRule(t *testing.T) { for _, tc := range tests { tc := tc t.Run(tc.Name, func(t *testing.T) { + if _, ok := skipTests[tc.Name]; ok { + t.Skip(skipTests[tc.Name]) + } t.Parallel() plantest.PhysicalRuleTestHelper(t, &tc) }) @@ -2634,6 +2651,9 @@ func TestPushDownBareAggregateRule(t *testing.T) { for _, tc := range testcases { tc := tc t.Run(tc.Name, func(t *testing.T) { + if _, ok := skipTests[tc.Name]; ok { + t.Skip(skipTests[tc.Name]) + } t.Parallel() plantest.PhysicalRuleTestHelper(t, &tc) }) diff --git a/query/stdlib/influxdata/influxdb/source_test.go b/query/stdlib/influxdata/influxdb/source_test.go index 71776aac9c0..33d6e483b14 100644 --- a/query/stdlib/influxdata/influxdb/source_test.go +++ b/query/stdlib/influxdata/influxdb/source_test.go @@ -205,7 +205,7 @@ func TestReadWindowAggregateSource(t *testing.T) { universe.SumKind, }, } - reader := &mock.WindowAggregateStoreReader{ + reader := &mock.StorageReader{ ReadWindowAggregateFn: func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) { if want, got := orgID, spec.OrganizationID; want != got { t.Errorf("unexpected organization id -want/+got:\n\t- %s\n\t+ %s", want, got) diff --git a/query/storage.go b/query/storage.go index c57393a7073..308e580c651 100644 --- a/query/storage.go +++ b/query/storage.go @@ -37,24 +37,8 @@ type GroupAggregator interface { GetGroupCapability(ctx context.Context) GroupCapability } -// WindowAggregateCapability describes what is supported by WindowAggregateReader. -type WindowAggregateCapability interface { - HaveMin() bool - HaveMax() bool - HaveMean() bool - HaveCount() bool - HaveSum() bool - HaveFirst() bool - HaveLast() bool - HaveOffset() bool -} - // WindowAggregateReader implements the WindowAggregate capability. type WindowAggregateReader interface { - // GetWindowAggregateCapability will get a detailed list of what the RPC call supports - // for window aggregate. - GetWindowAggregateCapability(ctx context.Context) WindowAggregateCapability - // ReadWindowAggregate will read a table using the WindowAggregate method. ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc *memory.Allocator) (TableIterator, error) } @@ -96,6 +80,7 @@ type ReadWindowAggregateSpec struct { Aggregates []plan.ProcedureKind CreateEmpty bool TimeColumn string + Window execute.Window } func (spec *ReadWindowAggregateSpec) Name() string { diff --git a/storage/flux/reader.go b/storage/flux/reader.go index ec273bb24a7..60c3f52daf8 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -3,6 +3,7 @@ package storageflux import ( "context" "fmt" + "github.com/influxdata/flux/plan" "strings" "github.com/gogo/protobuf/types" @@ -10,6 +11,8 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/values" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/errors" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/query" storage "github.com/influxdata/influxdb/v2/storage/reads" @@ -54,6 +57,16 @@ type storeReader struct { s storage.Store } +func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) { + return &windowAggregateIterator{ + ctx: ctx, + s: r.s, + spec: spec, + cache: newTagsCache(0), + alloc: alloc, + }, nil +} + // NewReader returns a new storageflux reader func NewReader(s storage.Store) query.StorageReader { return &storeReader{s: s} @@ -218,6 +231,276 @@ READ: return rs.Err() } +type windowAggregateIterator struct { + ctx context.Context + s storage.Store + spec query.ReadWindowAggregateSpec + stats cursors.CursorStats + cache *tagsCache + alloc *memory.Allocator +} + +func (wai *windowAggregateIterator) Statistics() cursors.CursorStats { return wai.stats } + +func (wai *windowAggregateIterator) Do(f func(flux.Table) error) error { + src := wai.s.GetSource( + uint64(wai.spec.OrganizationID), + uint64(wai.spec.BucketID), + ) + + // Setup read request + any, err := types.MarshalAny(src) + if err != nil { + return err + } + + var req datatypes.ReadWindowAggregateRequest + req.ReadSource = any + req.Predicate = wai.spec.Predicate + req.Range.Start = int64(wai.spec.Bounds.Start) + req.Range.End = int64(wai.spec.Bounds.Stop) + + if wai.spec.WindowEvery != 0 || wai.spec.Offset != 0 { + req.Window = &datatypes.Window{ + Every: &datatypes.Duration{ + Nsecs: wai.spec.WindowEvery, + }, + Offset: &datatypes.Duration{ + Nsecs: wai.spec.Offset, + }, + } + } else { + req.Window = &datatypes.Window{ + Every: &datatypes.Duration{ + Nsecs: wai.spec.Window.Every.Nanoseconds(), + Months: wai.spec.Window.Every.Months(), + Negative: wai.spec.Window.Every.IsNegative(), + }, + Offset: &datatypes.Duration{ + Nsecs: wai.spec.Window.Offset.Nanoseconds(), + Months: wai.spec.Window.Offset.Months(), + Negative: wai.spec.Window.Offset.IsNegative(), + }, + } + } + + req.Aggregate = make([]*datatypes.Aggregate, len(wai.spec.Aggregates)) + + for i, aggKind := range wai.spec.Aggregates { + if agg, err := determineAggregateMethod(string(aggKind)); err != nil { + return err + } else if agg != datatypes.AggregateTypeNone { + req.Aggregate[i] = &datatypes.Aggregate{Type: agg} + } + } + + aggStore, ok := wai.s.(storage.Store) + if !ok { + return errors.New("storage does not support window aggregate") + } + rs, err := aggStore.WindowAggregate(wai.ctx, &req) + if err != nil { + return err + } + + if rs == nil { + return nil + } + return wai.handleRead(f, rs) +} + +const ( + CountKind = "count" + SumKind = "sum" + FirstKind = "first" + LastKind = "last" + MinKind = "min" + MaxKind = "max" + MeanKind = "mean" +) + +// isSelector returns true if given a procedure kind that represents a selector operator. +func isSelector(kind plan.ProcedureKind) bool { + return kind == FirstKind || kind == LastKind || kind == MinKind || kind == MaxKind +} + +func isAggregateCount(kind plan.ProcedureKind) bool { + return kind == CountKind +} + +func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs storage.ResultSet) error { + var window execute.Window + if wai.spec.WindowEvery != 0 || wai.spec.Offset != 0 { + windowEvery := wai.spec.WindowEvery + offset := wai.spec.Offset + everyDur := values.MakeDuration(windowEvery, 0, false) + offsetDur := values.MakeDuration(offset, 0, false) + + window = execute.Window{ + Every: everyDur, + Period: everyDur, + Offset: offsetDur, + } + } else { + window = wai.spec.Window + if window.Every != window.Period { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "planner should ensure that period equals every", + } + } + } + + createEmpty := wai.spec.CreateEmpty + + selector := len(wai.spec.Aggregates) > 0 && isSelector(wai.spec.Aggregates[0]) + + timeColumn := wai.spec.TimeColumn + if timeColumn == "" { + tableFn := f + f = func(table flux.Table) error { + return splitWindows(wai.ctx, wai.alloc, table, selector, tableFn) + } + } + + // these resources must be closed if not nil on return + var ( + cur cursors.Cursor + table storageTable + ) + + defer func() { + if table != nil { + table.Close() + } + if cur != nil { + cur.Close() + } + rs.Close() + wai.cache.Release() + }() + +READ: + for rs.Next() { + cur = rs.Cursor() + if cur == nil { + // no data for series key + field combination + continue + } + + bnds := wai.spec.Bounds + key := defaultGroupKeyForSeries(rs.Tags(), bnds) + done := make(chan struct{}) + hasTimeCol := timeColumn != "" + switch typedCur := cur.(type) { + case cursors.IntegerArrayCursor: + if !selector { + var fillValue *int64 + if isAggregateCount(wai.spec.Aggregates[0]) { + fillValue = func(v int64) *int64 { return &v }(0) + } + cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TInt, hasTimeCol) + table = newIntegerWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, fillValue, key, cols, rs.Tags(), defs, wai.cache, wai.alloc) + } else if createEmpty && !hasTimeCol { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } else { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } + case cursors.FloatArrayCursor: + if !selector { + cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TFloat, hasTimeCol) + table = newFloatWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc) + } else if createEmpty && !hasTimeCol { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } else { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } + case cursors.UnsignedArrayCursor: + if !selector { + cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TUInt, hasTimeCol) + table = newUnsignedWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc) + } else if createEmpty && !hasTimeCol { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } else { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } + case cursors.BooleanArrayCursor: + if !selector { + cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TBool, hasTimeCol) + table = newBooleanWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc) + } else if createEmpty && !hasTimeCol { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } else { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } + case cursors.StringArrayCursor: + if !selector { + cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TString, hasTimeCol) + table = newStringWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc) + } else if createEmpty && !hasTimeCol { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } else { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "window selectors not yet supported in storage", + } + } + default: + panic(fmt.Sprintf("unreachable: %T", typedCur)) + } + + cur = nil + + if !table.Empty() { + if err := f(table); err != nil { + table.Close() + table = nil + return err + } + select { + case <-done: + case <-wai.ctx.Done(): + table.Cancel() + break READ + } + } + + stats := table.Statistics() + wai.stats.ScannedValues += stats.ScannedValues + wai.stats.ScannedBytes += stats.ScannedBytes + table.Close() + table = nil + } + return rs.Err() +} + type groupIterator struct { ctx context.Context s storage.Store @@ -377,12 +660,57 @@ func convertGroupMode(m query.GroupMode) datatypes.ReadGroupRequest_Group { } const ( - startColIdx = 0 - stopColIdx = 1 - timeColIdx = 2 - valueColIdx = 3 + startColIdx = 0 + stopColIdx = 1 + timeColIdx = 2 + valueColIdxWithoutTime = 2 + valueColIdx = 3 ) +func determineTableColsForWindowAggregate(tags models.Tags, typ flux.ColType, hasTimeCol bool) ([]flux.ColMeta, [][]byte) { + var cols []flux.ColMeta + var defs [][]byte + + // aggregates remove the _time column + size := 3 + if hasTimeCol { + size++ + } + cols = make([]flux.ColMeta, size+len(tags)) + defs = make([][]byte, size+len(tags)) + cols[startColIdx] = flux.ColMeta{ + Label: execute.DefaultStartColLabel, + Type: flux.TTime, + } + cols[stopColIdx] = flux.ColMeta{ + Label: execute.DefaultStopColLabel, + Type: flux.TTime, + } + if hasTimeCol { + cols[timeColIdx] = flux.ColMeta{ + Label: execute.DefaultTimeColLabel, + Type: flux.TTime, + } + cols[valueColIdx] = flux.ColMeta{ + Label: execute.DefaultValueColLabel, + Type: typ, + } + } else { + cols[valueColIdxWithoutTime] = flux.ColMeta{ + Label: execute.DefaultValueColLabel, + Type: typ, + } + } + for j, tag := range tags { + cols[size+j] = flux.ColMeta{ + Label: string(tag.Key), + Type: flux.TString, + } + defs[size+j] = []byte("") + } + return cols, defs +} + func determineTableColsForSeries(tags models.Tags, typ flux.ColType) ([]flux.ColMeta, [][]byte) { cols := make([]flux.ColMeta, 4+len(tags)) defs := make([][]byte, 4+len(tags)) diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 966c5fcd263..a294a99fec3 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -9,10 +9,12 @@ package storageflux import ( "sync" + "github.com/apache/arrow/go/arrow/array" "github.com/influxdata/flux" "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" + "github.com/influxdata/flux/values" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/models" storage "github.com/influxdata/influxdb/v2/storage/reads" @@ -97,6 +99,220 @@ func (t *floatTable) advance() bool { return true } +// window table +type floatWindowTable struct { + floatTable + arr *cursors.FloatArray + nextTS int64 + idxInArr int + createEmpty bool + timeColumn string + window execute.Window +} + +func newFloatWindowTable( + done chan struct{}, + cur cursors.FloatArrayCursor, + bounds execute.Bounds, + window execute.Window, + createEmpty bool, + timeColumn string, + + key flux.GroupKey, + cols []flux.ColMeta, + tags models.Tags, + defs [][]byte, + cache *tagsCache, + alloc *memory.Allocator, +) *floatWindowTable { + t := &floatWindowTable{ + floatTable: floatTable{ + table: newTable(done, bounds, key, cols, defs, cache, alloc), + cur: cur, + }, + window: window, + createEmpty: createEmpty, + timeColumn: timeColumn, + } + if t.createEmpty { + start := int64(bounds.Start) + t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop) + } + t.readTags(tags) + t.init(t.advance) + + return t +} + +func (t *floatWindowTable) Do(f func(flux.ColReader) error) error { + return t.do(f, t.advance) +} + +// createNextBufferTimes will read the timestamps from the array +// cursor and construct the values for the next buffer. +func (t *floatWindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) { + startB := arrow.NewIntBuilder(t.alloc) + stopB := arrow.NewIntBuilder(t.alloc) + + if t.createEmpty { + // There are no more windows when the start time is greater + // than or equal to the stop time. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) { + return nil, nil, false + } + + // Create a buffer with the buffer size. + // TODO(jsternberg): Calculate the exact size with max points as the maximum. + startB.Resize(storage.MaxPointsPerBlock) + stopB.Resize(storage.MaxPointsPerBlock) + for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) { + startT, stopT := t.getWindowBoundsFor(t.nextTS) + if startT >= int64(t.bounds.Stop) { + break + } + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true + } + + // Retrieve the next buffer so we can copy the timestamps. + if !t.nextBuffer() { + return nil, nil, false + } + + // Copy over the timestamps from the next buffer and adjust + // times for the boundaries. + startB.Resize(len(t.arr.Timestamps)) + stopB.Resize(len(t.arr.Timestamps)) + for _, stopT := range t.arr.Timestamps { + startT, stopT := t.getWindowBoundsFor(stopT) + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true +} + +func (t *floatWindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) { + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + startT, stopT = int64(values.Time(ts).Add(subEvery)), ts + if startT < int64(t.bounds.Start) { + startT = int64(t.bounds.Start) + } + if stopT > int64(t.bounds.Stop) { + stopT = int64(t.bounds.Stop) + } + return startT, stopT +} + +// nextAt will retrieve the next value that can be used with +// the given stop timestamp. If no values can be used with the timestamp, +// it will return the default value and false. +func (t *floatWindowTable) nextAt(ts int64) (v float64, ok bool) { + if !t.nextBuffer() { + return + } else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) { + return + } + v, ok = t.arr.Values[t.idxInArr], true + t.idxInArr++ + return v, ok +} + +// isInWindow will check if the given time at stop can be used within +// the window stop time for ts. The ts may be a truncated stop time +// because of a restricted boundary while stop will be the true +// stop time returned by storage. +func (t *floatWindowTable) isInWindow(ts int64, stop int64) bool { + // This method checks if the stop time is a valid stop time for + // that interval. This calculation is different from the calculation + // of the window itself. For example, for a 10 second window that + // starts at 20 seconds, we would include points between [20, 30). + // The stop time for this interval would be 30, but because the stop + // time can be truncated, valid stop times range from anywhere between + // (20, 30]. The storage engine will always produce 30 as the end time + // but we may have truncated the stop time because of the boundary + // and this is why we are checking for this range instead of checking + // if the two values are equal. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + start := int64(values.Time(stop).Add(subEvery)) + return start < ts && ts <= stop +} + +// nextBuffer will ensure the array cursor is filled +// and will return true if there is at least one value +// that can be read from it. +func (t *floatWindowTable) nextBuffer() bool { + // Discard the current array cursor if we have + // exceeded it. + if t.arr != nil && t.idxInArr >= t.arr.Len() { + t.arr = nil + } + + // Retrieve the next array cursor if needed. + if t.arr == nil { + arr := t.cur.Next() + if arr.Len() == 0 { + return false + } + t.arr, t.idxInArr = arr, 0 + } + return true +} + +// appendValues will scan the timestamps and append values +// that match those timestamps from the buffer. +func (t *floatWindowTable) appendValues(intervals []int64, appendValue func(v float64), appendNull func()) { + for i := 0; i < len(intervals); i++ { + if v, ok := t.nextAt(intervals[i]); ok { + appendValue(v) + continue + } + appendNull() + } +} + +func (t *floatWindowTable) advance() bool { + if !t.nextBuffer() { + return false + } + // Create the timestamps for the next window. + start, stop, ok := t.createNextBufferTimes() + if !ok { + return false + } + values := t.mergeValues(stop.Int64Values()) + + // Retrieve the buffer for the data to avoid allocating + // additional slices. If the buffer is still being used + // because the references were retained, then we will + // allocate a new buffer. + cr := t.allocateBuffer(stop.Len()) + if t.timeColumn != "" { + switch t.timeColumn { + case execute.DefaultStopColLabel: + cr.cols[timeColIdx] = stop + start.Release() + case execute.DefaultStartColLabel: + cr.cols[timeColIdx] = start + stop.Release() + } + cr.cols[valueColIdx] = values + t.appendBounds(cr) + } else { + cr.cols[startColIdx] = start + cr.cols[stopColIdx] = stop + cr.cols[valueColIdxWithoutTime] = values + } + t.appendTags(cr) + return true +} + // group table type floatGroupTable struct { @@ -288,6 +504,222 @@ func (t *integerTable) advance() bool { return true } +// window table +type integerWindowTable struct { + integerTable + arr *cursors.IntegerArray + nextTS int64 + idxInArr int + createEmpty bool + timeColumn string + window execute.Window + fillValue *int64 +} + +func newIntegerWindowTable( + done chan struct{}, + cur cursors.IntegerArrayCursor, + bounds execute.Bounds, + window execute.Window, + createEmpty bool, + timeColumn string, + fillValue *int64, + key flux.GroupKey, + cols []flux.ColMeta, + tags models.Tags, + defs [][]byte, + cache *tagsCache, + alloc *memory.Allocator, +) *integerWindowTable { + t := &integerWindowTable{ + integerTable: integerTable{ + table: newTable(done, bounds, key, cols, defs, cache, alloc), + cur: cur, + }, + window: window, + createEmpty: createEmpty, + timeColumn: timeColumn, + fillValue: fillValue, + } + if t.createEmpty { + start := int64(bounds.Start) + t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop) + } + t.readTags(tags) + t.init(t.advance) + + return t +} + +func (t *integerWindowTable) Do(f func(flux.ColReader) error) error { + return t.do(f, t.advance) +} + +// createNextBufferTimes will read the timestamps from the array +// cursor and construct the values for the next buffer. +func (t *integerWindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) { + startB := arrow.NewIntBuilder(t.alloc) + stopB := arrow.NewIntBuilder(t.alloc) + + if t.createEmpty { + // There are no more windows when the start time is greater + // than or equal to the stop time. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) { + return nil, nil, false + } + + // Create a buffer with the buffer size. + // TODO(jsternberg): Calculate the exact size with max points as the maximum. + startB.Resize(storage.MaxPointsPerBlock) + stopB.Resize(storage.MaxPointsPerBlock) + for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) { + startT, stopT := t.getWindowBoundsFor(t.nextTS) + if startT >= int64(t.bounds.Stop) { + break + } + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true + } + + // Retrieve the next buffer so we can copy the timestamps. + if !t.nextBuffer() { + return nil, nil, false + } + + // Copy over the timestamps from the next buffer and adjust + // times for the boundaries. + startB.Resize(len(t.arr.Timestamps)) + stopB.Resize(len(t.arr.Timestamps)) + for _, stopT := range t.arr.Timestamps { + startT, stopT := t.getWindowBoundsFor(stopT) + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true +} + +func (t *integerWindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) { + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + startT, stopT = int64(values.Time(ts).Add(subEvery)), ts + if startT < int64(t.bounds.Start) { + startT = int64(t.bounds.Start) + } + if stopT > int64(t.bounds.Stop) { + stopT = int64(t.bounds.Stop) + } + return startT, stopT +} + +// nextAt will retrieve the next value that can be used with +// the given stop timestamp. If no values can be used with the timestamp, +// it will return the default value and false. +func (t *integerWindowTable) nextAt(ts int64) (v int64, ok bool) { + if !t.nextBuffer() { + return + } else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) { + return + } + v, ok = t.arr.Values[t.idxInArr], true + t.idxInArr++ + return v, ok +} + +// isInWindow will check if the given time at stop can be used within +// the window stop time for ts. The ts may be a truncated stop time +// because of a restricted boundary while stop will be the true +// stop time returned by storage. +func (t *integerWindowTable) isInWindow(ts int64, stop int64) bool { + // This method checks if the stop time is a valid stop time for + // that interval. This calculation is different from the calculation + // of the window itself. For example, for a 10 second window that + // starts at 20 seconds, we would include points between [20, 30). + // The stop time for this interval would be 30, but because the stop + // time can be truncated, valid stop times range from anywhere between + // (20, 30]. The storage engine will always produce 30 as the end time + // but we may have truncated the stop time because of the boundary + // and this is why we are checking for this range instead of checking + // if the two values are equal. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + start := int64(values.Time(stop).Add(subEvery)) + return start < ts && ts <= stop +} + +// nextBuffer will ensure the array cursor is filled +// and will return true if there is at least one value +// that can be read from it. +func (t *integerWindowTable) nextBuffer() bool { + // Discard the current array cursor if we have + // exceeded it. + if t.arr != nil && t.idxInArr >= t.arr.Len() { + t.arr = nil + } + + // Retrieve the next array cursor if needed. + if t.arr == nil { + arr := t.cur.Next() + if arr.Len() == 0 { + return false + } + t.arr, t.idxInArr = arr, 0 + } + return true +} + +// appendValues will scan the timestamps and append values +// that match those timestamps from the buffer. +func (t *integerWindowTable) appendValues(intervals []int64, appendValue func(v int64), appendNull func()) { + for i := 0; i < len(intervals); i++ { + if v, ok := t.nextAt(intervals[i]); ok { + appendValue(v) + continue + } + appendNull() + } +} + +func (t *integerWindowTable) advance() bool { + if !t.nextBuffer() { + return false + } + // Create the timestamps for the next window. + start, stop, ok := t.createNextBufferTimes() + if !ok { + return false + } + values := t.mergeValues(stop.Int64Values()) + + // Retrieve the buffer for the data to avoid allocating + // additional slices. If the buffer is still being used + // because the references were retained, then we will + // allocate a new buffer. + cr := t.allocateBuffer(stop.Len()) + if t.timeColumn != "" { + switch t.timeColumn { + case execute.DefaultStopColLabel: + cr.cols[timeColIdx] = stop + start.Release() + case execute.DefaultStartColLabel: + cr.cols[timeColIdx] = start + stop.Release() + } + cr.cols[valueColIdx] = values + t.appendBounds(cr) + } else { + cr.cols[startColIdx] = start + cr.cols[stopColIdx] = stop + cr.cols[valueColIdxWithoutTime] = values + } + t.appendTags(cr) + return true +} + // group table type integerGroupTable struct { @@ -479,6 +911,220 @@ func (t *unsignedTable) advance() bool { return true } +// window table +type unsignedWindowTable struct { + unsignedTable + arr *cursors.UnsignedArray + nextTS int64 + idxInArr int + createEmpty bool + timeColumn string + window execute.Window +} + +func newUnsignedWindowTable( + done chan struct{}, + cur cursors.UnsignedArrayCursor, + bounds execute.Bounds, + window execute.Window, + createEmpty bool, + timeColumn string, + + key flux.GroupKey, + cols []flux.ColMeta, + tags models.Tags, + defs [][]byte, + cache *tagsCache, + alloc *memory.Allocator, +) *unsignedWindowTable { + t := &unsignedWindowTable{ + unsignedTable: unsignedTable{ + table: newTable(done, bounds, key, cols, defs, cache, alloc), + cur: cur, + }, + window: window, + createEmpty: createEmpty, + timeColumn: timeColumn, + } + if t.createEmpty { + start := int64(bounds.Start) + t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop) + } + t.readTags(tags) + t.init(t.advance) + + return t +} + +func (t *unsignedWindowTable) Do(f func(flux.ColReader) error) error { + return t.do(f, t.advance) +} + +// createNextBufferTimes will read the timestamps from the array +// cursor and construct the values for the next buffer. +func (t *unsignedWindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) { + startB := arrow.NewIntBuilder(t.alloc) + stopB := arrow.NewIntBuilder(t.alloc) + + if t.createEmpty { + // There are no more windows when the start time is greater + // than or equal to the stop time. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) { + return nil, nil, false + } + + // Create a buffer with the buffer size. + // TODO(jsternberg): Calculate the exact size with max points as the maximum. + startB.Resize(storage.MaxPointsPerBlock) + stopB.Resize(storage.MaxPointsPerBlock) + for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) { + startT, stopT := t.getWindowBoundsFor(t.nextTS) + if startT >= int64(t.bounds.Stop) { + break + } + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true + } + + // Retrieve the next buffer so we can copy the timestamps. + if !t.nextBuffer() { + return nil, nil, false + } + + // Copy over the timestamps from the next buffer and adjust + // times for the boundaries. + startB.Resize(len(t.arr.Timestamps)) + stopB.Resize(len(t.arr.Timestamps)) + for _, stopT := range t.arr.Timestamps { + startT, stopT := t.getWindowBoundsFor(stopT) + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true +} + +func (t *unsignedWindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) { + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + startT, stopT = int64(values.Time(ts).Add(subEvery)), ts + if startT < int64(t.bounds.Start) { + startT = int64(t.bounds.Start) + } + if stopT > int64(t.bounds.Stop) { + stopT = int64(t.bounds.Stop) + } + return startT, stopT +} + +// nextAt will retrieve the next value that can be used with +// the given stop timestamp. If no values can be used with the timestamp, +// it will return the default value and false. +func (t *unsignedWindowTable) nextAt(ts int64) (v uint64, ok bool) { + if !t.nextBuffer() { + return + } else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) { + return + } + v, ok = t.arr.Values[t.idxInArr], true + t.idxInArr++ + return v, ok +} + +// isInWindow will check if the given time at stop can be used within +// the window stop time for ts. The ts may be a truncated stop time +// because of a restricted boundary while stop will be the true +// stop time returned by storage. +func (t *unsignedWindowTable) isInWindow(ts int64, stop int64) bool { + // This method checks if the stop time is a valid stop time for + // that interval. This calculation is different from the calculation + // of the window itself. For example, for a 10 second window that + // starts at 20 seconds, we would include points between [20, 30). + // The stop time for this interval would be 30, but because the stop + // time can be truncated, valid stop times range from anywhere between + // (20, 30]. The storage engine will always produce 30 as the end time + // but we may have truncated the stop time because of the boundary + // and this is why we are checking for this range instead of checking + // if the two values are equal. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + start := int64(values.Time(stop).Add(subEvery)) + return start < ts && ts <= stop +} + +// nextBuffer will ensure the array cursor is filled +// and will return true if there is at least one value +// that can be read from it. +func (t *unsignedWindowTable) nextBuffer() bool { + // Discard the current array cursor if we have + // exceeded it. + if t.arr != nil && t.idxInArr >= t.arr.Len() { + t.arr = nil + } + + // Retrieve the next array cursor if needed. + if t.arr == nil { + arr := t.cur.Next() + if arr.Len() == 0 { + return false + } + t.arr, t.idxInArr = arr, 0 + } + return true +} + +// appendValues will scan the timestamps and append values +// that match those timestamps from the buffer. +func (t *unsignedWindowTable) appendValues(intervals []int64, appendValue func(v uint64), appendNull func()) { + for i := 0; i < len(intervals); i++ { + if v, ok := t.nextAt(intervals[i]); ok { + appendValue(v) + continue + } + appendNull() + } +} + +func (t *unsignedWindowTable) advance() bool { + if !t.nextBuffer() { + return false + } + // Create the timestamps for the next window. + start, stop, ok := t.createNextBufferTimes() + if !ok { + return false + } + values := t.mergeValues(stop.Int64Values()) + + // Retrieve the buffer for the data to avoid allocating + // additional slices. If the buffer is still being used + // because the references were retained, then we will + // allocate a new buffer. + cr := t.allocateBuffer(stop.Len()) + if t.timeColumn != "" { + switch t.timeColumn { + case execute.DefaultStopColLabel: + cr.cols[timeColIdx] = stop + start.Release() + case execute.DefaultStartColLabel: + cr.cols[timeColIdx] = start + stop.Release() + } + cr.cols[valueColIdx] = values + t.appendBounds(cr) + } else { + cr.cols[startColIdx] = start + cr.cols[stopColIdx] = stop + cr.cols[valueColIdxWithoutTime] = values + } + t.appendTags(cr) + return true +} + // group table type unsignedGroupTable struct { @@ -670,6 +1316,220 @@ func (t *stringTable) advance() bool { return true } +// window table +type stringWindowTable struct { + stringTable + arr *cursors.StringArray + nextTS int64 + idxInArr int + createEmpty bool + timeColumn string + window execute.Window +} + +func newStringWindowTable( + done chan struct{}, + cur cursors.StringArrayCursor, + bounds execute.Bounds, + window execute.Window, + createEmpty bool, + timeColumn string, + + key flux.GroupKey, + cols []flux.ColMeta, + tags models.Tags, + defs [][]byte, + cache *tagsCache, + alloc *memory.Allocator, +) *stringWindowTable { + t := &stringWindowTable{ + stringTable: stringTable{ + table: newTable(done, bounds, key, cols, defs, cache, alloc), + cur: cur, + }, + window: window, + createEmpty: createEmpty, + timeColumn: timeColumn, + } + if t.createEmpty { + start := int64(bounds.Start) + t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop) + } + t.readTags(tags) + t.init(t.advance) + + return t +} + +func (t *stringWindowTable) Do(f func(flux.ColReader) error) error { + return t.do(f, t.advance) +} + +// createNextBufferTimes will read the timestamps from the array +// cursor and construct the values for the next buffer. +func (t *stringWindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) { + startB := arrow.NewIntBuilder(t.alloc) + stopB := arrow.NewIntBuilder(t.alloc) + + if t.createEmpty { + // There are no more windows when the start time is greater + // than or equal to the stop time. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) { + return nil, nil, false + } + + // Create a buffer with the buffer size. + // TODO(jsternberg): Calculate the exact size with max points as the maximum. + startB.Resize(storage.MaxPointsPerBlock) + stopB.Resize(storage.MaxPointsPerBlock) + for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) { + startT, stopT := t.getWindowBoundsFor(t.nextTS) + if startT >= int64(t.bounds.Stop) { + break + } + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true + } + + // Retrieve the next buffer so we can copy the timestamps. + if !t.nextBuffer() { + return nil, nil, false + } + + // Copy over the timestamps from the next buffer and adjust + // times for the boundaries. + startB.Resize(len(t.arr.Timestamps)) + stopB.Resize(len(t.arr.Timestamps)) + for _, stopT := range t.arr.Timestamps { + startT, stopT := t.getWindowBoundsFor(stopT) + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true +} + +func (t *stringWindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) { + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + startT, stopT = int64(values.Time(ts).Add(subEvery)), ts + if startT < int64(t.bounds.Start) { + startT = int64(t.bounds.Start) + } + if stopT > int64(t.bounds.Stop) { + stopT = int64(t.bounds.Stop) + } + return startT, stopT +} + +// nextAt will retrieve the next value that can be used with +// the given stop timestamp. If no values can be used with the timestamp, +// it will return the default value and false. +func (t *stringWindowTable) nextAt(ts int64) (v string, ok bool) { + if !t.nextBuffer() { + return + } else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) { + return + } + v, ok = t.arr.Values[t.idxInArr], true + t.idxInArr++ + return v, ok +} + +// isInWindow will check if the given time at stop can be used within +// the window stop time for ts. The ts may be a truncated stop time +// because of a restricted boundary while stop will be the true +// stop time returned by storage. +func (t *stringWindowTable) isInWindow(ts int64, stop int64) bool { + // This method checks if the stop time is a valid stop time for + // that interval. This calculation is different from the calculation + // of the window itself. For example, for a 10 second window that + // starts at 20 seconds, we would include points between [20, 30). + // The stop time for this interval would be 30, but because the stop + // time can be truncated, valid stop times range from anywhere between + // (20, 30]. The storage engine will always produce 30 as the end time + // but we may have truncated the stop time because of the boundary + // and this is why we are checking for this range instead of checking + // if the two values are equal. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + start := int64(values.Time(stop).Add(subEvery)) + return start < ts && ts <= stop +} + +// nextBuffer will ensure the array cursor is filled +// and will return true if there is at least one value +// that can be read from it. +func (t *stringWindowTable) nextBuffer() bool { + // Discard the current array cursor if we have + // exceeded it. + if t.arr != nil && t.idxInArr >= t.arr.Len() { + t.arr = nil + } + + // Retrieve the next array cursor if needed. + if t.arr == nil { + arr := t.cur.Next() + if arr.Len() == 0 { + return false + } + t.arr, t.idxInArr = arr, 0 + } + return true +} + +// appendValues will scan the timestamps and append values +// that match those timestamps from the buffer. +func (t *stringWindowTable) appendValues(intervals []int64, appendValue func(v string), appendNull func()) { + for i := 0; i < len(intervals); i++ { + if v, ok := t.nextAt(intervals[i]); ok { + appendValue(v) + continue + } + appendNull() + } +} + +func (t *stringWindowTable) advance() bool { + if !t.nextBuffer() { + return false + } + // Create the timestamps for the next window. + start, stop, ok := t.createNextBufferTimes() + if !ok { + return false + } + values := t.mergeValues(stop.Int64Values()) + + // Retrieve the buffer for the data to avoid allocating + // additional slices. If the buffer is still being used + // because the references were retained, then we will + // allocate a new buffer. + cr := t.allocateBuffer(stop.Len()) + if t.timeColumn != "" { + switch t.timeColumn { + case execute.DefaultStopColLabel: + cr.cols[timeColIdx] = stop + start.Release() + case execute.DefaultStartColLabel: + cr.cols[timeColIdx] = start + stop.Release() + } + cr.cols[valueColIdx] = values + t.appendBounds(cr) + } else { + cr.cols[startColIdx] = start + cr.cols[stopColIdx] = stop + cr.cols[valueColIdxWithoutTime] = values + } + t.appendTags(cr) + return true +} + // group table type stringGroupTable struct { @@ -861,6 +1721,220 @@ func (t *booleanTable) advance() bool { return true } +// window table +type booleanWindowTable struct { + booleanTable + arr *cursors.BooleanArray + nextTS int64 + idxInArr int + createEmpty bool + timeColumn string + window execute.Window +} + +func newBooleanWindowTable( + done chan struct{}, + cur cursors.BooleanArrayCursor, + bounds execute.Bounds, + window execute.Window, + createEmpty bool, + timeColumn string, + + key flux.GroupKey, + cols []flux.ColMeta, + tags models.Tags, + defs [][]byte, + cache *tagsCache, + alloc *memory.Allocator, +) *booleanWindowTable { + t := &booleanWindowTable{ + booleanTable: booleanTable{ + table: newTable(done, bounds, key, cols, defs, cache, alloc), + cur: cur, + }, + window: window, + createEmpty: createEmpty, + timeColumn: timeColumn, + } + if t.createEmpty { + start := int64(bounds.Start) + t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop) + } + t.readTags(tags) + t.init(t.advance) + + return t +} + +func (t *booleanWindowTable) Do(f func(flux.ColReader) error) error { + return t.do(f, t.advance) +} + +// createNextBufferTimes will read the timestamps from the array +// cursor and construct the values for the next buffer. +func (t *booleanWindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) { + startB := arrow.NewIntBuilder(t.alloc) + stopB := arrow.NewIntBuilder(t.alloc) + + if t.createEmpty { + // There are no more windows when the start time is greater + // than or equal to the stop time. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) { + return nil, nil, false + } + + // Create a buffer with the buffer size. + // TODO(jsternberg): Calculate the exact size with max points as the maximum. + startB.Resize(storage.MaxPointsPerBlock) + stopB.Resize(storage.MaxPointsPerBlock) + for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) { + startT, stopT := t.getWindowBoundsFor(t.nextTS) + if startT >= int64(t.bounds.Stop) { + break + } + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true + } + + // Retrieve the next buffer so we can copy the timestamps. + if !t.nextBuffer() { + return nil, nil, false + } + + // Copy over the timestamps from the next buffer and adjust + // times for the boundaries. + startB.Resize(len(t.arr.Timestamps)) + stopB.Resize(len(t.arr.Timestamps)) + for _, stopT := range t.arr.Timestamps { + startT, stopT := t.getWindowBoundsFor(stopT) + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true +} + +func (t *booleanWindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) { + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + startT, stopT = int64(values.Time(ts).Add(subEvery)), ts + if startT < int64(t.bounds.Start) { + startT = int64(t.bounds.Start) + } + if stopT > int64(t.bounds.Stop) { + stopT = int64(t.bounds.Stop) + } + return startT, stopT +} + +// nextAt will retrieve the next value that can be used with +// the given stop timestamp. If no values can be used with the timestamp, +// it will return the default value and false. +func (t *booleanWindowTable) nextAt(ts int64) (v bool, ok bool) { + if !t.nextBuffer() { + return + } else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) { + return + } + v, ok = t.arr.Values[t.idxInArr], true + t.idxInArr++ + return v, ok +} + +// isInWindow will check if the given time at stop can be used within +// the window stop time for ts. The ts may be a truncated stop time +// because of a restricted boundary while stop will be the true +// stop time returned by storage. +func (t *booleanWindowTable) isInWindow(ts int64, stop int64) bool { + // This method checks if the stop time is a valid stop time for + // that interval. This calculation is different from the calculation + // of the window itself. For example, for a 10 second window that + // starts at 20 seconds, we would include points between [20, 30). + // The stop time for this interval would be 30, but because the stop + // time can be truncated, valid stop times range from anywhere between + // (20, 30]. The storage engine will always produce 30 as the end time + // but we may have truncated the stop time because of the boundary + // and this is why we are checking for this range instead of checking + // if the two values are equal. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + start := int64(values.Time(stop).Add(subEvery)) + return start < ts && ts <= stop +} + +// nextBuffer will ensure the array cursor is filled +// and will return true if there is at least one value +// that can be read from it. +func (t *booleanWindowTable) nextBuffer() bool { + // Discard the current array cursor if we have + // exceeded it. + if t.arr != nil && t.idxInArr >= t.arr.Len() { + t.arr = nil + } + + // Retrieve the next array cursor if needed. + if t.arr == nil { + arr := t.cur.Next() + if arr.Len() == 0 { + return false + } + t.arr, t.idxInArr = arr, 0 + } + return true +} + +// appendValues will scan the timestamps and append values +// that match those timestamps from the buffer. +func (t *booleanWindowTable) appendValues(intervals []int64, appendValue func(v bool), appendNull func()) { + for i := 0; i < len(intervals); i++ { + if v, ok := t.nextAt(intervals[i]); ok { + appendValue(v) + continue + } + appendNull() + } +} + +func (t *booleanWindowTable) advance() bool { + if !t.nextBuffer() { + return false + } + // Create the timestamps for the next window. + start, stop, ok := t.createNextBufferTimes() + if !ok { + return false + } + values := t.mergeValues(stop.Int64Values()) + + // Retrieve the buffer for the data to avoid allocating + // additional slices. If the buffer is still being used + // because the references were retained, then we will + // allocate a new buffer. + cr := t.allocateBuffer(stop.Len()) + if t.timeColumn != "" { + switch t.timeColumn { + case execute.DefaultStopColLabel: + cr.cols[timeColIdx] = stop + start.Release() + case execute.DefaultStartColLabel: + cr.cols[timeColIdx] = start + stop.Release() + } + cr.cols[valueColIdx] = values + t.appendBounds(cr) + } else { + cr.cols[startColIdx] = start + cr.cols[stopColIdx] = stop + cr.cols[valueColIdxWithoutTime] = values + } + t.appendTags(cr) + return true +} + // group table type booleanGroupTable struct { diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index eabe6281404..b167d9c00ba 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -3,10 +3,12 @@ package storageflux import ( "sync" + "github.com/apache/arrow/go/arrow/array" "github.com/influxdata/flux" "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" + "github.com/influxdata/flux/values" "github.com/influxdata/influxdb/v2" storage "github.com/influxdata/influxdb/v2/storage/reads" "github.com/influxdata/influxdb/v2/models" @@ -91,6 +93,223 @@ func (t *{{.name}}Table) advance() bool { return true } + +// window table +type {{.name}}WindowTable struct { + {{.name}}Table + arr *cursors.{{.Name}}Array + nextTS int64 + idxInArr int + createEmpty bool + timeColumn string + window execute.Window + {{if eq .Name "Integer"}}fillValue *{{.Type}}{{end}} +} + +func new{{.Name}}WindowTable( + done chan struct{}, + cur cursors.{{.Name}}ArrayCursor, + bounds execute.Bounds, + window execute.Window, + createEmpty bool, + timeColumn string, + {{if eq .Name "Integer"}}fillValue *{{.Type}},{{end}} + key flux.GroupKey, + cols []flux.ColMeta, + tags models.Tags, + defs [][]byte, + cache *tagsCache, + alloc *memory.Allocator, +) *{{.name}}WindowTable { + t := &{{.name}}WindowTable{ + {{.name}}Table: {{.name}}Table{ + table: newTable(done, bounds, key, cols, defs, cache, alloc), + cur: cur, + }, + window: window, + createEmpty: createEmpty, + timeColumn: timeColumn, + {{if eq .Name "Integer"}}fillValue: fillValue,{{end}} + } + if t.createEmpty { + start := int64(bounds.Start) + t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop) + } + t.readTags(tags) + t.init(t.advance) + + return t +} + +func (t *{{.name}}WindowTable) Do(f func(flux.ColReader) error) error { + return t.do(f, t.advance) +} + +// createNextBufferTimes will read the timestamps from the array +// cursor and construct the values for the next buffer. +func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) { + startB := arrow.NewIntBuilder(t.alloc) + stopB := arrow.NewIntBuilder(t.alloc) + + if t.createEmpty { + // There are no more windows when the start time is greater + // than or equal to the stop time. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) { + return nil, nil, false + } + + // Create a buffer with the buffer size. + // TODO(jsternberg): Calculate the exact size with max points as the maximum. + startB.Resize(storage.MaxPointsPerBlock) + stopB.Resize(storage.MaxPointsPerBlock) + for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) { + startT, stopT := t.getWindowBoundsFor(t.nextTS) + if startT >= int64(t.bounds.Stop) { + break + } + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true + } + + // Retrieve the next buffer so we can copy the timestamps. + if !t.nextBuffer() { + return nil, nil, false + } + + // Copy over the timestamps from the next buffer and adjust + // times for the boundaries. + startB.Resize(len(t.arr.Timestamps)) + stopB.Resize(len(t.arr.Timestamps)) + for _, stopT := range t.arr.Timestamps { + startT, stopT := t.getWindowBoundsFor(stopT) + startB.Append(startT) + stopB.Append(stopT) + } + start = startB.NewInt64Array() + stop = stopB.NewInt64Array() + return start, stop, true +} + +func (t *{{.name}}WindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) { + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + startT, stopT = int64(values.Time(ts).Add(subEvery)), ts + if startT < int64(t.bounds.Start) { + startT = int64(t.bounds.Start) + } + if stopT > int64(t.bounds.Stop) { + stopT = int64(t.bounds.Stop) + } + return startT, stopT +} + +// nextAt will retrieve the next value that can be used with +// the given stop timestamp. If no values can be used with the timestamp, +// it will return the default value and false. +func (t *{{.name}}WindowTable) nextAt(ts int64) (v {{.Type}}, ok bool) { + if !t.nextBuffer() { + return + } else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) { + return + } + v, ok = t.arr.Values[t.idxInArr], true + t.idxInArr++ + return v, ok +} + +// isInWindow will check if the given time at stop can be used within +// the window stop time for ts. The ts may be a truncated stop time +// because of a restricted boundary while stop will be the true +// stop time returned by storage. +func (t *{{.name}}WindowTable) isInWindow(ts int64, stop int64) bool { + // This method checks if the stop time is a valid stop time for + // that interval. This calculation is different from the calculation + // of the window itself. For example, for a 10 second window that + // starts at 20 seconds, we would include points between [20, 30). + // The stop time for this interval would be 30, but because the stop + // time can be truncated, valid stop times range from anywhere between + // (20, 30]. The storage engine will always produce 30 as the end time + // but we may have truncated the stop time because of the boundary + // and this is why we are checking for this range instead of checking + // if the two values are equal. + subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) + start := int64(values.Time(stop).Add(subEvery)) + return start < ts && ts <= stop +} + +// nextBuffer will ensure the array cursor is filled +// and will return true if there is at least one value +// that can be read from it. +func (t *{{.name}}WindowTable) nextBuffer() bool { + // Discard the current array cursor if we have + // exceeded it. + if t.arr != nil && t.idxInArr >= t.arr.Len() { + t.arr = nil + } + + // Retrieve the next array cursor if needed. + if t.arr == nil { + arr := t.cur.Next() + if arr.Len() == 0 { + return false + } + t.arr, t.idxInArr = arr, 0 + } + return true +} + +// appendValues will scan the timestamps and append values +// that match those timestamps from the buffer. +func (t *{{.name}}WindowTable) appendValues(intervals []int64, appendValue func(v {{.Type}}), appendNull func()) { + for i := 0; i < len(intervals); i++ { + if v, ok := t.nextAt(intervals[i]); ok { + appendValue(v) + continue + } + appendNull() + } +} + +func (t *{{.name}}WindowTable) advance() bool { + if !t.nextBuffer() { + return false + } + // Create the timestamps for the next window. + start, stop, ok := t.createNextBufferTimes() + if !ok { + return false + } + values := t.mergeValues(stop.Int64Values()) + + // Retrieve the buffer for the data to avoid allocating + // additional slices. If the buffer is still being used + // because the references were retained, then we will + // allocate a new buffer. + cr := t.allocateBuffer(stop.Len()) + if t.timeColumn != "" { + switch t.timeColumn { + case execute.DefaultStopColLabel: + cr.cols[timeColIdx] = stop + start.Release() + case execute.DefaultStartColLabel: + cr.cols[timeColIdx] = start + stop.Release() + } + cr.cols[valueColIdx] = values + t.appendBounds(cr) + } else { + cr.cols[startColIdx] = start + cr.cols[stopColIdx] = stop + cr.cols[valueColIdxWithoutTime] = values + } + t.appendTags(cr) + return true +} + // group table type {{.name}}GroupTable struct { diff --git a/storage/flux/table.go b/storage/flux/table.go index ef4ad113ebc..67cd6be99d8 100644 --- a/storage/flux/table.go +++ b/storage/flux/table.go @@ -24,7 +24,8 @@ type table struct { tags [][]byte defs [][]byte - done chan struct{} + done chan struct{} + empty bool colBufs *colReader @@ -69,6 +70,10 @@ func (t *table) isCancelled() bool { return atomic.LoadInt32(&t.cancelled) != 0 } +func (t *table) init(advance func() bool) { + t.empty = !advance() && t.err == nil +} + func (t *table) do(f func(flux.ColReader) error, advance func() bool) error { // Mark this table as having been used. If this doesn't // succeed, then this has already been invoked somewhere else. @@ -229,27 +234,61 @@ func (t *floatTable) toArrowBuffer(vs []float64) *array.Float64 { func (t *floatGroupTable) toArrowBuffer(vs []float64) *array.Float64 { return arrow.NewFloat(vs, t.alloc) } +func (t *floatWindowTable) mergeValues(intervals []int64) *array.Float64 { + b := arrow.NewFloatBuilder(t.alloc) + b.Resize(len(intervals)) + t.appendValues(intervals, b.Append, b.AppendNull) + return b.NewFloat64Array() +} func (t *integerTable) toArrowBuffer(vs []int64) *array.Int64 { return arrow.NewInt(vs, t.alloc) } func (t *integerGroupTable) toArrowBuffer(vs []int64) *array.Int64 { return arrow.NewInt(vs, t.alloc) } +func (t *integerWindowTable) mergeValues(intervals []int64) *array.Int64 { + b := arrow.NewIntBuilder(t.alloc) + b.Resize(len(intervals)) + appendNull := b.AppendNull + if t.fillValue != nil { + appendNull = func() { b.Append(*t.fillValue) } + } + t.appendValues(intervals, b.Append, appendNull) + return b.NewInt64Array() +} func (t *unsignedTable) toArrowBuffer(vs []uint64) *array.Uint64 { return arrow.NewUint(vs, t.alloc) } func (t *unsignedGroupTable) toArrowBuffer(vs []uint64) *array.Uint64 { return arrow.NewUint(vs, t.alloc) } +func (t *unsignedWindowTable) mergeValues(intervals []int64) *array.Uint64 { + b := arrow.NewUintBuilder(t.alloc) + b.Resize(len(intervals)) + t.appendValues(intervals, b.Append, b.AppendNull) + return b.NewUint64Array() +} func (t *stringTable) toArrowBuffer(vs []string) *array.Binary { return arrow.NewString(vs, t.alloc) } func (t *stringGroupTable) toArrowBuffer(vs []string) *array.Binary { return arrow.NewString(vs, t.alloc) } +func (t *stringWindowTable) mergeValues(intervals []int64) *array.Binary { + b := arrow.NewStringBuilder(t.alloc) + b.Resize(len(intervals)) + t.appendValues(intervals, b.AppendString, b.AppendNull) + return b.NewBinaryArray() +} func (t *booleanTable) toArrowBuffer(vs []bool) *array.Boolean { return arrow.NewBool(vs, t.alloc) } func (t *booleanGroupTable) toArrowBuffer(vs []bool) *array.Boolean { return arrow.NewBool(vs, t.alloc) } +func (t *booleanWindowTable) mergeValues(intervals []int64) *array.Boolean { + b := arrow.NewBoolBuilder(t.alloc) + b.Resize(len(intervals)) + t.appendValues(intervals, b.Append, b.AppendNull) + return b.NewBooleanArray() +} diff --git a/storage/flux/window.go b/storage/flux/window.go new file mode 100644 index 00000000000..3337656378f --- /dev/null +++ b/storage/flux/window.go @@ -0,0 +1,199 @@ +package storageflux + +import ( + "context" + "fmt" + "sync/atomic" + + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/influxdata/flux" + "github.com/influxdata/flux/arrow" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/values" + "github.com/influxdata/influxdb/v2" +) + +// splitWindows will split a windowTable by creating a new table from each +// row and modifying the group key to use the start and stop values from +// that row. +func splitWindows(ctx context.Context, alloc memory.Allocator, in flux.Table, selector bool, f func(t flux.Table) error) error { + wts := &windowTableSplitter{ + ctx: ctx, + in: in, + alloc: alloc, + selector: selector, + } + return wts.Do(f) +} + +type windowTableSplitter struct { + ctx context.Context + in flux.Table + alloc memory.Allocator + selector bool +} + +func (w *windowTableSplitter) Do(f func(flux.Table) error) error { + defer w.in.Done() + + startIdx, err := w.getTimeColumnIndex(execute.DefaultStartColLabel) + if err != nil { + return err + } + + stopIdx, err := w.getTimeColumnIndex(execute.DefaultStopColLabel) + if err != nil { + return err + } + + return w.in.Do(func(cr flux.ColReader) error { + // Retrieve the start and stop columns for splitting + // the windows. + start := cr.Times(startIdx) + stop := cr.Times(stopIdx) + + // Iterate through each time to produce a table + // using the start and stop values. + arrs := make([]array.Interface, len(cr.Cols())) + for j := range cr.Cols() { + arrs[j] = getColumnValues(cr, j) + } + + values := arrs[valueColIdx] + + for i, n := 0, cr.Len(); i < n; i++ { + startT, stopT := start.Value(i), stop.Value(i) + + // Rewrite the group key using the new time. + key := groupKeyForWindow(cr.Key(), startT, stopT) + if w.selector && values.IsNull(i) { + // Produce an empty table if the value is null + // and this is a selector. + table := execute.NewEmptyTable(key, cr.Cols()) + if err := f(table); err != nil { + return err + } + continue + } + + // Produce a slice for each column into a new + // table buffer. + buffer := arrow.TableBuffer{ + GroupKey: key, + Columns: cr.Cols(), + Values: make([]array.Interface, len(cr.Cols())), + } + for j, arr := range arrs { + buffer.Values[j] = arrow.Slice(arr, int64(i), int64(i+1)) + } + + // Wrap these into a single table and execute. + done := make(chan struct{}) + table := &windowTableRow{ + buffer: buffer, + done: done, + } + if err := f(table); err != nil { + return err + } + + select { + case <-done: + case <-w.ctx.Done(): + return w.ctx.Err() + } + } + return nil + }) +} + +func (w *windowTableSplitter) getTimeColumnIndex(label string) (int, error) { + j := execute.ColIdx(label, w.in.Cols()) + if j < 0 { + return -1, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("missing %q column from window splitter", label), + } + } else if c := w.in.Cols()[j]; c.Type != flux.TTime { + return -1, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: fmt.Sprintf("%q column must be of type time", label), + } + } + return j, nil +} + +type windowTableRow struct { + used int32 + buffer arrow.TableBuffer + done chan struct{} +} + +func (w *windowTableRow) Key() flux.GroupKey { + return w.buffer.GroupKey +} + +func (w *windowTableRow) Cols() []flux.ColMeta { + return w.buffer.Columns +} + +func (w *windowTableRow) Do(f func(flux.ColReader) error) error { + if !atomic.CompareAndSwapInt32(&w.used, 0, 1) { + return &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "table already read", + } + } + defer close(w.done) + + err := f(&w.buffer) + w.buffer.Release() + return err +} + +func (w *windowTableRow) Done() { + if atomic.CompareAndSwapInt32(&w.used, 0, 1) { + w.buffer.Release() + close(w.done) + } +} + +func (w *windowTableRow) Empty() bool { + return false +} + +func groupKeyForWindow(key flux.GroupKey, start, stop int64) flux.GroupKey { + cols := key.Cols() + vs := make([]values.Value, len(cols)) + for j, c := range cols { + if c.Label == execute.DefaultStartColLabel { + vs[j] = values.NewTime(values.Time(start)) + } else if c.Label == execute.DefaultStopColLabel { + vs[j] = values.NewTime(values.Time(stop)) + } else { + vs[j] = key.Value(j) + } + } + return execute.NewGroupKey(cols, vs) +} + +// getColumnValues returns the array from the column reader as an array.Interface. +func getColumnValues(cr flux.ColReader, j int) array.Interface { + switch typ := cr.Cols()[j].Type; typ { + case flux.TInt: + return cr.Ints(j) + case flux.TUInt: + return cr.UInts(j) + case flux.TFloat: + return cr.Floats(j) + case flux.TString: + return cr.Strings(j) + case flux.TBool: + return cr.Bools(j) + case flux.TTime: + return cr.Times(j) + default: + panic(fmt.Errorf("unimplemented column type: %s", typ)) + } +} diff --git a/storage/reads/store.go b/storage/reads/store.go index a078b7af5a5..526afccb76a 100644 --- a/storage/reads/store.go +++ b/storage/reads/store.go @@ -78,6 +78,8 @@ type GroupCursor interface { type Store interface { ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (ResultSet, error) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (GroupResultSet, error) + // WindowAggregate will invoke a ReadWindowAggregateRequest against the Store. + WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (ResultSet, error) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) diff --git a/v1/services/storage/store.go b/v1/services/storage/store.go index e7ffd78d392..dce3bd4fd49 100644 --- a/v1/services/storage/store.go +++ b/v1/services/storage/store.go @@ -54,6 +54,41 @@ type Store struct { Logger *zap.Logger } +func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (reads.ResultSet, error) { + if req.ReadSource == nil { + return nil, errors.New("missing read source") + } + + source, err := getReadSource(*req.ReadSource) + if err != nil { + return nil, err + } + + database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End) + if err != nil { + return nil, err + } + + shardIDs, err := s.findShardIDs(database, rp, false, start, end) + if err != nil { + return nil, err + } + if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil + return nil, nil + } + + var cur reads.SeriesCursor + if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil { + return nil, err + } else if ic == nil { // TODO(jeff): this was a typed nil + return nil, nil + } else { + cur = ic + } + + return reads.NewWindowAggregateResultSet(ctx, req, cur) +} + func NewStore(store TSDBStore, metaClient MetaClient) *Store { return &Store{ TSDBStore: store,