Skip to content

Commit

Permalink
Merge pull request #461 from influxdb/fix-458-continuous-query-sequen…
Browse files Browse the repository at this point in the history
…ce-numbers

Fix #458. Correct sequence number calculation during continuous query interpolation.
  • Loading branch information
pauldix committed Apr 21, 2014
2 parents ecf5f28 + 4ad142f commit e671755
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,15 +538,17 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string,
fieldIndex := series.GetFieldIndex(fieldName)
return point.GetFieldValueAsString(fieldIndex)
})
cleanedTargetName := strings.Map(replaceInvalidCharacters, targetNameWithValues)

sanitizedTargetName := strings.Map(replaceInvalidCharacters, targetNameWithValues)

if assignSequenceNumbers {
sequenceMap[sequenceKey{targetName, *point.Timestamp}] += 1
sequenceNumber := uint64(sequenceMap[sequenceKey{targetName, *point.Timestamp}])
key := sequenceKey{sanitizedTargetName, *point.Timestamp}
sequenceMap[key] += 1
sequenceNumber := uint64(sequenceMap[key])
point.SequenceNumber = &sequenceNumber
}

newSeries := &protocol.Series{Name: &cleanedTargetName, Fields: series.Fields, Points: []*protocol.Point{point}}
newSeries := &protocol.Series{Name: &sanitizedTargetName, Fields: series.Fields, Points: []*protocol.Point{point}}
if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil {
log.Error("Couldn't write data for continuous query: ", e)
}
Expand Down
72 changes: 72 additions & 0 deletions src/integration/multiple_servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,13 @@ func (self ServerSuite) RemoveAllContinuousQueries(db string, c *C) {
}
}

func (self ServerSuite) AssertContinuousQueryCount(db string, count int, c *C) {
client := self.serverProcesses[0].GetClient(db, c)
queries, err := client.GetContinuousQueries()
c.Assert(err, IsNil)
c.Assert(queries, HasLen, count)
}

func (self *ServerSuite) TestContinuousQueryManagement(c *C) {
defer self.RemoveAllContinuousQueries("test_cq", c)

Expand Down Expand Up @@ -771,6 +778,71 @@ func (self *ServerSuite) TestContinuousQueryInterpolation(c *C) {
/* self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 4;", false, c) */
}

func (self *ServerSuite) TestContinuousQuerySequenceNumberAssignmentWithInterpolation(c *C) {
defer self.RemoveAllContinuousQueries("test_cq", c)

currentTime := time.Now()
t0 := currentTime.Truncate(10 * time.Second)
t1 := time.Unix(t0.Unix()-5, 0).Unix()
t2 := time.Unix(t0.Unix()-10, 0).Unix()

data := fmt.Sprintf(`[
{ "name": "points",
"columns": ["c1", "c2", "time"],
"points": [
[1, "a", %d],
[2, "a", %d],
[3, "a", %d],
[7, "b", %d],
[8, "b", %d],
[9, "b", %d]
]}
]`, t1, t1, t1, t2, t2, t2)

self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c)

self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c1) from points group by time(5s), c2 into :series_name.count.[c2];", false, c)
self.AssertContinuousQueryCount("test_cq", 1, c)
self.serverProcesses[0].WaitForServerToSync()
time.Sleep(time.Second)

self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c)

data = fmt.Sprintf(`[
{ "name": "points",
"columns": ["c1", "c2", "time"],
"points": [
[1, "aa", %d],
[2, "aa", %d],
[3, "aa", %d],
[7, "bb", %d],
[8, "bb", %d],
[9, "bb", %d]
]}
]`, t1, t1, t1, t2, t2, t2)

self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c)

self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c1) from points group by time(5s), c2 into :series_name.count.[c2];", false, c)
self.AssertContinuousQueryCount("test_cq", 1, c)
self.serverProcesses[0].WaitForServerToSync()
time.Sleep(time.Second)

collection := self.serverProcesses[0].Query("test_cq", "select * from points;", false, c)
series := collection.GetSeries("points", c)
c.Assert(series.Points, HasLen, 12)

collection = self.serverProcesses[0].Query("test_cq", "select * from /points.count.*/;", false, c)
c.Assert(collection.Members, HasLen, 4)

subseries := []string{"a", "aa", "b", "bb"}
for i := range subseries {
series = collection.GetSeries("points.count."+subseries[i], c)
c.Assert(series.Points, HasLen, 1)
c.Assert(series.Points[0][1], Equals, float64(1))
}
}

func (self *ServerSuite) TestGetServers(c *C) {
body := self.serverProcesses[0].Get("/cluster/servers?u=root&p=root", c)

Expand Down

0 comments on commit e671755

Please sign in to comment.