Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EXPLAIN ANALYZE implementation #8947

Merged
merged 1 commit into from
Oct 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce
github.com/tinylib/msgp ad0ff2e232ad2e37faf67087fb24bf8d04a8ce20
github.com/uber-go/atomic 74ca5ec650841aee9f289dce76e928313a37cbc6
github.com/uber-go/zap fbae0281ffd546fa6d1959fec6075ac5da7fb577
github.com/xlab/treeprint 06dfc6fa17cdde904617990a0c2d89e3e332dbb3
golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658
1 change: 1 addition & 0 deletions LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@
- github.com/uber-go/zap [MIT LICENSE](https://github.com/uber-go/zap/blob/master/LICENSE.txt)
- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)
- github.com/xlab/treeprint [MIT LICENSE](https://github.com/xlab/treeprint/blob/master/LICENSE)
7 changes: 4 additions & 3 deletions coordinator/shard_mapper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator

import (
"context"
"io"
"time"

Expand Down Expand Up @@ -160,7 +161,7 @@ func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influ
return typ
}

func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
func (a *LocalShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
source := Source{
Database: m.Database,
RetentionPolicy: m.RetentionPolicy,
Expand All @@ -184,7 +185,7 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
inputs := make([]query.Iterator, 0, len(measurements))
if err := func() error {
for _, measurement := range measurements {
input, err := sg.CreateIterator(measurement, opt)
input, err := sg.CreateIterator(ctx, measurement, opt)
if err != nil {
return err
}
Expand All @@ -197,7 +198,7 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
}
return query.Iterators(inputs).Merge(opt)
}
return sg.CreateIterator(m.Name, opt)
return sg.CreateIterator(ctx, m.Name, opt)
}

func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
Expand Down
7 changes: 4 additions & 3 deletions coordinator/shard_mapper_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator_test

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -40,7 +41,7 @@ func TestLocalShardMapper(t *testing.T) {
}

var sh MockShard
sh.CreateIteratorFn = func(measurement string, opt query.IteratorOptions) (query.Iterator, error) {
sh.CreateIteratorFn = func(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error) {
if measurement != "cpu" {
t.Errorf("unexpected measurement: %s", measurement)
}
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestLocalShardMapper(t *testing.T) {
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
}

if _, err := ic.CreateIterator(measurement, query.IteratorOptions{}); err != nil {
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{}); err != nil {
t.Fatalf("unexpected error: %s", err)
}

Expand All @@ -97,7 +98,7 @@ func TestLocalShardMapper(t *testing.T) {
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
}

if _, err := ic.CreateIterator(measurement, query.IteratorOptions{}); err != nil {
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
129 changes: 100 additions & 29 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coordinator

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -14,6 +15,8 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/pkg/tracing"
"github.com/influxdata/influxdb/pkg/tracing/fields"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
Expand Down Expand Up @@ -56,7 +59,7 @@ type StatementExecutor struct {
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.ExecutionContext) error {
// Select statements are handled separately so that they can be streamed.
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
return e.executeSelectStatement(stmt, &ctx)
return e.executeSelectStatement(context.Background(), stmt, &ctx)
}

var rows models.Rows
Expand Down Expand Up @@ -136,7 +139,11 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
}
err = e.executeDropUserStatement(stmt)
case *influxql.ExplainStatement:
rows, err = e.executeExplainStatement(stmt, &ctx)
if stmt.Analyze {
rows, err = e.executeExplainAnalyzeStatement(stmt, &ctx)
} else {
rows, err = e.executeExplainStatement(stmt, &ctx)
}
case *influxql.GrantStatement:
if ctx.ReadOnly {
messages = append(messages, query.ReadOnlyWarning(stmt.String()))
Expand Down Expand Up @@ -401,17 +408,13 @@ func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStateme
return e.MetaClient.DropUser(q.Name)
}

func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ctx *query.ExecutionContext) (models.Rows, error) {
if q.Analyze {
return nil, errors.New("analyze is currently unimplemented")
}

func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
opt := query.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
InterruptCh: ectx.InterruptCh,
NodeID: ectx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN,
MaxBucketsN: e.MaxSelectBucketsN,
Authorizer: ctx.Authorizer,
Authorizer: ectx.Authorizer,
}

// Prepare the query for execution, but do not actually execute it.
Expand All @@ -437,6 +440,74 @@ func (e *StatementExecutor) executeExplainStatement(q *influxql.ExplainStatement
return models.Rows{row}, nil
}

func (e *StatementExecutor) executeExplainAnalyzeStatement(q *influxql.ExplainStatement, ectx *query.ExecutionContext) (models.Rows, error) {
stmt := q.Statement
t, span := tracing.NewTrace("select")
ctx := tracing.NewContextWithTrace(context.Background(), t)
ctx = tracing.NewContextWithSpan(ctx, span)
start := time.Now()

itrs, columns, err := e.createIterators(ctx, stmt, ectx)
if err != nil {
return nil, err
}

iterTime := time.Since(start)

// Generate a row emitter from the iterator set.
em := query.NewEmitter(itrs, stmt.TimeAscending(), ectx.ChunkSize)
em.Columns = columns
if stmt.Location != nil {
em.Location = stmt.Location
}
em.OmitTime = stmt.OmitTime
em.EmitName = stmt.EmitName

// Emit rows to the results channel.
var writeN int64
for {
var row *models.Row
row, _, err = em.Emit()
if err != nil {
goto CLEANUP
} else if row == nil {
// Check if the query was interrupted while emitting.
select {
case <-ectx.InterruptCh:
err = query.ErrQueryInterrupted
goto CLEANUP
default:
}
break
}

writeN += int64(len(row.Values))
}

CLEANUP:
em.Close()
if err != nil {
return nil, err
}

totalTime := time.Since(start)
span.MergeFields(
fields.Duration("total_time", totalTime),
fields.Duration("planning_time", iterTime),
fields.Duration("execution_time", totalTime-iterTime),
)
span.Finish()

row := &models.Row{
Columns: []string{"EXPLAIN ANALYZE"},
}
for _, s := range strings.Split(t.Tree().String(), "\n") {
row.Values = append(row.Values, []interface{}{s})
}

return models.Rows{row}, nil
}

func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) error {
return e.MetaClient.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)
}
Expand Down Expand Up @@ -469,14 +540,14 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw
return e.MetaClient.UpdateUser(q.Name, q.Password)
}

func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
itrs, columns, err := e.createIterators(stmt, ctx)
func (e *StatementExecutor) executeSelectStatement(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) error {
itrs, columns, err := e.createIterators(ctx, stmt, ectx)
if err != nil {
return err
}

// Generate a row emitter from the iterator set.
em := query.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
em := query.NewEmitter(itrs, stmt.TimeAscending(), ectx.ChunkSize)
em.Columns = columns
if stmt.Location != nil {
em.Location = stmt.Location
Expand All @@ -501,7 +572,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
} else if row == nil {
// Check if the query was interrupted while emitting.
select {
case <-ctx.InterruptCh:
case <-ectx.InterruptCh:
return query.ErrQueryInterrupted
default:
}
Expand All @@ -518,13 +589,13 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}

result := &query.Result{
StatementID: ctx.StatementID,
StatementID: ectx.StatementID,
Series: []*models.Row{row},
Partial: partial,
}

// Send results or exit if closing.
if err := ctx.Send(result); err != nil {
if err := ectx.Send(result); err != nil {
return err
}

Expand All @@ -538,12 +609,12 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}

var messages []*query.Message
if ctx.ReadOnly {
if ectx.ReadOnly {
messages = append(messages, query.ReadOnlyWarning(stmt.String()))
}

return ctx.Send(&query.Result{
StatementID: ctx.StatementID,
return ectx.Send(&query.Result{
StatementID: ectx.StatementID,
Messages: messages,
Series: []*models.Row{{
Name: "result",
Expand All @@ -555,33 +626,33 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen

// Always emit at least one result.
if !emitted {
return ctx.Send(&query.Result{
StatementID: ctx.StatementID,
return ectx.Send(&query.Result{
StatementID: ectx.StatementID,
Series: make([]*models.Row, 0),
})
}

return nil
}

func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) ([]query.Iterator, []string, error) {
func (e *StatementExecutor) createIterators(ctx context.Context, stmt *influxql.SelectStatement, ectx *query.ExecutionContext) ([]query.Iterator, []string, error) {
opt := query.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
InterruptCh: ectx.InterruptCh,
NodeID: ectx.ExecutionOptions.NodeID,
MaxSeriesN: e.MaxSelectSeriesN,
MaxBucketsN: e.MaxSelectBucketsN,
Authorizer: ctx.Authorizer,
Authorizer: ectx.Authorizer,
}

// Create a set of iterators from a selection.
itrs, columns, err := query.Select(stmt, e.ShardMapper, opt)
itrs, columns, err := query.Select(ctx, stmt, e.ShardMapper, opt)
if err != nil {
return nil, nil, err
}

if e.MaxSelectPointN > 0 {
monitor := query.PointLimitMonitor(itrs, query.DefaultStatsInterval, e.MaxSelectPointN)
ctx.Query.Monitor(monitor)
ectx.Query.Monitor(monitor)
}
return itrs, columns, nil
}
Expand Down Expand Up @@ -1073,8 +1144,8 @@ func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultD
case *influxql.Measurement:
switch stmt.(type) {
case *influxql.DropSeriesStatement, *influxql.DeleteSeriesStatement:
// DB and RP not supported by these statements so don't rewrite into invalid
// statements
// DB and RP not supported by these statements so don't rewrite into invalid
// statements
default:
err = e.normalizeMeasurement(node, defaultDatabase)
}
Expand Down
11 changes: 6 additions & 5 deletions coordinator/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coordinator_test

import (
"bytes"
"context"
"errors"
"io"
"os"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
}

var sh MockShard
sh.CreateIteratorFn = func(m string, opt query.IteratorOptions) (query.Iterator, error) {
sh.CreateIteratorFn = func(ctx context.Context, m string, opt query.IteratorOptions) (query.Iterator, error) {
return &FloatIterator{Points: []query.FloatPoint{
{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}},
{Name: "cpu", Time: int64(1 * time.Second), Aux: []interface{}{float64(200)}},
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
}

var sh MockShard
sh.CreateIteratorFn = func(m string, opt query.IteratorOptions) (query.Iterator, error) {
sh.CreateIteratorFn = func(ctx context.Context, m string, opt query.IteratorOptions) (query.Iterator, error) {
return &FloatIterator{
Points: []query.FloatPoint{{Name: "cpu", Time: int64(0 * time.Second), Aux: []interface{}{float64(100)}}},
}, nil
Expand Down Expand Up @@ -384,7 +385,7 @@ func (s *TSDBStore) TagValues(_ query.Authorizer, database string, cond influxql
type MockShard struct {
Measurements []string
FieldDimensionsFn func(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
CreateIteratorFn func(m string, opt query.IteratorOptions) (query.Iterator, error)
CreateIteratorFn func(ctx context.Context, m string, opt query.IteratorOptions) (query.Iterator, error)
IteratorCostFn func(m string, opt query.IteratorOptions) (query.IteratorCost, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
}
Expand Down Expand Up @@ -417,8 +418,8 @@ func (sh *MockShard) MapType(measurement, field string) influxql.DataType {
return influxql.Unknown
}

func (sh *MockShard) CreateIterator(measurement string, opt query.IteratorOptions) (query.Iterator, error) {
return sh.CreateIteratorFn(measurement, opt)
func (sh *MockShard) CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error) {
return sh.CreateIteratorFn(ctx, measurement, opt)
}

func (sh *MockShard) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/metrics/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package metrics

import "context"

type key int

const (
groupKey key = iota
)

// NewContextWithGroup returns a new context with the given Group added.
func NewContextWithGroup(ctx context.Context, c *Group) context.Context {
return context.WithValue(ctx, groupKey, c)
}

// GroupFromContext returns the Group associated with ctx or nil if no Group has been assigned.
func GroupFromContext(ctx context.Context) *Group {
c, _ := ctx.Value(groupKey).(*Group)
return c
}
Loading