From bc4aeefbedadaf4ff4e10710c76d7fc6bfc0eced Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 16 Mar 2017 12:45:40 -0600 Subject: [PATCH 1/5] Check max-series-limit in shard iterator creation The limit waited until all the iterators had been created which still allows problem queries to be planned. This allows the queries to be aborted much earlier in some cases. --- tsdb/engine/tsm1/engine.go | 11 +++++++++-- tsdb/meta.go | 8 +++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 67eb5448acd..7d41152d4da 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1292,7 +1292,7 @@ func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt } // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition) + tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition, opt) if err != nil { return nil, err } @@ -1352,7 +1352,7 @@ func (e *Engine) createVarRefIterator(measurement string, opt influxql.IteratorO } // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition) + tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition, opt) if err != nil { return nil, err } @@ -1509,6 +1509,13 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu continue } itrs = append(itrs, itr) + + // Enforce series limit at creation time. + if opt.MaxSeriesN > 0 && len(itrs) > opt.MaxSeriesN { + influxql.Iterators(itrs).Close() + return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", len(itrs), opt.MaxSeriesN) + } + } return itrs, nil } diff --git a/tsdb/meta.go b/tsdb/meta.go index 7d3922e28c9..46567ee4f97 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -793,7 +793,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf // This will also populate the TagSet objects with the series IDs that match each tagset and any // influx filter expression that goes with the series // TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. -func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { +func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr, opt influxql.IteratorOptions) ([]*influxql.TagSet, error) { m.mu.RLock() // get the unique set of series ids and the filters that should be applied to each @@ -807,7 +807,12 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf // TagSet for that series. Series with the same TagSet are then grouped together, because for the // purpose of GROUP BY they are part of the same composite series. tagSets := make(map[string]*influxql.TagSet, 64) + var seriesN int for _, id := range ids { + if opt.MaxSeriesN > 0 && seriesN > opt.MaxSeriesN { + return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN) + } + s := m.seriesByID[id] if !s.Assigned(shardID) { continue @@ -832,6 +837,7 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf } // Associate the series and filter with the Tagset. tagSet.AddFilter(m.seriesByID[id].Key, filters[id]) + seriesN++ // Ensure it's back in the map. tagSets[string(tagsAsKey)] = tagSet From 86ad0a45b6cc186d102189ae85cba9926c5399fd Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 17 Mar 2017 15:56:23 -0600 Subject: [PATCH 2/5] Ensure iterators are closed when query is killed The underlying iterators were not closed when a query was kill so although the client would receive an error, the query would continue on until completion. --- influxql/internal/internal.pb.go | 87 +++++++++++++++----------------- influxql/iterator.gen.go | 8 +-- influxql/iterator.gen.go.tmpl | 2 +- 3 files changed, 45 insertions(+), 52 deletions(-) diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index 700a4706509..5c9d474dd61 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -505,51 +505,44 @@ func init() { func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorInternal) } var fileDescriptorInternal = []byte{ - // 726 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x54, 0x51, 0x6b, 0xe3, 0x46, - 0x10, 0x46, 0x92, 0xe5, 0x58, 0xeb, 0xb8, 0x49, 0xb7, 0x69, 0xb2, 0x94, 0xd2, 0x0a, 0x3d, 0x09, - 0x4a, 0x1d, 0xc8, 0x6b, 0xa1, 0xe0, 0x34, 0x49, 0x31, 0x5c, 0x9c, 0xb0, 0x0a, 0x79, 0xdf, 0x8b, - 0xc6, 0x62, 0x41, 0x5e, 0xf9, 0x56, 0xab, 0xc3, 0xf9, 0x29, 0xf7, 0x1b, 0xee, 0xc7, 0xdc, 0xcb, - 0xfd, 0xa0, 0x63, 0x67, 0x25, 0x4b, 0xc9, 0xc1, 0xe5, 0x49, 0xf3, 0x7d, 0x33, 0xbb, 0xab, 0x99, - 0xf9, 0x66, 0xc8, 0x99, 0x54, 0x06, 0xb4, 0x12, 0xe5, 0x79, 0x67, 0xcc, 0xb7, 0xba, 0x32, 0x15, - 0x9d, 0x48, 0xb5, 0x2e, 0x9b, 0xdd, 0x87, 0x32, 0xf9, 0xe2, 0x93, 0xf0, 0xbe, 0x92, 0xca, 0x50, - 0x4a, 0x46, 0x2b, 0xb1, 0x01, 0xe6, 0xc5, 0x7e, 0x1a, 0x71, 0xb4, 0x2d, 0xf7, 0x20, 0x8a, 0x9a, - 0xf9, 0x8e, 0xb3, 0x36, 0x72, 0x72, 0x03, 0x2c, 0x88, 0xfd, 0x34, 0xe0, 0x68, 0xd3, 0x63, 0x12, - 0xac, 0x64, 0xc9, 0x46, 0xb1, 0x9f, 0x4e, 0xb8, 0x35, 0xe9, 0x9f, 0x24, 0x58, 0x34, 0x3b, 0x16, - 0xc6, 0x41, 0x3a, 0xbd, 0x98, 0xcd, 0xbb, 0xf7, 0xe6, 0x8b, 0x66, 0xc7, 0xad, 0x87, 0xfe, 0x41, - 0xc8, 0xa2, 0x28, 0x34, 0x14, 0xc2, 0x40, 0xce, 0xc6, 0xb1, 0x97, 0xce, 0xf8, 0x80, 0xb1, 0xfe, - 0x9b, 0xb2, 0x12, 0xe6, 0x51, 0x94, 0x0d, 0xb0, 0x83, 0xd8, 0x4b, 0x3d, 0x3e, 0x60, 0x68, 0x42, - 0x0e, 0x97, 0xca, 0x40, 0x01, 0xda, 0x45, 0x4c, 0x62, 0x2f, 0x0d, 0xf8, 0x0b, 0x8e, 0xc6, 0x64, - 0x9a, 0x19, 0x2d, 0x55, 0xe1, 0x42, 0xa2, 0xd8, 0x4b, 0x23, 0x3e, 0xa4, 0xec, 0x2d, 0x97, 0x55, - 0x55, 0x82, 0x50, 0x2e, 0x84, 0xc4, 0x5e, 0x3a, 0xe1, 0x2f, 0x38, 0xfa, 0x37, 0x09, 0x33, 0x23, - 0x4c, 0xcd, 0xa6, 0xb1, 0x97, 0x4e, 0x2f, 0xce, 0xfa, 0x64, 0x96, 0x06, 0xb4, 0x30, 0x95, 0x46, - 0x37, 0x77, 0x51, 0xc9, 0x67, 0x0f, 0x53, 0xa7, 0xbf, 0x91, 0xc9, 0x95, 0x30, 0xe2, 0xe1, 0x79, - 0xeb, 0x6a, 0x1a, 0xf2, 0x3d, 0x7e, 0x95, 0x9c, 0xff, 0x66, 0x72, 0xc1, 0xdb, 0xc9, 0x8d, 0xde, - 0x4e, 0x2e, 0xfc, 0x3e, 0xb9, 0xe4, 0xeb, 0x88, 0x1c, 0x75, 0x69, 0xdc, 0x6d, 0x8d, 0xac, 0x14, - 0x76, 0xf8, 0x7a, 0xb7, 0xd5, 0xcc, 0xc3, 0x2b, 0xd1, 0xb6, 0x1d, 0xb6, 0xfd, 0xf4, 0xe3, 0x20, - 0x8d, 0x5c, 0x03, 0x53, 0x32, 0xbe, 0x91, 0x50, 0xe6, 0x35, 0xfb, 0x19, 0x9b, 0x7c, 0xdc, 0xd7, - 0xe5, 0x51, 0x68, 0x0e, 0x6b, 0xde, 0xfa, 0xe9, 0x39, 0x39, 0xc8, 0xaa, 0x46, 0x3f, 0x41, 0xcd, - 0x02, 0x0c, 0xfd, 0xb5, 0x0f, 0xbd, 0x05, 0x51, 0x37, 0x1a, 0x36, 0xa0, 0x0c, 0xef, 0xa2, 0xe8, - 0x9c, 0x4c, 0x6c, 0xaa, 0xfa, 0xa3, 0x28, 0x31, 0xaf, 0xe9, 0x05, 0x1d, 0x14, 0xbd, 0xf5, 0xf0, - 0x7d, 0x8c, 0x2d, 0xe7, 0x95, 0xdc, 0x80, 0xaa, 0xed, 0xef, 0xa3, 0xe6, 0x22, 0x3e, 0x60, 0x28, - 0x23, 0x07, 0xff, 0xeb, 0xaa, 0xd9, 0x5e, 0x3e, 0xb3, 0x5f, 0xd0, 0xd9, 0x41, 0x9b, 0xea, 0x8d, - 0x2c, 0x4b, 0xd4, 0x5f, 0xc8, 0xd1, 0xa6, 0xbf, 0x93, 0xc8, 0x7e, 0x87, 0xc2, 0xeb, 0x09, 0xeb, - 0xfd, 0xaf, 0x52, 0xb9, 0xb4, 0xa5, 0x42, 0xd1, 0x45, 0xbc, 0x27, 0xac, 0x37, 0x33, 0x42, 0x1b, - 0x9c, 0x90, 0x08, 0xbb, 0xd6, 0x13, 0xf6, 0x3f, 0xae, 0x55, 0x8e, 0x3e, 0x82, 0xbe, 0x0e, 0xda, - 0x73, 0x8b, 0xfa, 0x09, 0x54, 0x2e, 0x55, 0x81, 0x3a, 0x9b, 0xf0, 0x9e, 0xa0, 0x27, 0x24, 0x7c, - 0x27, 0x37, 0xd2, 0xb0, 0x43, 0x3c, 0xe5, 0x00, 0x3d, 0x25, 0xe3, 0xbb, 0xf5, 0xba, 0x06, 0xc3, - 0x66, 0x48, 0xb7, 0xc8, 0xf2, 0x99, 0x0b, 0xff, 0xc9, 0xf1, 0x0e, 0xd9, 0xd7, 0xb3, 0xf6, 0xc0, - 0x91, 0x7b, 0x3d, 0xeb, 0x4f, 0x5c, 0x41, 0xde, 0x6c, 0x81, 0x1d, 0xe3, 0xd3, 0x2d, 0xb2, 0x75, - 0xbd, 0x15, 0xbb, 0x0c, 0xb4, 0x84, 0x7a, 0xc5, 0x28, 0x1e, 0x1a, 0x30, 0xf6, 0xc6, 0x3b, 0x9d, - 0x83, 0x86, 0x9c, 0x9d, 0xe0, 0xc1, 0x0e, 0x26, 0xff, 0x90, 0xc3, 0x41, 0x67, 0x6b, 0xfa, 0x17, - 0x09, 0x97, 0x06, 0x36, 0x35, 0xf3, 0x7e, 0x24, 0x00, 0x17, 0x93, 0x7c, 0xf2, 0xc8, 0x74, 0x40, - 0x77, 0x93, 0xf4, 0x5e, 0xd4, 0xd0, 0x6a, 0x72, 0x8f, 0x69, 0x4a, 0x8e, 0x38, 0x18, 0x50, 0xb6, - 0xfa, 0xf7, 0x55, 0x29, 0x9f, 0x9e, 0x71, 0x9c, 0x22, 0xfe, 0x9a, 0xde, 0xef, 0xb7, 0xc0, 0xa9, - 0x1a, 0xf7, 0xdb, 0x09, 0x09, 0x39, 0x14, 0xb0, 0x6b, 0xa7, 0xc7, 0x01, 0xfb, 0xde, 0xb2, 0x7e, - 0x10, 0xba, 0x00, 0xd3, 0xce, 0xcc, 0x1e, 0x27, 0xff, 0xf6, 0xd2, 0xc4, 0xff, 0x6a, 0xb4, 0x40, - 0x25, 0x78, 0x58, 0x9c, 0x3d, 0x1e, 0x34, 0xc7, 0x1f, 0x36, 0x27, 0x59, 0x90, 0xd9, 0x8b, 0xad, - 0x81, 0x5d, 0x69, 0x0b, 0xec, 0xb5, 0x5d, 0x69, 0xab, 0x7b, 0x4a, 0xc6, 0xb8, 0x99, 0x57, 0xdd, - 0x15, 0x0e, 0x25, 0x73, 0x32, 0x76, 0x03, 0x66, 0x87, 0xf2, 0x51, 0x94, 0xed, 0xc6, 0xb6, 0x26, - 0x2e, 0x67, 0xbb, 0x70, 0x7c, 0xa7, 0x67, 0x6b, 0x7f, 0x0b, 0x00, 0x00, 0xff, 0xff, 0xd1, 0x2a, - 0x83, 0x14, 0x06, 0x06, 0x00, 0x00, + // 609 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x53, 0xe1, 0x4e, 0xdb, 0x30, + 0x10, 0x56, 0xea, 0xa6, 0x4d, 0x2e, 0x2d, 0x6d, 0x0d, 0x13, 0xd6, 0x7e, 0x45, 0x1d, 0x42, 0xf9, + 0x31, 0xb1, 0x09, 0xed, 0x05, 0xca, 0x80, 0xa9, 0xd2, 0x28, 0x88, 0x20, 0xfe, 0x7b, 0xe4, 0x1a, + 0x59, 0x72, 0x9d, 0xce, 0x76, 0xa6, 0xf2, 0x38, 0x7b, 0xbe, 0xbd, 0xc4, 0x64, 0x37, 0xa1, 0x1d, + 0x42, 0xfb, 0x97, 0xfb, 0x7c, 0x77, 0xf9, 0xee, 0xfb, 0xee, 0xe0, 0x58, 0x28, 0x8b, 0x5a, 0x71, + 0xf9, 0xa9, 0xfd, 0x38, 0x5b, 0xeb, 0xca, 0x56, 0x34, 0x12, 0x6a, 0x29, 0xeb, 0xcd, 0x4f, 0x39, + 0xfd, 0x13, 0x40, 0x78, 0x57, 0x09, 0x65, 0xe9, 0x00, 0xba, 0x0b, 0xbe, 0x42, 0x16, 0xa4, 0x9d, + 0x2c, 0x76, 0xd1, 0x03, 0x2f, 0x0d, 0xeb, 0xbc, 0x44, 0x62, 0x85, 0x8c, 0xa4, 0x9d, 0x8c, 0xd0, + 0x04, 0xc8, 0x42, 0x48, 0xd6, 0x4d, 0x3b, 0x59, 0x44, 0xdf, 0x03, 0x99, 0xd5, 0x1b, 0x16, 0xa6, + 0x24, 0x4b, 0xce, 0x87, 0x67, 0x6d, 0xe3, 0xb3, 0x59, 0xbd, 0xa1, 0x14, 0x60, 0x56, 0x96, 0x1a, + 0x4b, 0x6e, 0xb1, 0x60, 0xbd, 0x34, 0xc8, 0x86, 0x0e, 0xbb, 0x96, 0x15, 0xb7, 0x8f, 0x5c, 0xd6, + 0xc8, 0xfa, 0x69, 0x90, 0x05, 0xf4, 0x08, 0x06, 0x73, 0x65, 0xb1, 0x44, 0xbd, 0x45, 0xa3, 0x34, + 0xc8, 0x08, 0x3d, 0x84, 0x24, 0xb7, 0x5a, 0xa8, 0x72, 0x0b, 0xc6, 0x69, 0x90, 0xc5, 0x2e, 0xf5, + 0xa2, 0xaa, 0x24, 0x72, 0xb5, 0x45, 0x21, 0x0d, 0xb2, 0x88, 0x9e, 0x42, 0x98, 0x5b, 0x6e, 0x0d, + 0x4b, 0xd2, 0x20, 0x4b, 0xce, 0x8f, 0x77, 0x34, 0xe6, 0x16, 0x35, 0xb7, 0x95, 0xf6, 0xcf, 0x53, + 0xe9, 0xc9, 0xd2, 0x31, 0x44, 0x97, 0xdc, 0xf2, 0x87, 0xe7, 0xf5, 0x76, 0xdc, 0xf0, 0x15, 0xab, + 0xce, 0x9b, 0xac, 0xc8, 0x5b, 0xac, 0xba, 0x6f, 0xb2, 0x0a, 0x1d, 0xab, 0xe9, 0x6f, 0x02, 0xa3, + 0xf6, 0xff, 0xb7, 0x6b, 0x2b, 0x2a, 0x65, 0x9c, 0x92, 0x57, 0x9b, 0xb5, 0x66, 0x81, 0xaf, 0x4b, + 0xb6, 0xe2, 0x75, 0x52, 0x92, 0xc5, 0x34, 0x85, 0xde, 0xb5, 0x40, 0x59, 0x18, 0x36, 0xf1, 0x62, + 0x8e, 0x77, 0x53, 0x3c, 0x72, 0x7d, 0x8f, 0x4b, 0x7a, 0x0a, 0xfd, 0xbc, 0xaa, 0xf5, 0x13, 0x1a, + 0x46, 0x7c, 0xca, 0xbb, 0x5d, 0xca, 0x0d, 0x72, 0x53, 0x6b, 0x5c, 0xa1, 0xb2, 0xf4, 0x04, 0x22, + 0xc7, 0x5c, 0xff, 0xe2, 0xd2, 0x13, 0x4c, 0xce, 0xe9, 0x9e, 0x22, 0xcd, 0x8b, 0x9b, 0xf9, 0x52, + 0xac, 0x50, 0x19, 0x47, 0xcc, 0x1b, 0x18, 0xd3, 0x11, 0xf4, 0xbf, 0xe9, 0xaa, 0x5e, 0x5f, 0x3c, + 0xb3, 0x43, 0x0f, 0x0c, 0xa0, 0x7b, 0x2d, 0xa4, 0xf4, 0xe6, 0x85, 0x74, 0x02, 0xb1, 0x8b, 0xf6, + 0xbd, 0x9b, 0x40, 0xfc, 0xb5, 0x52, 0x85, 0x70, 0xe3, 0x79, 0xe3, 0x62, 0x07, 0xe5, 0x96, 0x6b, + 0xeb, 0x57, 0x26, 0xf6, 0xaa, 0x8d, 0xa0, 0x7f, 0xa5, 0x0a, 0x0f, 0x80, 0x07, 0x26, 0x10, 0xcf, + 0xcc, 0x13, 0xaa, 0x42, 0xa8, 0xd2, 0xbb, 0x16, 0xd1, 0x21, 0x84, 0xdf, 0xc5, 0x4a, 0x58, 0x36, + 0xf0, 0x19, 0x07, 0xd0, 0xbb, 0x5d, 0x2e, 0x0d, 0x5a, 0x36, 0x6c, 0xe3, 0x7c, 0xfb, 0x7e, 0xd0, + 0xb6, 0xcc, 0x9b, 0x84, 0x51, 0x9b, 0x70, 0x89, 0x45, 0xbd, 0x46, 0x36, 0xf6, 0xfd, 0x28, 0xc0, + 0x0d, 0xdf, 0xe4, 0xa8, 0x05, 0x9a, 0x05, 0xa3, 0x6d, 0xd1, 0xad, 0x2e, 0x50, 0x63, 0xc1, 0x8e, + 0xbc, 0x47, 0x5f, 0x60, 0xb0, 0xa7, 0x9c, 0xa1, 0x27, 0x10, 0xce, 0x2d, 0xae, 0x0c, 0x0b, 0xfe, + 0x23, 0xf0, 0xb4, 0x84, 0x64, 0x5f, 0xef, 0x66, 0x9f, 0x7e, 0x70, 0x83, 0x8d, 0xb1, 0xc7, 0x30, + 0xba, 0x47, 0x8b, 0xca, 0xa9, 0x72, 0x57, 0x49, 0xf1, 0xf4, 0xec, 0x97, 0x2a, 0x7e, 0xb9, 0x32, + 0xe2, 0xa3, 0x21, 0x84, 0xf7, 0x58, 0xe2, 0xa6, 0x59, 0xa3, 0x31, 0x44, 0x73, 0xf3, 0xc0, 0x75, + 0x89, 0xb6, 0x59, 0xa1, 0x8f, 0x3b, 0x27, 0xfd, 0x5f, 0x6a, 0xcd, 0xbd, 0xd0, 0xc1, 0x2b, 0x89, + 0x5c, 0x73, 0x32, 0xfd, 0x0c, 0xc3, 0x7f, 0xf6, 0xdd, 0x6b, 0xd4, 0xcc, 0xff, 0x52, 0xe1, 0xaf, + 0x7d, 0xd1, 0x54, 0x7c, 0x80, 0x5e, 0xb3, 0x5b, 0x09, 0x90, 0x47, 0x2e, 0xf7, 0xae, 0xdf, 0x1d, + 0x87, 0x4b, 0x0a, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xbc, 0x54, 0x7d, 0x69, 0x47, 0x04, 0x00, + 0x00, } diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index acf2a8a5926..03f0fc7cca5 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -767,7 +767,7 @@ func (itr *floatInterruptIterator) Next() (*FloatPoint, error) { if itr.count&0xFF == 0xFF { select { case <-itr.closing: - return nil, nil + return nil, itr.Close() default: // Reset iterator count to zero and fall through to emit the next point. itr.count = 0 @@ -3424,7 +3424,7 @@ func (itr *integerInterruptIterator) Next() (*IntegerPoint, error) { if itr.count&0xFF == 0xFF { select { case <-itr.closing: - return nil, nil + return nil, itr.Close() default: // Reset iterator count to zero and fall through to emit the next point. itr.count = 0 @@ -6064,7 +6064,7 @@ func (itr *stringInterruptIterator) Next() (*StringPoint, error) { if itr.count&0xFF == 0xFF { select { case <-itr.closing: - return nil, nil + return nil, itr.Close() default: // Reset iterator count to zero and fall through to emit the next point. itr.count = 0 @@ -8704,7 +8704,7 @@ func (itr *booleanInterruptIterator) Next() (*BooleanPoint, error) { if itr.count&0xFF == 0xFF { select { case <-itr.closing: - return nil, nil + return nil, itr.Close() default: // Reset iterator count to zero and fall through to emit the next point. itr.count = 0 diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 08e39457f9f..d8040f5d89b 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -768,7 +768,7 @@ func (itr *{{$k.name}}InterruptIterator) Next() (*{{$k.Name}}Point, error) { if itr.count & 0xFF == 0xFF { select { case <-itr.closing: - return nil, nil + return nil, itr.Close() default: // Reset iterator count to zero and fall through to emit the next point. itr.count = 0 From 2d5d899ac2a1f862274908873bd9410f5e6699af Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 17 Mar 2017 16:00:54 -0600 Subject: [PATCH 3/5] Allow queries to be interrupted during planning If a bad query is run, kill query and limits would not kick in until after it started executing. Some bad queries that involve high cardinality can cause the server to OOM just from planning which defeats the purpose of the max-select-series limit. This change primarily fixes max-select-series limit so that the query is killed earlier and has the side effect that kill query now can kill a query while it's being planned. --- tsdb/engine/tsm1/engine.go | 16 ++++++++++++++++ tsdb/meta.go | 14 ++++++++++++++ tsdb/shard.go | 7 +++++++ 3 files changed, 37 insertions(+) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 7d41152d4da..44fb40dbede 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1310,6 +1310,14 @@ func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt itrs := make([]influxql.Iterator, 0, len(tagSets)) if err := func() error { for _, t := range tagSets { + // Abort if the query was killed + select { + case <-opt.InterruptCh: + influxql.Iterators(itrs).Close() + return err + default: + } + inputs, err := e.createTagSetIterators(ref, mm, t, opt) if err != nil { return err @@ -1510,6 +1518,14 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu } itrs = append(itrs, itr) + // Abort if the query was killed + select { + case <-opt.InterruptCh: + influxql.Iterators(itrs).Close() + return nil, err + default: + } + // Enforce series limit at creation time. if opt.MaxSeriesN > 0 && len(itrs) > opt.MaxSeriesN { influxql.Iterators(itrs).Close() diff --git a/tsdb/meta.go b/tsdb/meta.go index 46567ee4f97..86201f73802 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -809,6 +809,13 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf tagSets := make(map[string]*influxql.TagSet, 64) var seriesN int for _, id := range ids { + // Abort if the query was killed + select { + case <-opt.InterruptCh: + return nil, influxql.ErrQueryInterrupted + default: + } + if opt.MaxSeriesN > 0 && seriesN > opt.MaxSeriesN { return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN) } @@ -847,6 +854,13 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf // Sort the series in each tag set. for _, t := range tagSets { + // Abort if the query was killed + select { + case <-opt.InterruptCh: + return nil, influxql.ErrQueryInterrupted + default: + } + sort.Sort(t) } diff --git a/tsdb/shard.go b/tsdb/shard.go index f9e7b3c7d05..8cbde52aec8 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -1046,6 +1046,13 @@ func (a Shards) CreateIterator(measurement string, opt influxql.IteratorOptions) } itrs = append(itrs, itr) + select { + case <-opt.InterruptCh: + influxql.Iterators(itrs).Close() + return nil, err + default: + } + // Enforce series limit at creation time. if opt.MaxSeriesN > 0 { stats := itr.Stats() From 00306336ee30831bf38d29cff2de8fd12a91c796 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 17 Mar 2017 16:13:36 -0600 Subject: [PATCH 4/5] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25a37ac1978..57034a30dfd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ - [#8064](https://github.com/influxdata/influxdb/issues/8064): Forbid wildcards in binary expressions. - [#8148](https://github.com/influxdata/influxdb/issues/8148): Fix fill(linear) when multiple series exist and there are null values. - [#7995](https://github.com/influxdata/influxdb/issues/7995): Update liner dependency to handle docker exec. +- [#7811](https://github.com/influxdata/influxdb/issues/7811): Kill query not killing query +- [#7457](https://github.com/influxdata/influxdb/issues/7457): KILL QUERY should work during all phases of a query ## v1.2.2 [2017-03-14] From 8177df2dab2bae87cef499bdb993a026d2f16bb4 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 17 Mar 2017 16:19:10 -0600 Subject: [PATCH 5/5] Simplify Measurement.TagSets signature --- tsdb/engine/tsm1/engine.go | 4 ++-- tsdb/meta.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 44fb40dbede..6e30940c2bc 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1292,7 +1292,7 @@ func (e *Engine) createCallIterator(measurement string, call *influxql.Call, opt } // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition, opt) + tagSets, err := mm.TagSets(e.id, opt) if err != nil { return nil, err } @@ -1360,7 +1360,7 @@ func (e *Engine) createVarRefIterator(measurement string, opt influxql.IteratorO } // Determine tagsets for this measurement based on dimensions and filters. - tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition, opt) + tagSets, err := mm.TagSets(e.id, opt) if err != nil { return nil, err } diff --git a/tsdb/meta.go b/tsdb/meta.go index 86201f73802..c57f774fc3d 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -793,11 +793,11 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf // This will also populate the TagSet objects with the series IDs that match each tagset and any // influx filter expression that goes with the series // TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. -func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr, opt influxql.IteratorOptions) ([]*influxql.TagSet, error) { +func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*influxql.TagSet, error) { m.mu.RLock() // get the unique set of series ids and the filters that should be applied to each - ids, filters, err := m.filters(condition) + ids, filters, err := m.filters(opt.Condition) if err != nil { m.mu.RUnlock() return nil, err @@ -824,10 +824,10 @@ func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition inf if !s.Assigned(shardID) { continue } - tags := make(map[string]string, len(dimensions)) + tags := make(map[string]string, len(opt.Dimensions)) // Build the TagSet for this series. - for _, dim := range dimensions { + for _, dim := range opt.Dimensions { tags[dim] = s.GetTagString(dim) }