-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(storage): enable window agg mean pushdown #19807
Conversation
2273591
to
f34cee7
Compare
storage/flux/reader.go
Outdated
@@ -54,6 +57,31 @@ type storeReader struct { | |||
s storage.Store | |||
} | |||
|
|||
type qWAC struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was just added to return appropriate capabilities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this duplicated capability struct? It is exactly the same as readsWAC
.
To take one step back, I think we do not need to have the capability system in OSS at all since it does not have separate queryd and storaged services and operate everything in single node.
} | ||
} | ||
|
||
req.Aggregate = make([]*datatypes.Aggregate, len(wai.spec.Aggregates)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
datatypes
was substituted for anywhere that storageproto
was previously used.
v1/services/storage/store.go
Outdated
@@ -54,6 +54,56 @@ type Store struct { | |||
Logger *zap.Logger | |||
} | |||
|
|||
type readsWAC struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added to return enabled capabilities
database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
shardIDs, err := s.findShardIDs(database, rp, false, start, end) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil | ||
return nil, nil | ||
} | ||
|
||
var cur reads.SeriesCursor | ||
if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil { | ||
return nil, err | ||
} else if ic == nil { // TODO(jeff): this was a typed nil | ||
return nil, nil | ||
} else { | ||
cur = ic | ||
} | ||
|
||
return reads.NewWindowAggregateResultSet(ctx, req, cur) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to follow OSS convention for similar pushdown types
testcases := []struct { | ||
name string | ||
data []string | ||
query string | ||
op string | ||
want string | ||
skip string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unskipped mean tests and left all other skipped for now. There are not tests for monthly durations yet as that improvement has not yet made it to OSS.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good. Were you planning on adding the pushdown rule too or is that coming in a different PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome!
@@ -96,6 +96,7 @@ type ReadWindowAggregateSpec struct { | |||
Aggregates []plan.ProcedureKind | |||
CreateEmpty bool | |||
TimeColumn string | |||
Window execute.Window |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this? Does it duplicate WindowEvery
and Offset
in this struct? I don't see the value of this gets assigned anywhere in the code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, yeah that's a remnant from the duration support update. We will need it in the future for when we add monthly durations, but with some revision I can remove it for now and we can add it back when we make that change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And for the record, it was used in the added code. it was used to create durations like here:
influxdb/storage/flux/reader.go
Lines 278 to 300 in f34cee7
if wai.spec.WindowEvery != 0 || wai.spec.Offset != 0 { | |
req.Window = &datatypes.Window{ | |
Every: &datatypes.Duration{ | |
Nsecs: wai.spec.WindowEvery, | |
}, | |
Offset: &datatypes.Duration{ | |
Nsecs: wai.spec.Offset, | |
}, | |
} | |
} else { | |
req.Window = &datatypes.Window{ | |
Every: &datatypes.Duration{ | |
Nsecs: wai.spec.Window.Every.Nanoseconds(), | |
Months: wai.spec.Window.Every.Months(), | |
Negative: wai.spec.Window.Every.IsNegative(), | |
}, | |
Offset: &datatypes.Duration{ | |
Nsecs: wai.spec.Window.Offset.Nanoseconds(), | |
Months: wai.spec.Window.Offset.Months(), | |
Negative: wai.spec.Window.Offset.IsNegative(), | |
}, | |
} | |
} |
But I'll be removing that since the duration support update was not an intentional part of this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine if we keep this since it's just going to be added anyway when we enable the rest of the pushdowns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't actually have a current issue for adding duration support right now I don't think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or are you saying that as we port over all the currently planned pushdowns, we'll get this revision for free as long as we don't remove these bits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that's correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with leaving it then. @yzhang1991 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jlapacik in that case, I think that leaving duration support in place and enabling those tests should be added to the DOD for finalizing the other pushdowns.
storage/flux/reader.go
Outdated
@@ -54,6 +57,31 @@ type storeReader struct { | |||
s storage.Store | |||
} | |||
|
|||
type qWAC struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this duplicated capability struct? It is exactly the same as readsWAC
.
To take one step back, I think we do not need to have the capability system in OSS at all since it does not have separate queryd and storaged services and operate everything in single node.
@yzhang1991 yeah, the team discussed we decided to backtrack on leaving the capabilities in for now. We also decided that it makes sense to consolidate the Store interface with the WindowAggregateStore interface. I'll go ahead and make that change. |
dc186b6
to
fa6d59c
Compare
@@ -680,69 +680,19 @@ func (rule PushDownWindowAggregateRule) Pattern() plan.Pattern { | |||
} | |||
|
|||
func canPushWindowedAggregate(ctx context.Context, fnNode plan.Node) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're only pushing down mean as a part of this PR. Cases can be added back for each agg as it is ported.
fa6d59c
to
c735e54
Compare
c735e54
to
5e1cdda
Compare
func fluxTime(t int64) flux.Time { | ||
return flux.Time{ | ||
Absolute: time.Unix(0, t).UTC(), | ||
} | ||
} | ||
|
||
var skipTests = map[string]string{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jlapacik I had to skip these tests once I changed the planner rules. You can remove them from the map as they are enabled.
Final issue to port window agg mean and bare agg mean pushdown to OSS.
As discussed below, this PR will also remove capabilities and consolidate the
Store
andStoreReader
interfaces as well as any mock interfaces for testing.