Skip to content

Commit

Permalink
Serialize EXPLAIN ANALYZE traces from remote nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartcarnie committed Oct 11, 2017
1 parent 8894581 commit 53833d2
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 206 deletions.
111 changes: 59 additions & 52 deletions query/internal/internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions query/internal/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ message Point {
optional uint64 UnsignedValue = 12;

optional IteratorStats Stats = 11;
optional bytes Trace = 13;
}

message Aux {
Expand Down
80 changes: 11 additions & 69 deletions query/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@ package query

import (
"container/heap"
"encoding/binary"
"fmt"
"context"
"io"
"sort"
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql"
internal "github.com/influxdata/influxdb/query/internal"
)

// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval.
Expand Down Expand Up @@ -3384,8 +3382,8 @@ type floatReaderIterator struct {
}

// newFloatReaderIterator returns a new instance of floatReaderIterator.
func newFloatReaderIterator(r io.Reader, stats IteratorStats) *floatReaderIterator {
dec := NewFloatPointDecoder(r)
func newFloatReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *floatReaderIterator {
dec := NewFloatPointDecoder(ctx, r)
dec.stats = stats

return &floatReaderIterator{
Expand Down Expand Up @@ -6777,8 +6775,8 @@ type integerReaderIterator struct {
}

// newIntegerReaderIterator returns a new instance of integerReaderIterator.
func newIntegerReaderIterator(r io.Reader, stats IteratorStats) *integerReaderIterator {
dec := NewIntegerPointDecoder(r)
func newIntegerReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *integerReaderIterator {
dec := NewIntegerPointDecoder(ctx, r)
dec.stats = stats

return &integerReaderIterator{
Expand Down Expand Up @@ -10170,8 +10168,8 @@ type unsignedReaderIterator struct {
}

// newUnsignedReaderIterator returns a new instance of unsignedReaderIterator.
func newUnsignedReaderIterator(r io.Reader, stats IteratorStats) *unsignedReaderIterator {
dec := NewUnsignedPointDecoder(r)
func newUnsignedReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *unsignedReaderIterator {
dec := NewUnsignedPointDecoder(ctx, r)
dec.stats = stats

return &unsignedReaderIterator{
Expand Down Expand Up @@ -13549,8 +13547,8 @@ type stringReaderIterator struct {
}

// newStringReaderIterator returns a new instance of stringReaderIterator.
func newStringReaderIterator(r io.Reader, stats IteratorStats) *stringReaderIterator {
dec := NewStringPointDecoder(r)
func newStringReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *stringReaderIterator {
dec := NewStringPointDecoder(ctx, r)
dec.stats = stats

return &stringReaderIterator{
Expand Down Expand Up @@ -16928,8 +16926,8 @@ type booleanReaderIterator struct {
}

// newBooleanReaderIterator returns a new instance of booleanReaderIterator.
func newBooleanReaderIterator(r io.Reader, stats IteratorStats) *booleanReaderIterator {
dec := NewBooleanPointDecoder(r)
func newBooleanReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *booleanReaderIterator {
dec := NewBooleanPointDecoder(ctx, r)
dec.stats = stats

return &booleanReaderIterator{
Expand Down Expand Up @@ -16963,39 +16961,6 @@ func (itr *booleanReaderIterator) Next() (*BooleanPoint, error) {
return p, nil
}

// IteratorEncoder is an encoder for encoding an iterator's points to w.
type IteratorEncoder struct {
w io.Writer

// Frequency with which stats are emitted.
StatsInterval time.Duration
}

// NewIteratorEncoder encodes an iterator's points to w.
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
return &IteratorEncoder{
w: w,

StatsInterval: DefaultStatsInterval,
}
}

// EncodeIterator encodes and writes all of itr's points to the underlying writer.
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
switch itr := itr.(type) {
case FloatIterator:
return enc.encodeFloatIterator(itr)
case IntegerIterator:
return enc.encodeIntegerIterator(itr)
case StringIterator:
return enc.encodeStringIterator(itr)
case BooleanIterator:
return enc.encodeBooleanIterator(itr)
default:
panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
}
}

// encodeFloatIterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encodeFloatIterator(itr FloatIterator) error {
ticker := time.NewTicker(enc.StatsInterval)
Expand Down Expand Up @@ -17210,26 +17175,3 @@ func (enc *IteratorEncoder) encodeBooleanIterator(itr BooleanIterator) error {
}
return nil
}

// encode a stats object in the point stream.
func (enc *IteratorEncoder) encodeStats(stats IteratorStats) error {
buf, err := proto.Marshal(&internal.Point{
Name: proto.String(""),
Tags: proto.String(""),
Time: proto.Int64(0),
Nil: proto.Bool(false),

Stats: encodeIteratorStats(&stats),
})
if err != nil {
return err
}

if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
return err
}
if _, err := enc.w.Write(buf); err != nil {
return err
}
return nil
}
67 changes: 4 additions & 63 deletions query/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package query

import (
"context"
"container/heap"
"encoding/binary"
"fmt"
"io"
"sort"
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql"
internal "github.com/influxdata/influxdb/query/internal"
)

// DefaultStatsInterval is the default value for IteratorEncoder.StatsInterval.
Expand Down Expand Up @@ -1723,13 +1721,13 @@ type {{$k.name}}ReaderIterator struct {
}

// new{{$k.Name}}ReaderIterator returns a new instance of {{$k.name}}ReaderIterator.
func new{{$k.Name}}ReaderIterator(r io.Reader, stats IteratorStats) *{{$k.name}}ReaderIterator {
dec := New{{$k.Name}}PointDecoder(r)
func new{{$k.Name}}ReaderIterator(ctx context.Context, r io.Reader, stats IteratorStats) *{{$k.name}}ReaderIterator {
dec := New{{$k.Name}}PointDecoder(ctx, r)
dec.stats = stats

return &{{$k.name}}ReaderIterator{
r: r,
dec: dec,
dec: dec,
}
}

Expand Down Expand Up @@ -1759,40 +1757,6 @@ func (itr *{{$k.name}}ReaderIterator) Next() (*{{$k.Name}}Point, error) {
}
{{end}}


// IteratorEncoder is an encoder for encoding an iterator's points to w.
type IteratorEncoder struct {
w io.Writer

// Frequency with which stats are emitted.
StatsInterval time.Duration
}

// NewIteratorEncoder encodes an iterator's points to w.
func NewIteratorEncoder(w io.Writer) *IteratorEncoder {
return &IteratorEncoder{
w: w,

StatsInterval: DefaultStatsInterval,
}
}

// EncodeIterator encodes and writes all of itr's points to the underlying writer.
func (enc *IteratorEncoder) EncodeIterator(itr Iterator) error {
switch itr := itr.(type) {
case FloatIterator:
return enc.encodeFloatIterator(itr)
case IntegerIterator:
return enc.encodeIntegerIterator(itr)
case StringIterator:
return enc.encodeStringIterator(itr)
case BooleanIterator:
return enc.encodeBooleanIterator(itr)
default:
panic(fmt.Sprintf("unsupported iterator for encoder: %T", itr))
}
}

{{range .}}
// encode{{.Name}}Iterator encodes all points from itr to the underlying writer.
func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error {
Expand Down Expand Up @@ -1839,27 +1803,4 @@ func (enc *IteratorEncoder) encode{{.Name}}Iterator(itr {{.Name}}Iterator) error

{{end}}

// encode a stats object in the point stream.
func (enc *IteratorEncoder) encodeStats(stats IteratorStats) error {
buf, err := proto.Marshal(&internal.Point{
Name: proto.String(""),
Tags: proto.String(""),
Time: proto.Int64(0),
Nil: proto.Bool(false),

Stats: encodeIteratorStats(&stats),
})
if err != nil {
return err
}

if err := binary.Write(enc.w, binary.BigEndian, uint32(len(buf))); err != nil {
return err
}
if _, err := enc.w.Write(buf); err != nil {
return err
}
return nil
}

{{end}}
Loading

0 comments on commit 53833d2

Please sign in to comment.