Skip to content

Commit

Permalink
Implement cumulative_sum() function
Browse files Browse the repository at this point in the history
The `cumulative_sum()` function can be used to sum each new point and
output the current total. For the following points:

    cpu value=2 0
    cpu value=4 10
    cpu value=6 20

This would output the following points:

    > SELECT cumulative_sum(value) FROM cpu
    time    value
    ----    -----
    0       2
    10      6
    20      12

As can be seen, each new point adds to the sum of the previous point and
outputs the value with the same timestamp.

The function can also be used with an aggregate like `derivative()`.

    > SELECT cumulative_sum(mean(value) FROM cpu WHERE time >= now() - 10m GROUP BY time(1m)
  • Loading branch information
jsternberg committed Oct 3, 2016
1 parent 11cf759 commit 05ae3da
Show file tree
Hide file tree
Showing 7 changed files with 486 additions and 3 deletions.
220 changes: 220 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2421,6 +2421,226 @@ cpu value=35 1278010025000000000
}
}

// Ensure the server can handle various group by time cumulative sum queries.
func TestServer_Query_SelectGroupByTimeCumulativeSum(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000
cpu value=15 1278010021000000000
cpu value=20 1278010022000000000
cpu value=25 1278010023000000000
`)},
}

test.addQueries([]*Query{
&Query{
name: "calculate cumulative sum of count",
command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",4]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of mean",
command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",12.5],["2010-07-01T18:47:02Z",35]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of median",
command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",12.5],["2010-07-01T18:47:02Z",35]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of mode",
command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of sum",
command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",25],["2010-07-01T18:47:02Z",70]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of first",
command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of last",
command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",40]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of min",
command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of max",
command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",40]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of percentile",
command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

// Ensure the server can handle various group by time cumulative sum queries with fill.
func TestServer_Query_SelectGroupByTimeCumulativeSumWithFill(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000
cpu value=20 1278010021000000000
`)},
}

test.addQueries([]*Query{
&Query{
name: "calculate cumulative sum of count with fill 0",
command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",2]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of count with fill previous",
command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",4]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of mean with fill 0",
command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",15]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of mean with fill previous",
command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",30]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of median with fill 0",
command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",15]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of median with fill previous",
command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",30]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of mode with fill 0",
command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of mode with fill previous",
command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of sum with fill 0",
command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",30]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of sum with fill previous",
command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",60]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of first with fill 0",
command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of first with fill previous",
command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of last with fill 0",
command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of last with fill previous",
command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",40]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of min with fill 0",
command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of min with fill previous",
command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of max with fill 0",
command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of max with fill previous",
command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",40]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of percentile with fill 0",
command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate cumulative sum of percentile with fill previous",
command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

func TestServer_Query_MathWithFill(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
Expand Down
6 changes: 3 additions & 3 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,7 +1615,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
for _, f := range s.Fields {
for _, expr := range walkFunctionCalls(f.Expr) {
switch expr.Name {
case "derivative", "non_negative_derivative", "difference", "moving_average", "elapsed":
case "derivative", "non_negative_derivative", "difference", "moving_average", "cumulative_sum", "elapsed":
if err := s.validSelectWithAggregate(); err != nil {
return err
}
Expand All @@ -1635,9 +1635,9 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
return errors.New("elapsed requires a duration argument")
}
}
case "difference":
case "difference", "cumulative_sum":
if got := len(expr.Args); got != 1 {
return fmt.Errorf("invalid number of arguments for difference, expected 1, got %d", got)
return fmt.Errorf("invalid number of arguments for %s, expected 1, got %d", expr.Name, got)
}
case "moving_average":
if got := len(expr.Args); got != 2 {
Expand Down
20 changes: 20 additions & 0 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,26 @@ func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Itera
}
}

// newCumulativeSumIterator returns an iterator for operating on a cumulative_sum() call.
func newCumulativeSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatCumulativeSumReducer()
return fn, fn
}
return newFloatStreamFloatIterator(input, createFn, opt), nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerCumulativeSumReducer()
return fn, fn
}
return newIntegerStreamIntegerIterator(input, createFn, opt), nil
default:
return nil, fmt.Errorf("unsupported cumulative sum iterator type: %T", input)
}
}

// newHoltWintersIterator returns an iterator for operating on a elapsed() call.
func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, includeFitData bool, interval time.Duration) (Iterator, error) {
switch input := input.(type) {
Expand Down
64 changes: 64 additions & 0 deletions influxql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,70 @@ func (r *IntegerMovingAverageReducer) Emit() []FloatPoint {
}
}

// FloatCumulativeSumReducer cumulates the values from each point.
type FloatCumulativeSumReducer struct {
curr FloatPoint
emitted bool
}

// NewFloatCumulativeSumReducer creates a new FloatCumulativeSumReducer.
func NewFloatCumulativeSumReducer() *FloatCumulativeSumReducer {
return &FloatCumulativeSumReducer{
curr: FloatPoint{Nil: true},
}
}

func (r *FloatCumulativeSumReducer) AggregateFloat(p *FloatPoint) {
if !r.curr.Nil && r.curr.Time == p.Time {
return
}
r.curr.Value += p.Value
r.curr.Time = p.Time
r.curr.Nil = false
r.emitted = false
}

func (r *FloatCumulativeSumReducer) Emit() []FloatPoint {
var pts []FloatPoint
if !r.curr.Nil && !r.emitted {
pts = []FloatPoint{r.curr}
r.emitted = true
}
return pts
}

// IntegerCumulativeSumReducer cumulates the values from each point.
type IntegerCumulativeSumReducer struct {
curr IntegerPoint
emitted bool
}

// NewIntegerCumulativeSumReducer creates a new IntegerCumulativeSumReducer.
func NewIntegerCumulativeSumReducer() *IntegerCumulativeSumReducer {
return &IntegerCumulativeSumReducer{
curr: IntegerPoint{Nil: true},
}
}

func (r *IntegerCumulativeSumReducer) AggregateInteger(p *IntegerPoint) {
if !r.curr.Nil && r.curr.Time == p.Time {
return
}
r.curr.Value += p.Value
r.curr.Time = p.Time
r.curr.Nil = false
r.emitted = false
}

func (r *IntegerCumulativeSumReducer) Emit() []IntegerPoint {
var pts []IntegerPoint
if !r.curr.Nil && !r.emitted {
pts = []IntegerPoint{r.curr}
r.emitted = true
}
return pts
}

// FloatHoltWintersReducer forecasts a series into the future.
// This is done using the Holt-Winters damped method.
// 1. Using the series the initial values are calculated using a SSE.
Expand Down
Loading

0 comments on commit 05ae3da

Please sign in to comment.