Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactor Executor.Next() to receive RecordBatch #8994

Merged
merged 3 commits into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ func (ut *benchDB) mustExec(sql string) {
if len(rss) > 0 {
ctx := context.Background()
rs := rss[0]
chk := rs.NewChunk()
req := rs.NewRecordBatch()
for {
err := rs.Next(ctx, chk)
err := rs.Next(ctx, req)
if err != nil {
log.Fatal(err)
}
if chk.NumRows() == 0 {
if req.NumRows() == 0 {
break
}
}
Expand Down
16 changes: 8 additions & 8 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
// Make sure that the table's data has not been deleted.
rs, err := s.se.Execute(context.Background(), "select count(*) from t")
c.Assert(err, IsNil)
chk := rs[0].NewChunk()
err = rs[0].Next(context.Background(), chk)
req := rs[0].NewRecordBatch()
err = rs[0].Next(context.Background(), req)
c.Assert(err, IsNil)
c.Assert(chk.NumRows() == 0, IsFalse)
row := chk.GetRow(0)
c.Assert(req.NumRows() == 0, IsFalse)
row := req.GetRow(0)
c.Assert(row.Len(), Equals, 1)
c.Assert(row.GetInt64(0), DeepEquals, int64(1))
c.Assert(rs[0].Close(), IsNil)
Expand Down Expand Up @@ -144,11 +144,11 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
// Make sure that the table's data has not been deleted.
rs, err = s.se.Execute(context.Background(), "select count(*) from tx")
c.Assert(err, IsNil)
chk = rs[0].NewChunk()
err = rs[0].Next(context.Background(), chk)
req = rs[0].NewRecordBatch()
err = rs[0].Next(context.Background(), req)
c.Assert(err, IsNil)
c.Assert(chk.NumRows() == 0, IsFalse)
row = chk.GetRow(0)
c.Assert(req.NumRows() == 0, IsFalse)
row = req.GetRow(0)
c.Assert(row.Len(), Equals, 1)
c.Assert(row.GetInt64(0), DeepEquals, int64(1))
c.Assert(rs[0].Close(), IsNil)
Expand Down
8 changes: 4 additions & 4 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint u
}

rs := rss[0]
chk := rs.NewChunk()
it := chunk.NewIterator4Chunk(chk)
req := rs.NewRecordBatch()
it := chunk.NewIterator4Chunk(req.Chunk)
for {
err = rs.Next(context.TODO(), chk)
err = rs.Next(context.TODO(), req)
if err != nil {
return nil, errors.Trace(err)
}
if chk.NumRows() == 0 {
if req.NumRows() == 0 {
break
}

Expand Down
14 changes: 7 additions & 7 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,18 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
// The reason we need update is that chunk with 0 rows indicating we already finished current query, we need prepare for
// next query.
// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
func (a *recordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("recordSet.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

err := a.executor.Next(ctx, chk)
err := a.executor.Next(ctx, req)
if err != nil {
a.lastErr = err
return errors.Trace(err)
}
numRows := chk.NumRows()
numRows := req.NumRows()
if numRows == 0 {
if a.stmt != nil {
a.stmt.Ctx.GetSessionVars().LastFoundRows = a.stmt.Ctx.GetSessionVars().StmtCtx.FoundRows()
Expand All @@ -119,9 +119,9 @@ func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}

// NewChunk create a new chunk using NewChunk function in chunk package.
func (a *recordSet) NewChunk() *chunk.Chunk {
return a.executor.newFirstChunk()
// NewRecordBatch create a recordBatch base on top-level executor's newFirstChunk().
func (a *recordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(a.executor.newFirstChunk())
}

func (a *recordSet) Close() error {
Expand Down Expand Up @@ -295,7 +295,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.LogSlowQuery(txnTS, err == nil)
}()

err = e.Next(ctx, e.newFirstChunk())
err = e.Next(ctx, chunk.NewRecordBatch(e.newFirstChunk()))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
22 changes: 11 additions & 11 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ type CheckIndexRangeExec struct {
}

// Next implements the Executor Next interface.
func (e *CheckIndexRangeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
func (e *CheckIndexRangeExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
req.Reset()
handleIdx := e.schema.Len() - 1
for {
err := e.result.Next(ctx, e.srcChunk)
Expand All @@ -76,12 +76,12 @@ func (e *CheckIndexRangeExec) Next(ctx context.Context, chk *chunk.Chunk) error
handle := row.GetInt64(handleIdx)
for _, hr := range e.handleRanges {
if handle >= hr.Begin && handle < hr.End {
chk.AppendRow(row)
req.AppendRow(row)
break
}
}
}
if chk.NumRows() > 0 {
if req.NumRows() > 0 {
return nil
}
}
Expand Down Expand Up @@ -444,8 +444,8 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

// Next implements the Executor Next interface.
func (e *RecoverIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
func (e *RecoverIndexExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
req.Reset()
if e.done {
return nil
}
Expand All @@ -455,8 +455,8 @@ func (e *RecoverIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return errors.Trace(err)
}

chk.AppendInt64(0, totalAddedCnt)
chk.AppendInt64(1, totalScanCnt)
req.AppendInt64(0, totalAddedCnt)
req.AppendInt64(1, totalScanCnt)
e.done = true
return nil
}
Expand Down Expand Up @@ -580,8 +580,8 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e
}

// Next implements the Executor Next interface.
func (e *CleanupIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
req.Reset()
if e.done {
return nil
}
Expand Down Expand Up @@ -614,7 +614,7 @@ func (e *CleanupIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
}
e.done = true
chk.AppendUint64(0, e.removeCnt)
req.AppendUint64(0, e.removeCnt)
return nil
}

Expand Down
26 changes: 13 additions & 13 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,20 +518,20 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro
}

// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *HashAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("hashagg.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), req.NumRows()) }()
}
chk.Reset()
req.Reset()
if e.isUnparallelExec {
return errors.Trace(e.unparallelExec(ctx, chk))
return errors.Trace(e.unparallelExec(ctx, req.Chunk))
}
return errors.Trace(e.parallelExec(ctx, chk))
return errors.Trace(e.parallelExec(ctx, req.Chunk))
}

func (e *HashAggExec) fetchChildData(ctx context.Context) {
Expand Down Expand Up @@ -559,7 +559,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) {
}
chk = input.chk
}
err = e.children[0].Next(ctx, chk)
err = e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
if err != nil {
e.finalOutputCh <- &AfFinalResult{err: errors.Trace(err)}
return
Expand Down Expand Up @@ -684,7 +684,7 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childResult)
for {
err := e.children[0].Next(ctx, e.childResult)
err := e.children[0].Next(ctx, chunk.NewRecordBatch(e.childResult))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -798,18 +798,18 @@ func (e *StreamAggExec) Close() error {
}

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *StreamAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("streamAgg.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), req.NumRows()) }()
}
chk.Reset()
for !e.executed && chk.NumRows() < e.maxChunkSize {
err := e.consumeOneGroup(ctx, chk)
req.Reset()
for !e.executed && req.NumRows() < e.maxChunkSize {
err := e.consumeOneGroup(ctx, req.Chunk)
if err != nil {
e.executed = true
return errors.Trace(err)
Expand Down Expand Up @@ -874,7 +874,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
return errors.Trace(err)
}

err = e.children[0].Next(ctx, e.childResult)
err = e.children[0].Next(ctx, chunk.NewRecordBatch(e.childResult))
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
)

// Next implements the Executor Next interface.
func (e *AnalyzeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
concurrency, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
return errors.Trace(err)
Expand Down
14 changes: 7 additions & 7 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,17 @@ func (e *ChecksumTableExec) Open(ctx context.Context) error {
}

// Next implements the Executor Next interface.
func (e *ChecksumTableExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
func (e *ChecksumTableExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
req.Reset()
if e.done {
return nil
}
for _, t := range e.tables {
chk.AppendString(0, t.DBInfo.Name.O)
chk.AppendString(1, t.TableInfo.Name.O)
chk.AppendUint64(2, t.Response.Checksum)
chk.AppendUint64(3, t.Response.TotalKvs)
chk.AppendUint64(4, t.Response.TotalBytes)
req.AppendString(0, t.DBInfo.Name.O)
req.AppendString(1, t.TableInfo.Name.O)
req.AppendUint64(2, t.Response.Checksum)
req.AppendUint64(3, t.Response.TotalKvs)
req.AppendUint64(4, t.Response.TotalBytes)
}
e.done = true
return nil
Expand Down
2 changes: 1 addition & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (e *DDLExec) toErr(err error) error {
}

// Next implements the Executor Next interface.
func (e *DDLExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
func (e *DDLExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error) {
if e.done {
return nil
}
Expand Down
24 changes: 12 additions & 12 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ func (s *testSuite3) TestCreateTable(c *C) {
rs, err := tk.Exec(`desc issue312_1`)
c.Assert(err, IsNil)
ctx := context.Background()
chk := rs.NewChunk()
it := chunk.NewIterator4Chunk(chk)
req := rs.NewRecordBatch()
it := chunk.NewIterator4Chunk(req.Chunk)
for {
err1 := rs.Next(ctx, chk)
err1 := rs.Next(ctx, req)
c.Assert(err1, IsNil)
if chk.NumRows() == 0 {
if req.NumRows() == 0 {
break
}
for row := it.Begin(); row != it.End(); row = it.Next() {
Expand All @@ -102,16 +102,16 @@ func (s *testSuite3) TestCreateTable(c *C) {
}
rs, err = tk.Exec(`desc issue312_2`)
c.Assert(err, IsNil)
chk = rs.NewChunk()
it = chunk.NewIterator4Chunk(chk)
req = rs.NewRecordBatch()
it = chunk.NewIterator4Chunk(req.Chunk)
for {
err1 := rs.Next(ctx, chk)
err1 := rs.Next(ctx, req)
c.Assert(err1, IsNil)
if chk.NumRows() == 0 {
if req.NumRows() == 0 {
break
}
for row := it.Begin(); row != it.End(); row = it.Next() {
c.Assert(chk.GetRow(0).GetString(1), Equals, "double")
c.Assert(req.GetRow(0).GetString(1), Equals, "double")
}
}

Expand Down Expand Up @@ -245,10 +245,10 @@ func (s *testSuite3) TestAlterTableAddColumn(c *C) {
now := time.Now().Add(-time.Duration(1 * time.Millisecond)).Format(types.TimeFormat)
r, err := tk.Exec("select c2 from alter_test")
c.Assert(err, IsNil)
chk := r.NewChunk()
err = r.Next(context.Background(), chk)
req := r.NewRecordBatch()
err = r.Next(context.Background(), req)
c.Assert(err, IsNil)
row := chk.GetRow(0)
row := req.GetRow(0)
c.Assert(row.Len(), Equals, 1)
c.Assert(now, GreaterEqual, row.GetTime(0).String())
r.Close()
Expand Down
8 changes: 4 additions & 4 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ type DeleteExec struct {
}

// Next implements the Executor Next interface.
func (e *DeleteExec) Next(ctx context.Context, chk *chunk.Chunk) error {
func (e *DeleteExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("delete.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

chk.Reset()
req.Reset()
if e.IsMultiTable {
return errors.Trace(e.deleteMultiTablesByChunk(ctx))
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
for {
iter := chunk.NewIterator4Chunk(chk)

err := e.children[0].Next(ctx, chk)
err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
chk := e.children[0].newFirstChunk()
for {
iter := chunk.NewIterator4Chunk(chk)
err := e.children[0].Next(ctx, chk)
err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading