Skip to content

Commit

Permalink
Merge pull request #2598 from influxdb/select-tag-keys
Browse files Browse the repository at this point in the history
Implement tag support in SELECT statements
  • Loading branch information
toddboom committed May 20, 2015
2 parents 404bba2 + 9c8f42f commit 01fb935
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 15 deletions.
19 changes: 16 additions & 3 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,23 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
query: `SELECT * FROM "%DB%"."%RP%".cpu WHERE time < NOW()`,
expected: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]}]}`,
},

// Selecting tags
{
name: "selecting only a tag",
write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "gpu", "time": "2015-02-28T01:03:36.000Z", "tags": {"host": "server01"}, "fields": {"value": 100, "cores": 4}}, {"name": "gpu", "time": "2015-02-28T01:03:37.000Z", "tags": {"host": "server02"}, "fields": {"value": 50, "cores": 2}}]}`,
query: `SELECT host FROM "%DB%"."%RP%".gpu`,
expected: `{"results":[{"error":"select statement must include at least one field or function call"}]}`,
},
{
name: "selecting a tag and a field",
query: `SELECT host, value FROM "%DB%"."%RP%".gpu`,
expected: `{"results":[{"series":[{"name":"gpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["2015-02-28T01:03:36Z",100]]},{"name":"gpu","tags":{"host":"server02"},"columns":["time","value"],"values":[["2015-02-28T01:03:37Z",50]]}]}]}`,
},
{
name: "single point, select with now(), two queries",
query: `SELECT * FROM "%DB%"."%RP%".cpu WHERE time < now();SELECT * FROM "%DB%"."%RP%".cpu WHERE time < now()`,
expected: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]},{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]}]}`,
name: "selecting a tag and two fields",
query: `SELECT host, value, cores FROM "%DB%"."%RP%".gpu`,
expected: `{"results":[{"series":[{"name":"gpu","tags":{"host":"server01"},"columns":["time","value","cores"],"values":[["2015-02-28T01:03:36Z",100,4]]},{"name":"gpu","tags":{"host":"server02"},"columns":["time","value","cores"],"values":[["2015-02-28T01:03:37Z",50,2]]}]}]}`,
},

{
Expand Down
2 changes: 1 addition & 1 deletion database.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewMeasurement(name string) *Measurement {
}
}

// HasTagKey returns true if at least one eries in this measurement has written a value for the passed in tag key
// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (m *Measurement) HasTagKey(k string) bool {
return m.seriesByTagKeyValue[k] != nil
}
Expand Down
29 changes: 19 additions & 10 deletions influxql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
// markers for which mappers have been completely emptied
mapperComplete := make([]bool, len(m.Mappers))

// for limit and offset we need to track how many values we've swalloed for the offset and how many we've already set for the limit.
// for limit and offset we need to track how many values we've swallowed for the offset and how many we've already set for the limit.
// we track the number set for the limit because they could be getting chunks. For instance if your limit is 10k, but chunk size is 1k
valuesSent := 0
valuesOffset := 0
Expand Down Expand Up @@ -697,12 +697,12 @@ func (m *MapReduceJob) processRawResults(values []*rawQueryMapOutput) *Row {
hasTime := false
for i, n := range selectNames {
if n == "time" {
// Swap time to the first argument for names
if i != 0 {
tmp := selectNames[0]
selectNames[0] = "time"
selectNames[i] = tmp
selectNames[0], selectNames[i] = selectNames[i], selectNames[0]
}
hasTime = true
break
}
}

Expand All @@ -711,23 +711,32 @@ func (m *MapReduceJob) processRawResults(values []*rawQueryMapOutput) *Row {
selectNames = append([]string{"time"}, selectNames...)
}

// if they've selected only a single value we have to handle things a little differently
singleValue := len(selectNames) == SelectColumnCountWithOneValue
// since selectNames can contain tags, we need to strip them out
selectFields := make([]string, 0, len(selectNames))

for _, n := range selectNames {
if _, found := m.TagSet.Tags[n]; !found {
selectFields = append(selectFields, n)
}
}

row := &Row{
Name: m.MeasurementName,
Tags: m.TagSet.Tags,
Columns: selectNames,
Columns: selectFields,
}

// return an empty row if there are no results
if len(values) == 0 {
return row
}

// if they've selected only a single value we have to handle things a little differently
singleValue := len(selectFields) == SelectColumnCountWithOneValue

// the results will have all of the raw mapper results, convert into the row
for _, v := range values {
vals := make([]interface{}, len(selectNames))
vals := make([]interface{}, len(selectFields))

if singleValue {
vals[0] = time.Unix(0, v.Time).UTC()
Expand All @@ -739,8 +748,8 @@ func (m *MapReduceJob) processRawResults(values []*rawQueryMapOutput) *Row {
vals[0] = time.Unix(0, v.Time).UTC()

// populate the other values
for i := 1; i < len(selectNames); i++ {
vals[i] = fields[selectNames[i]]
for i := 1; i < len(selectFields); i++ {
vals[i] = fields[selectFields[i]]
}
}

Expand Down
6 changes: 5 additions & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
return nil, fmt.Errorf("unknown field or tag name in select clause: %s", n)
}
selectTags = append(selectTags, n)
tagKeys = append(tagKeys, n)
}
for _, n := range stmt.NamesInWhere() {
if n == "time" {
Expand All @@ -93,6 +94,10 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
}
}

if len(selectFields) == 0 && len(stmt.FunctionCalls()) == 0 {
return nil, fmt.Errorf("select statement must include at least one field or function call")
}

// Validate that group by is not a field
for _, d := range stmt.Dimensions {
switch e := d.Expr.(type) {
Expand Down Expand Up @@ -177,7 +182,6 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
return nil, err
}

//jobs := make([]*influxql.MapReduceJob, 0, len(tagSets))
for _, t := range tagSets {
// make a job for each tagset
job := &influxql.MapReduceJob{
Expand Down

0 comments on commit 01fb935

Please sign in to comment.