Skip to content

Commit

Permalink
Merge pull request #7388 from influxdata/js-cumulative-sum-over-time
Browse files Browse the repository at this point in the history
Implement cumulative_sum() function
  • Loading branch information
jsternberg authored Oct 7, 2016
2 parents 6c520c0 + 6afc2a7 commit e955d16
Show file tree
Hide file tree
Showing 8 changed files with 479 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [#7268](https://github.com/influxdata/influxdb/pull/7268): More man pages for the other tools we package and compress man pages fully.
- [#7305](https://github.com/influxdata/influxdb/pull/7305): UDP Client: Split large points. Thanks @vlasad
- [#7115](https://github.com/influxdata/influxdb/issues/7115): Feature request: `influx inspect -export` should dump WAL files.
- [#7388](https://github.com/influxdata/influxdb/pull/7388): Implement cumulative_sum() function.

### Bugfixes

Expand Down
220 changes: 220 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2421,6 +2421,226 @@ cpu value=35 1278010025000000000
}
}

// TestServer_Query_SelectGroupByTimeCumulativeSum checks that cumulative_sum()
// can wrap each aggregate/selector call when the query groups by time.
func TestServer_Query_SelectGroupByTimeCumulativeSum(t *testing.T) {
	t.Parallel()
	s := OpenServer(NewConfig())
	defer s.Close()

	// Four points, one per second starting at 2010-07-01T18:47:00Z, so the two
	// 2s buckets queried below hold (10, 15) and (20, 25) respectively.
	test := NewTest("db0", "rp0")
	test.writes = Writes{
		&Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000
cpu value=15 1278010021000000000
cpu value=20 1278010022000000000
cpu value=25 1278010023000000000
`)},
	}

	// Each expectation is the running total of the per-bucket inner call.
	queries := []*Query{
		{
			name:    "calculate cumulative sum of count",
			command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",4]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of mean",
			command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",12.5],["2010-07-01T18:47:02Z",35]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of median",
			command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",12.5],["2010-07-01T18:47:02Z",35]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of mode",
			command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of sum",
			command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",25],["2010-07-01T18:47:02Z",70]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of first",
			command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of last",
			command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",40]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of min",
			command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of max",
			command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",40]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of percentile",
			command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`,
		},
	}
	test.addQueries(queries...)

	for idx, q := range test.queries {
		// Run test.init exactly once, immediately before the first query.
		if idx == 0 {
			if err := test.init(s); err != nil {
				t.Fatalf("test init failed: %s", err)
			}
		}
		if q.skip {
			t.Logf("SKIP:: %s", q.name)
			continue
		}

		// An execution error takes precedence over a result mismatch.
		err := q.Execute(s)
		switch {
		case err != nil:
			t.Error(q.Error(err))
		case !q.success():
			t.Error(q.failureMessage())
		}
	}
}

// TestServer_Query_SelectGroupByTimeCumulativeSumWithFill checks cumulative_sum()
// over each aggregate/selector call when an empty time bucket is populated by
// fill(0) or fill(previous).
func TestServer_Query_SelectGroupByTimeCumulativeSumWithFill(t *testing.T) {
	t.Parallel()
	s := OpenServer(NewConfig())
	defer s.Close()

	// Only the first 2s bucket (18:47:00) receives data; the second bucket
	// (18:47:02) is empty, so its value comes entirely from the fill option.
	test := NewTest("db0", "rp0")
	test.writes = Writes{
		&Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000
cpu value=20 1278010021000000000
`)},
	}

	// With fill(0) the running total stays flat across the empty bucket; with
	// fill(previous) the first bucket's value is added a second time.
	queries := []*Query{
		{
			name:    "calculate cumulative sum of count with fill 0",
			command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",2]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of count with fill previous",
			command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",4]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of mean with fill 0",
			command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",15]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of mean with fill previous",
			command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",30]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of median with fill 0",
			command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",15]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of median with fill previous",
			command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",30]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of mode with fill 0",
			command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of mode with fill previous",
			command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of sum with fill 0",
			command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",30]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of sum with fill previous",
			command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",60]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of first with fill 0",
			command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of first with fill previous",
			command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of last with fill 0",
			command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",20]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of last with fill previous",
			command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",40]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of min with fill 0",
			command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of min with fill previous",
			command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of max with fill 0",
			command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",20]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of max with fill previous",
			command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",40]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of percentile with fill 0",
			command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`,
		},
		{
			name:    "calculate cumulative sum of percentile with fill previous",
			command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`,
		},
	}
	test.addQueries(queries...)

	for idx, q := range test.queries {
		// Run test.init exactly once, immediately before the first query.
		if idx == 0 {
			if err := test.init(s); err != nil {
				t.Fatalf("test init failed: %s", err)
			}
		}
		if q.skip {
			t.Logf("SKIP:: %s", q.name)
			continue
		}

		// An execution error takes precedence over a result mismatch.
		err := q.Execute(s)
		switch {
		case err != nil:
			t.Error(q.Error(err))
		case !q.success():
			t.Error(q.failureMessage())
		}
	}
}

func TestServer_Query_MathWithFill(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
Expand Down
6 changes: 3 additions & 3 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
for _, f := range s.Fields {
for _, expr := range walkFunctionCalls(f.Expr) {
switch expr.Name {
case "derivative", "non_negative_derivative", "difference", "moving_average", "elapsed":
case "derivative", "non_negative_derivative", "difference", "moving_average", "cumulative_sum", "elapsed":
if err := s.validSelectWithAggregate(); err != nil {
return err
}
Expand All @@ -1663,9 +1663,9 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
return errors.New("elapsed requires a duration argument")
}
}
case "difference":
case "difference", "cumulative_sum":
if got := len(expr.Args); got != 1 {
return fmt.Errorf("invalid number of arguments for difference, expected 1, got %d", got)
return fmt.Errorf("invalid number of arguments for %s, expected 1, got %d", expr.Name, got)
}
case "moving_average":
if got := len(expr.Args); got != 2 {
Expand Down
20 changes: 20 additions & 0 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,26 @@ func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Itera
}
}

// newCumulativeSumIterator builds the iterator that evaluates a cumulative_sum()
// call over input. Float and integer inputs are handed to the matching stream
// iterator constructor together with a factory for a fresh cumulative-sum
// reducer; any other input type is rejected with an error.
func newCumulativeSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
	switch itr := input.(type) {
	case FloatIterator:
		// The same reducer value serves as both aggregator and emitter.
		return newFloatStreamFloatIterator(itr, func() (FloatPointAggregator, FloatPointEmitter) {
			reducer := NewFloatCumulativeSumReducer()
			return reducer, reducer
		}, opt), nil
	case IntegerIterator:
		return newIntegerStreamIntegerIterator(itr, func() (IntegerPointAggregator, IntegerPointEmitter) {
			reducer := NewIntegerCumulativeSumReducer()
			return reducer, reducer
		}, opt), nil
	}
	return nil, fmt.Errorf("unsupported cumulative sum iterator type: %T", input)
}

// newHoltWintersIterator returns an iterator for operating on a holt_winters() call.
func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, includeFitData bool, interval time.Duration) (Iterator, error) {
switch input := input.(type) {
Expand Down
52 changes: 52 additions & 0 deletions influxql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,58 @@ func (r *IntegerMovingAverageReducer) Emit() []FloatPoint {
}
}

// FloatCumulativeSumReducer keeps a running total of the float values of the
// points it has aggregated.
type FloatCumulativeSumReducer struct {
	// curr holds the running total and the time of the latest aggregated
	// point; curr.Nil stays true until the first point arrives.
	curr FloatPoint
}

// NewFloatCumulativeSumReducer creates a new FloatCumulativeSumReducer whose
// running total is marked as nil (nothing aggregated yet).
func NewFloatCumulativeSumReducer() *FloatCumulativeSumReducer {
	r := new(FloatCumulativeSumReducer)
	r.curr.Nil = true
	return r
}

// AggregateFloat adds p's value to the running total and stamps the total
// with p's time.
func (r *FloatCumulativeSumReducer) AggregateFloat(p *FloatPoint) {
	r.curr.Value += p.Value
	r.curr.Time, r.curr.Nil = p.Time, false
}

// Emit returns the current running total as a single point, or nil if no
// point has been aggregated so far.
func (r *FloatCumulativeSumReducer) Emit() []FloatPoint {
	if r.curr.Nil {
		return nil
	}
	return []FloatPoint{r.curr}
}

// IntegerCumulativeSumReducer keeps a running total of the integer values of
// the points it has aggregated.
type IntegerCumulativeSumReducer struct {
	// curr holds the running total and the time of the latest aggregated
	// point; curr.Nil stays true until the first point arrives.
	curr IntegerPoint
}

// NewIntegerCumulativeSumReducer creates a new IntegerCumulativeSumReducer
// whose running total is marked as nil (nothing aggregated yet).
func NewIntegerCumulativeSumReducer() *IntegerCumulativeSumReducer {
	r := new(IntegerCumulativeSumReducer)
	r.curr.Nil = true
	return r
}

// AggregateInteger adds p's value to the running total and stamps the total
// with p's time.
func (r *IntegerCumulativeSumReducer) AggregateInteger(p *IntegerPoint) {
	r.curr.Value += p.Value
	r.curr.Time, r.curr.Nil = p.Time, false
}

// Emit returns the current running total as a single point, or nil if no
// point has been aggregated so far.
func (r *IntegerCumulativeSumReducer) Emit() []IntegerPoint {
	if r.curr.Nil {
		return nil
	}
	return []IntegerPoint{r.curr}
}

// FloatHoltWintersReducer forecasts a series into the future.
// This is done using the Holt-Winters damped method.
// 1. Using the series the initial values are calculated using a SSE.
Expand Down
Loading

0 comments on commit e955d16

Please sign in to comment.