From 617ade62940d08803d27407683d2522671cd12a0 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Wed, 18 May 2016 20:01:41 -0400 Subject: [PATCH 1/3] Remove limit optimization when using an aggregate The limit optimization was put into the wrong place and caused only part of the shard to be read when a limit was used. The optimization is possible, but requires a bit of refactoring to the code here so the call iterator is created per series before handed to the limit iterator. Backport fix for #6661. --- CHANGELOG.md | 6 ++++++ tsdb/engine/tsm1/engine.go | 18 +++++++----------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 517c3dcde73..971e5ca1e5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v0.13.1 [unreleased] + +### Bugfixes + +- [#6661](https://github.com/influxdata/influxdb/issues/6661): Disable limit optimization when using an aggregate. + ## v0.13.0 [2016-05-12] ### Release Notes diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index b3a38bbaa05..a3f95c859e8 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -743,7 +743,7 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator if call, ok := opt.Expr.(*influxql.Call); ok { refOpt := opt refOpt.Expr = call.Args[0].(*influxql.VarRef) - inputs, err := e.createVarRefIterator(refOpt) + inputs, err := e.createVarRefIterator(refOpt, true) if err != nil { return nil, err } @@ -758,7 +758,7 @@ func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator return nil, nil } - itrs, err := e.createVarRefIterator(opt) + itrs, err := e.createVarRefIterator(opt, false) if err != nil { return nil, err } @@ -827,7 +827,9 @@ func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, } // createVarRefIterator creates an iterator for a variable reference. -func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql.Iterator, error) { +// The aggregate argument determines this is being created for an aggregate. +// If this is an aggregate, the limit optimization is disabled temporarily. See #6661. +func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bool) ([]influxql.Iterator, error) { ref, _ := opt.Expr.(*influxql.VarRef) var itrs []influxql.Iterator @@ -868,14 +870,8 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions) ([]influxql. inputs = append(inputs, input) } - if len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) { - var itr influxql.Iterator - if opt.MergeSorted() { - itr = influxql.NewSortedMergeIterator(inputs, opt) - } else { - itr = influxql.NewMergeIterator(inputs, opt) - } - itrs = append(itrs, newLimitIterator(itr, opt)) + if !aggregate && len(inputs) > 0 && (opt.Limit > 0 || opt.Offset > 0) { + itrs = append(itrs, newLimitIterator(influxql.NewSortedMergeIterator(inputs, opt), opt)) } else { itrs = append(itrs, inputs...) } From a000f7684902b75d4b403128f17efcdfc301ddf5 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 13 May 2016 17:04:12 -0600 Subject: [PATCH 2/3] Fix possible deadlock when queries and delete series run concurrently This locks showeed up in a deadlock systems running queries and delete series across a large dataset. Queries should not need to lock the tsdb.Store for writes --- CHANGELOG.md | 1 + tsdb/store.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 971e5ca1e5c..076639b190c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bugfixes - [#6661](https://github.com/influxdata/influxdb/issues/6661): Disable limit optimization when using an aggregate. +- [#6627](https://github.com/influxdata/influxdb/pull/6627): Fix possible deadlock when queries and delete series run concurrently. ## v0.13.0 [2016-05-12] diff --git a/tsdb/store.go b/tsdb/store.go index 30c42edc786..30fb3376e08 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -637,8 +637,8 @@ func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error // IteratorCreators returns a set of all local shards as iterator creators. func (s *Store) IteratorCreators() influxql.IteratorCreators { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() a := make(influxql.IteratorCreators, 0, len(s.shards)) for _, sh := range s.shards { From b7399bfc838cadbf87a40b60e50f1e94aeae64ea Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 17 May 2016 12:38:26 +0100 Subject: [PATCH 3/3] Fix concurrent map access panic --- CHANGELOG.md | 1 + tsdb/engine/tsm1/engine.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 076639b190c..512900c6006 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [#6661](https://github.com/influxdata/influxdb/issues/6661): Disable limit optimization when using an aggregate. - [#6627](https://github.com/influxdata/influxdb/pull/6627): Fix possible deadlock when queries and delete series run concurrently. +- [#6235](https://github.com/influxdata/influxdb/issues/6235): Fix measurement field panic in tsm1 engine. ## v0.13.0 [2016-05-12] diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index a3f95c859e8..6ad0b5fe28c 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -972,7 +972,10 @@ func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measu // buildCursor creates an untyped cursor for a field. func (e *Engine) buildCursor(measurement, seriesKey, field string, opt influxql.IteratorOptions) cursor { // Look up fields for measurement. + e.mu.RLock() mf := e.measurementFields[measurement] + e.mu.RUnlock() + if mf == nil { return nil }