diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 49963b58044..8a8c31024b9 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -367,7 +367,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent name: "sum aggregation", query: `SELECT sum(value) FROM cpu WHERE time >= '2000-01-01 00:00:05' AND time <= '2000-01-01T00:00:10Z' GROUP BY time(10s), region`, queryDb: "%DB%", - expected: `{"results":[{"series":[{"name":"cpu","tags":{"region":"us-east"},"columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",30]]}]}]}`, + expected: `{"results":[{"series":[{"name":"cpu","tags":{"region":"us-east"},"columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",null],["2000-01-01T00:00:10Z",30]]}]}]}`, }, { write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [ diff --git a/influxql/engine.go b/influxql/engine.go index f2d63da74aa..322786ac772 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -98,7 +98,8 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) { pointCountInResult = 1 } else { intervalTop := m.TMax/m.interval*m.interval + m.interval - pointCountInResult = int((intervalTop - m.TMin) / m.interval) + intervalBottom := m.TMin/m.interval*m.interval + pointCountInResult = int((intervalTop - intervalBottom) / m.interval) } if m.TMin == 0 && pointCountInResult > MaxGroupByPoints { diff --git a/tx.go b/tx.go index 0b1a8509b29..baca0dfe0cf 100644 --- a/tx.go +++ b/tx.go @@ -321,9 +321,13 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { return nil, nil } + intervalBottom := l.tmin + // Set the upper bound of the interval. if interval > 0 { - l.tmax = l.tmin + interval - 1 + // Make sure the bottom of the interval lands on a natural boundary. + intervalBottom = intervalBottom / interval * interval + l.tmax = intervalBottom + interval - 1 } // Execute the map function. This local mapper acts as the iterator @@ -339,7 +343,7 @@ func (l *LocalMapper) NextInterval(interval int64) (interface{}, error) { } // Move the interval forward. - l.tmin += interval + l.tmin = intervalBottom + interval return val, nil }