diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 6bd2388b2fb..d2f10130660 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -538,15 +538,17 @@ func (self *CoordinatorImpl) InterpolateValuesAndCommit(query string, db string, fieldIndex := series.GetFieldIndex(fieldName) return point.GetFieldValueAsString(fieldIndex) }) - cleanedTargetName := strings.Map(replaceInvalidCharacters, targetNameWithValues) + + sanitizedTargetName := strings.Map(replaceInvalidCharacters, targetNameWithValues) if assignSequenceNumbers { - sequenceMap[sequenceKey{targetName, *point.Timestamp}] += 1 - sequenceNumber := uint64(sequenceMap[sequenceKey{targetName, *point.Timestamp}]) + key := sequenceKey{sanitizedTargetName, *point.Timestamp} + sequenceMap[key] += 1 + sequenceNumber := uint64(sequenceMap[key]) point.SequenceNumber = &sequenceNumber } - newSeries := &protocol.Series{Name: &cleanedTargetName, Fields: series.Fields, Points: []*protocol.Point{point}} + newSeries := &protocol.Series{Name: &sanitizedTargetName, Fields: series.Fields, Points: []*protocol.Point{point}} if e := self.CommitSeriesData(db, []*protocol.Series{newSeries}); e != nil { log.Error("Couldn't write data for continuous query: ", e) } diff --git a/src/integration/multiple_servers_test.go b/src/integration/multiple_servers_test.go index 2b1e45cfe50..f4dc610ef74 100644 --- a/src/integration/multiple_servers_test.go +++ b/src/integration/multiple_servers_test.go @@ -502,6 +502,13 @@ func (self ServerSuite) RemoveAllContinuousQueries(db string, c *C) { } } +func (self ServerSuite) AssertContinuousQueryCount(db string, count int, c *C) { + client := self.serverProcesses[0].GetClient(db, c) + queries, err := client.GetContinuousQueries() + c.Assert(err, IsNil) + c.Assert(queries, HasLen, count) +} + func (self *ServerSuite) TestContinuousQueryManagement(c *C) { defer self.RemoveAllContinuousQueries("test_cq", c) @@ -771,6 +778,71 @@ func (self *ServerSuite) TestContinuousQueryInterpolation(c *C) { /* self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 4;", false, c) */ } +func (self *ServerSuite) TestContinuousQuerySequenceNumberAssignmentWithInterpolation(c *C) { + defer self.RemoveAllContinuousQueries("test_cq", c) + + currentTime := time.Now() + t0 := currentTime.Truncate(10 * time.Second) + t1 := time.Unix(t0.Unix()-5, 0).Unix() + t2 := time.Unix(t0.Unix()-10, 0).Unix() + + data := fmt.Sprintf(`[ + { "name": "points", + "columns": ["c1", "c2", "time"], + "points": [ + [1, "a", %d], + [2, "a", %d], + [3, "a", %d], + [7, "b", %d], + [8, "b", %d], + [9, "b", %d] + ]} + ]`, t1, t1, t1, t2, t2, t2) + + self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c) + + self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c1) from points group by time(5s), c2 into :series_name.count.[c2];", false, c) + self.AssertContinuousQueryCount("test_cq", 1, c) + self.serverProcesses[0].WaitForServerToSync() + time.Sleep(time.Second) + + self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c) + + data = fmt.Sprintf(`[ + { "name": "points", + "columns": ["c1", "c2", "time"], + "points": [ + [1, "aa", %d], + [2, "aa", %d], + [3, "aa", %d], + [7, "bb", %d], + [8, "bb", %d], + [9, "bb", %d] + ]} + ]`, t1, t1, t1, t2, t2, t2) + + self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass", data, c) + + self.serverProcesses[0].QueryAsRoot("test_cq", "select count(c1) from points group by time(5s), c2 into :series_name.count.[c2];", false, c) + self.AssertContinuousQueryCount("test_cq", 1, c) + self.serverProcesses[0].WaitForServerToSync() + time.Sleep(time.Second) + + collection := self.serverProcesses[0].Query("test_cq", "select * from points;", false, c) + series := collection.GetSeries("points", c) + c.Assert(series.Points, HasLen, 12) + + collection = self.serverProcesses[0].Query("test_cq", "select * from /points.count.*/;", false, c) + c.Assert(collection.Members, HasLen, 4) + + subseries := []string{"a", "aa", "b", "bb"} + for i := range subseries { + series = collection.GetSeries("points.count."+subseries[i], c) + c.Assert(series.Points, HasLen, 1) + c.Assert(series.Points[0][1], Equals, float64(1)) + } +} + func (self *ServerSuite) TestGetServers(c *C) { body := self.serverProcesses[0].Get("/cluster/servers?u=root&p=root", c)