Skip to content

Commit

Permalink
Merge pull request #8158 from influxdata/jw-max-series-limit
Browse files Browse the repository at this point in the history
Fix killing queries
  • Loading branch information
jwilder authored Mar 18, 2017
2 parents d3caef6 + 8177df2 commit d2b1c31
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 58 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
- [#8064](https://github.com/influxdata/influxdb/issues/8064): Forbid wildcards in binary expressions.
- [#8148](https://github.com/influxdata/influxdb/issues/8148): Fix fill(linear) when multiple series exist and there are null values.
- [#7995](https://github.com/influxdata/influxdb/issues/7995): Update liner dependency to handle docker exec.
- [#7811](https://github.com/influxdata/influxdb/issues/7811): Kill query not killing query
- [#7457](https://github.com/influxdata/influxdb/issues/7457): KILL QUERY should work during all phases of a query

## v1.2.2 [2017-03-14]

Expand Down
87 changes: 40 additions & 47 deletions influxql/internal/internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func (itr *floatInterruptIterator) Next() (*FloatPoint, error) {
if itr.count&0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil
return nil, itr.Close()
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
Expand Down Expand Up @@ -3424,7 +3424,7 @@ func (itr *integerInterruptIterator) Next() (*IntegerPoint, error) {
if itr.count&0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil
return nil, itr.Close()
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
Expand Down Expand Up @@ -6064,7 +6064,7 @@ func (itr *stringInterruptIterator) Next() (*StringPoint, error) {
if itr.count&0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil
return nil, itr.Close()
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
Expand Down Expand Up @@ -8704,7 +8704,7 @@ func (itr *booleanInterruptIterator) Next() (*BooleanPoint, error) {
if itr.count&0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil
return nil, itr.Close()
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
Expand Down
2 changes: 1 addition & 1 deletion influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) {
if itr.count & 0xFF == 0xFF {
select {
case <-itr.closing:
return nil, nil
return nil, itr.Close()
default:
// Reset iterator count to zero and fall through to emit the next point.
itr.count = 0
Expand Down
27 changes: 25 additions & 2 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt
}

// Determine tagsets for this measurement based on dimensions and filters.
tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition)
tagSets, err := mm.TagSets(e.id, opt)
if err != nil {
return nil, err
}
Expand All @@ -1310,6 +1310,14 @@ func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt
itrs := make([]influxql.Iterator, 0, len(tagSets))
if err := func() error {
for _, t := range tagSets {
// Abort if the query was killed
select {
case <-opt.InterruptCh:
influxql.Iterators(itrs).Close()
return err
default:
}

inputs, err := e.createTagSetIterators(ref, mm, t, opt)
if err != nil {
return err
Expand Down Expand Up @@ -1352,7 +1360,7 @@ func (e *Engine) createVarRefIterator(measurement string, opt influxql.IteratorO
}

// Determine tagsets for this measurement based on dimensions and filters.
tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition)
tagSets, err := mm.TagSets(e.id, opt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1509,6 +1517,21 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
continue
}
itrs = append(itrs, itr)

// Abort if the query was killed
select {
case <-opt.InterruptCh:
influxql.Iterators(itrs).Close()
return nil, err
default:
}

// Enforce series limit at creation time.
if opt.MaxSeriesN > 0 && len(itrs) > opt.MaxSeriesN {
influxql.Iterators(itrs).Close()
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", len(itrs), opt.MaxSeriesN)
}

}
return itrs, nil
}
Expand Down
28 changes: 24 additions & 4 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,11 +793,11 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf
// This will also populate the TagSet objects with the series IDs that match each tagset and any
// influx filter expression that goes with the series
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*influxql.TagSet, error) {
m.mu.RLock()

// get the unique set of series ids and the filters that should be applied to each
ids, filters, err := m.filters(condition)
ids, filters, err := m.filters(opt.Condition)
if err != nil {
m.mu.RUnlock()
return nil, err
Expand All @@ -807,15 +807,27 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf
// TagSet for that series. Series with the same TagSet are then grouped together, because for the
// purpose of GROUP BY they are part of the same composite series.
tagSets := make(map[string]*influxql.TagSet, 64)
var seriesN int
for _, id := range ids {
// Abort if the query was killed
select {
case <-opt.InterruptCh:
return nil, influxql.ErrQueryInterrupted
default:
}

if opt.MaxSeriesN > 0 && seriesN > opt.MaxSeriesN {
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
}

s := m.seriesByID[id]
if !s.Assigned(shardID) {
continue
}
tags := make(map[string]string, len(dimensions))
tags := make(map[string]string, len(opt.Dimensions))

// Build the TagSet for this series.
for _, dim := range dimensions {
for _, dim := range opt.Dimensions {
tags[dim] = s.GetTagString(dim)
}

Expand All @@ -832,6 +844,7 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf
}
// Associate the series and filter with the Tagset.
tagSet.AddFilter(m.seriesByID[id].Key, filters[id])
seriesN++

// Ensure it's back in the map.
tagSets[string(tagsAsKey)] = tagSet
Expand All @@ -841,6 +854,13 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf

// Sort the series in each tag set.
for _, t := range tagSets {
// Abort if the query was killed
select {
case <-opt.InterruptCh:
return nil, influxql.ErrQueryInterrupted
default:
}

sort.Sort(t)
}

Expand Down
7 changes: 7 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,13 @@ func (a Shards) CreateIterator(measurement string, opt influxql.IteratorOptions)
}
itrs = append(itrs, itr)

select {
case <-opt.InterruptCh:
influxql.Iterators(itrs).Close()
return nil, err
default:
}

// Enforce series limit at creation time.
if opt.MaxSeriesN > 0 {
stats := itr.Stats()
Expand Down

0 comments on commit d2b1c31

Please sign in to comment.