Wire up Top aggregate function #3930

Merged
merged 37 commits on Sep 4, 2015

Commits (37)
72fd115
exposing tags on cursors, top/bottom are valid funcs now
corylanou Aug 27, 2015
97f2dc8
comment/type fixes
corylanou Aug 28, 2015
193fd50
first pass at MapTop. more testing to come
corylanou Aug 28, 2015
dd278a1
add test for mixed numerics and fix infer
corylanou Aug 28, 2015
52cb46f
mixed maptop test with strings
corylanou Aug 31, 2015
f8d486f
maptop bool test
corylanou Aug 31, 2015
e6de6d0
change percentile to check errors in parsing, not in mapping
corylanou Aug 31, 2015
6b005af
change ReducePercentile signature
corylanou Aug 31, 2015
b45872c
first pass at ReduceTop
corylanou Aug 31, 2015
c5358dc
always sort results for ReduceTop. Skip test until we verify expecte…
corylanou Aug 31, 2015
d060f3a
move all aggregate validations to the parser validation from map/redu…
corylanou Aug 31, 2015
0462822
wip remapping top output
corylanou Aug 31, 2015
35b9215
refactor processTopBottom - wip
corylanou Sep 1, 2015
5a66725
refactoring/adding top integration testing
corylanou Sep 1, 2015
6f7eca9
correcting sort behavior for top
corylanou Sep 1, 2015
0fa9cfe
ignore SourceGraph directory
corylanou Sep 1, 2015
f3e557d
more top test scenarios
corylanou Sep 1, 2015
ba79007
wip
corylanou Sep 2, 2015
8c4595b
top is coming together. filling out fields properly
corylanou Sep 2, 2015
347ffc7
wire up advanced top sorting/slicing
corylanou Sep 3, 2015
3db5a85
all tests passing for top
corylanou Sep 3, 2015
9703467
refactor validateAggregates
corylanou Sep 3, 2015
d3465ba
remove sourcegraph files added in a rebase
corylanou Sep 4, 2015
3867fed
one additional test for top
corylanou Sep 4, 2015
88ce04a
move comment to proper line
corylanou Sep 4, 2015
b71833a
refactor allowMixedAggregates, comment cleanup
corylanou Sep 4, 2015
3ca9359
BucketTime -> TMin
corylanou Sep 4, 2015
4a9e936
minor comment fix
corylanou Sep 4, 2015
9ab3d89
bucketTime* -> tMin*
corylanou Sep 4, 2015
6a3bedc
add additional parser test for top function with tags
corylanou Sep 4, 2015
08295c5
refactor processTopBottom
corylanou Sep 4, 2015
65e6528
btf -> tmin
corylanou Sep 4, 2015
b62d8c0
expand variable names for clarity
corylanou Sep 4, 2015
be92093
update changelog
corylanou Sep 4, 2015
bdc54cf
helpers -> pkg + readme
corylanou Sep 4, 2015
fa4415b
refactor processing top/bottom results. clarify some comments
corylanou Sep 4, 2015
a09e2c2
minor tweaks based on PR review
corylanou Sep 4, 2015
3 changes: 3 additions & 0 deletions .gitignore
@@ -69,3 +69,6 @@ integration/migration_data/

# goconvey config files
*.goconvey

# Ignore SourceGraph directory
.srclib-store/
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
- [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc
- [#3930](https://github.com/influxdb/influxdb/pull/3930): Wire up TOP aggregate function - fixes [#1821](https://github.com/influxdb/influxdb/issues/1821)

### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.
1 change: 1 addition & 0 deletions cmd/influxd/run/server_helpers_test.go
@@ -333,6 +333,7 @@ func configureLogging(s *Server) {
s.MetaStore.Logger = nullLogger
s.TSDBStore.Logger = nullLogger
s.HintedHandoff.SetLogger(nullLogger)
s.Monitor.SetLogger(nullLogger)
for _, service := range s.Services {
if service, ok := service.(logSetter); ok {
service.SetLogger(nullLogger)
189 changes: 188 additions & 1 deletion cmd/influxd/run/server_test.go
@@ -993,7 +993,7 @@ func TestServer_Query_Count(t *testing.T) {
&Query{
name: "selecting count(*) should error",
command: `SELECT count(*) FROM db0.rp0.cpu`,
exp: `{"results":[{"error":"expected field argument in count()"}]}`,
exp: `{"error":"error parsing query: expected field argument in count()"}`,
Contributor commented:

Why is this changing?

Contributor Author replied:

The error is now caught further upstream, so it isn't being wrapped in another error return, or in this specific case, a results object.

},
}...)
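The exchange above is the crux of this change: commit d060f3a moved aggregate validation out of the map/reduce functions and into parser validation, so an invalid call now fails the whole query at parse time ("error parsing query: ...") instead of being wrapped in a per-statement results object. A minimal, hypothetical sketch of that pattern follows; it assumes a simplified Call/Expr AST and is not the PR's actual influxql code:

```go
package influxql

import "fmt"

// Simplified stand-ins for the parser's AST types (assumptions, not the
// real influxql definitions).
type Expr interface{}

type Wildcard struct{}

type Call struct {
	Name string
	Args []Expr
}

// validateAggregates rejects invalid aggregate calls while the statement
// is still being parsed, so the executor and mappers never see them.
func validateAggregates(calls []*Call) error {
	for _, c := range calls {
		switch c.Name {
		case "count":
			// count(*) is rejected here rather than inside the mapper,
			// which is why the expected error string in the test above
			// is now prefixed with "error parsing query:".
			if len(c.Args) != 1 {
				return fmt.Errorf("expected field argument in count()")
			}
			if _, ok := c.Args[0].(*Wildcard); ok {
				return fmt.Errorf("expected field argument in count()")
			}
		case "top", "bottom":
			// top/bottom need at least a field and a size argument.
			if len(c.Args) < 2 {
				return fmt.Errorf("invalid number of arguments for %s, expected at least 2, got %d", c.Name, len(c.Args))
			}
		}
	}
	return nil
}
```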

@@ -2229,6 +2229,193 @@ func TestServer_Query_Aggregates(t *testing.T) {
}
}

func TestServer_Query_AggregatesTopInt(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}

writes := []string{
// cpu data with overlapping duplicate values
// hour 0
fmt.Sprintf(`cpu,host=server01 value=2.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02 value=3.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
fmt.Sprintf(`cpu,host=server03 value=4.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
// hour 1
fmt.Sprintf(`cpu,host=server04 value=5.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server05 value=7.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:10Z").UnixNano()),
fmt.Sprintf(`cpu,host=server06 value=6.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:20Z").UnixNano()),
// hour 2
fmt.Sprintf(`cpu,host=server07 value=7.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server08 value=9.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:10Z").UnixNano()),

// memory data
// hour 0
fmt.Sprintf(`memory,host=a,service=redis value=1000i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=mysql value=2000i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=redis value=1500i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
// hour 1
fmt.Sprintf(`memory,host=a,service=redis value=1001i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=mysql value=2001i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=redis value=1501i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()),
// hour 2
fmt.Sprintf(`memory,host=a,service=redis value=1002i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=mysql value=2002i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()),
fmt.Sprintf(`memory,host=b,service=redis value=1502i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()),
}

test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")

test.addQueries([]*Query{
&Query{
name: "top - cpu",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 1) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - 2 values",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 2) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - 3 values - sorts on tie properly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 3) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - with tag",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, 2) FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top","host"],"values":[["2000-01-01T01:00:10Z",7,"server05"],["2000-01-01T02:00:10Z",9,"server08"]]}]}]}`,
},
&Query{
name: "top - cpu - 3 values with limit 2",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 3) FROM cpu limit 2`,
exp: `{"error":"error parsing query: limit (3) in top function can not be larger than the LIMIT (2) in the select statement"}`,
},
&Query{
name: "top - cpu - hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 1) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T01:00:00Z",7],["2000-01-01T02:00:00Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - time specified - hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT time, TOP(value, 1) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - time specified (not first) - hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 1), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - 2 values hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 2) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T00:00:00Z",3],["2000-01-01T01:00:00Z",7],["2000-01-01T01:00:00Z",6],["2000-01-01T02:00:00Z",9],["2000-01-01T02:00:00Z",7]]}]}]}`,
},
&Query{
name: "top - cpu - time specified - 2 values hourly",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 2), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:10Z",3],["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T01:00:20Z",6],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - cpu - 3 values hourly - validates that a bucket can have less than limit if no values exist in that time bucket",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 3) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:00Z",2],["2000-01-01T01:00:00Z",7],["2000-01-01T01:00:00Z",6],["2000-01-01T01:00:00Z",5],["2000-01-01T02:00:00Z",9],["2000-01-01T02:00:00Z",7]]}]}]}`,
},
&Query{
name: "top - cpu - time specified - 3 values hourly - validates that a bucket can have less than limit if no values exist in that time bucket",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 3), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",2],["2000-01-01T00:00:10Z",3],["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:00Z",5],["2000-01-01T01:00:10Z",7],["2000-01-01T01:00:20Z",6],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`,
},
&Query{
name: "top - memory - 2 values, two tags",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, 2), host, service FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T01:00:00Z",2001,"b","mysql"],["2000-01-01T02:00:00Z",2002,"b","mysql"]]}]}]}`,
},
&Query{
name: "top - memory - host tag with limit 2",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, 2) FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host"],"values":[["2000-01-01T02:00:00Z",2002,"b"],["2000-01-01T02:00:00Z",1002,"a"]]}]}]}`,
},
&Query{
name: "top - memory - host tag with limit 2, service tag in select",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, 2), service FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`,
},
&Query{
name: "top - memory - service tag with limit 2, host tag in select",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, service, 2), host FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","service","host"],"values":[["2000-01-01T02:00:00Z",2002,"mysql","b"],["2000-01-01T02:00:00Z",1502,"redis","b"]]}]}]}`,
},
&Query{
name: "top - memory - host and service tag with limit 2",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, service, 2) FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1502,"b","redis"]]}]}]}`,
},
&Query{
name: "top - memory - host tag with limit 2 with service tag in select",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, 2), service FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`,
},
&Query{
name: "top - memory - host and service tag with limit 3",
params: url.Values{"db": []string{"db0"}},
command: `SELECT TOP(value, host, service, 3) FROM memory`,
exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1502,"b","redis"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`,
},

// TODO
// - Test that specifying fields or tags in the function rewrites the query to expand them to the fields
// - Test that a field can be used in the top function
// - Test that a field is returned in preference to a tag when a tag and a field share the same name
// - Test that `select top(value, host, 2)` returns only one value when there is only one value for `host`
// - Test that `select top(value, host, 4) from foo where time > now() - 1d and time < now() group by time(1h)` returns only the unique values, not always 4, when host is unique in some time buckets

}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
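Taken together, the expected JSON in these queries pins down TOP's selection and ordering rules: pick the N largest values, break value ties by the earlier timestamp (see "sorts on tie properly" above), and emit the chosen rows in ascending time order (commits c5358dc and 6f7eca9). Below is a minimal sketch of just that rule, using a made-up point type; it is not the PR's actual ReduceTop/processTopBottom implementation, which also handles tags, time buckets, and limits:

```go
package main

import (
	"fmt"
	"sort"
)

// point is a hypothetical stand-in for the mapper's output values.
type point struct {
	time  int64 // nanoseconds since epoch
	value float64
}

func topN(points []point, n int) []point {
	cand := append([]point(nil), points...) // don't mutate the input
	// Selection order: larger value first; on a tie, earlier time first.
	sort.Slice(cand, func(i, j int) bool {
		if cand[i].value != cand[j].value {
			return cand[i].value > cand[j].value
		}
		return cand[i].time < cand[j].time
	})
	if n > len(cand) {
		n = len(cand)
	}
	out := cand[:n]
	// Output order: ascending by time, matching the expected results above.
	sort.Slice(out, func(i, j int) bool { return out[i].time < out[j].time })
	return out
}

func main() {
	pts := []point{
		{10, 2}, {20, 3}, {30, 4}, // hour 0
		{40, 5}, {50, 7}, {60, 6}, // hour 1
		{70, 7}, {80, 9}, // hour 2
	}
	fmt.Println(topN(pts, 3)) // [{50 7} {70 7} {80 9}]
}
```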

// Test various aggregates when different series only have data for the same timestamp.
func TestServer_Query_AggregatesIdenticalTime(t *testing.T) {
t.Parallel()