*: refactor Executor.Next() to receive RecordBatch (#8994)
lysu authored and zz-jason committed Jan 14, 2019
1 parent 319656c commit 00c4ff4
Showing 65 changed files with 550 additions and 526 deletions.
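The chunk.RecordBatch type and its constructor come from the util/chunk package, whose diff is not part of the excerpt below. Judging only from the call sites in this commit (req.Chunk, req.NumRows(), req.Reset(), chunk.NewRecordBatch(chk)), it behaves like a thin wrapper that embeds a *chunk.Chunk. A minimal sketch under that assumption, not the actual definition:

```go
// Hypothetical sketch of the wrapper introduced by this commit; the real
// definition lives in util/chunk and may carry additional fields.
package chunk

// RecordBatch wraps a Chunk so that Executor.Next can later grow
// batch-level metadata without touching every executor signature again.
type RecordBatch struct {
	*Chunk // embedded: req.NumRows(), req.Reset(), req.AppendRow(...) forward to the chunk
}

// NewRecordBatch wraps an existing chunk in a RecordBatch.
func NewRecordBatch(chk *Chunk) *RecordBatch {
	return &RecordBatch{Chunk: chk}
}
```

Embedding keeps the migration mechanical: every chk.X(...) call inside an executor becomes req.X(...), and code that still needs the raw chunk (for example chunk.NewIterator4Chunk) reaches it through req.Chunk.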
6 changes: 3 additions & 3 deletions cmd/benchdb/main.go
@@ -118,13 +118,13 @@ func (ut *benchDB) mustExec(sql string) {
 	if len(rss) > 0 {
 		ctx := context.Background()
 		rs := rss[0]
-		chk := rs.NewChunk()
+		req := rs.NewRecordBatch()
 		for {
-			err := rs.Next(ctx, chk)
+			err := rs.Next(ctx, req)
 			if err != nil {
 				log.Fatal(err)
 			}
-			if chk.NumRows() == 0 {
+			if req.NumRows() == 0 {
 				break
 			}
 		}
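For client-style callers such as benchdb above, the change is mechanical: allocate the batch from the record set instead of a bare chunk, then keep calling Next until an empty batch comes back. A self-contained sketch of that drain loop; the recordSet interface is declared locally for illustration and lists only the methods these call sites actually use:

```go
package example

import (
	"context"
	"log"

	"github.com/pingcap/tidb/util/chunk"
)

// recordSet lists only the methods the call sites in this diff rely on;
// the real interface in TiDB may declare more.
type recordSet interface {
	NewRecordBatch() *chunk.RecordBatch
	Next(ctx context.Context, req *chunk.RecordBatch) error
	Close() error
}

// drainRowCount consumes a record set with the new API and returns the
// total number of rows it produced.
func drainRowCount(ctx context.Context, rs recordSet) (int, error) {
	defer func() {
		if err := rs.Close(); err != nil {
			log.Printf("close record set: %v", err)
		}
	}()

	req := rs.NewRecordBatch()
	total := 0
	for {
		// Next refills req in place; an empty batch signals the end of the result.
		if err := rs.Next(ctx, req); err != nil {
			return total, err
		}
		if req.NumRows() == 0 {
			return total, nil
		}
		total += req.NumRows()
	}
}
```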
16 changes: 8 additions & 8 deletions ddl/failtest/fail_db_test.go
@@ -111,11 +111,11 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
 	// Make sure that the table's data has not been deleted.
 	rs, err := s.se.Execute(context.Background(), "select count(*) from t")
 	c.Assert(err, IsNil)
-	chk := rs[0].NewChunk()
-	err = rs[0].Next(context.Background(), chk)
+	req := rs[0].NewRecordBatch()
+	err = rs[0].Next(context.Background(), req)
 	c.Assert(err, IsNil)
-	c.Assert(chk.NumRows() == 0, IsFalse)
-	row := chk.GetRow(0)
+	c.Assert(req.NumRows() == 0, IsFalse)
+	row := req.GetRow(0)
 	c.Assert(row.Len(), Equals, 1)
 	c.Assert(row.GetInt64(0), DeepEquals, int64(1))
 	c.Assert(rs[0].Close(), IsNil)
@@ -144,11 +144,11 @@ func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
 	// Make sure that the table's data has not been deleted.
 	rs, err = s.se.Execute(context.Background(), "select count(*) from tx")
 	c.Assert(err, IsNil)
-	chk = rs[0].NewChunk()
-	err = rs[0].Next(context.Background(), chk)
+	req = rs[0].NewRecordBatch()
+	err = rs[0].Next(context.Background(), req)
 	c.Assert(err, IsNil)
-	c.Assert(chk.NumRows() == 0, IsFalse)
-	row = chk.GetRow(0)
+	c.Assert(req.NumRows() == 0, IsFalse)
+	row = req.GetRow(0)
 	c.Assert(row.Len(), Equals, 1)
 	c.Assert(row.GetInt64(0), DeepEquals, int64(1))
 	c.Assert(rs[0].Close(), IsNil)
8 changes: 4 additions & 4 deletions ddl/util/util.go
@@ -69,14 +69,14 @@ func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint u
 	}
 
 	rs := rss[0]
-	chk := rs.NewChunk()
-	it := chunk.NewIterator4Chunk(chk)
+	req := rs.NewRecordBatch()
+	it := chunk.NewIterator4Chunk(req.Chunk)
 	for {
-		err = rs.Next(context.TODO(), chk)
+		err = rs.Next(context.TODO(), req)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		if chk.NumRows() == 0 {
+		if req.NumRows() == 0 {
 			break
 		}
 
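Call sites that walk individual rows, such as loadDeleteRangesFromTable above, still build their iterator from the raw chunk; the only change is that they reach it through req.Chunk. A sketch of that shape, again with a locally declared interface standing in for the real record set, and assuming the first result column is an integer:

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/util/chunk"
)

// rowSource mirrors the methods used by loadDeleteRangesFromTable above;
// it is declared here only to keep the sketch self-contained.
type rowSource interface {
	NewRecordBatch() *chunk.RecordBatch
	Next(ctx context.Context, req *chunk.RecordBatch) error
}

// firstColumnInt64s gathers the first column of every row, batch by batch.
// The iterator is built once from the underlying chunk, now reached via req.Chunk.
func firstColumnInt64s(ctx context.Context, rs rowSource) ([]int64, error) {
	req := rs.NewRecordBatch()
	it := chunk.NewIterator4Chunk(req.Chunk)

	var vals []int64
	for {
		if err := rs.Next(ctx, req); err != nil {
			return nil, err
		}
		if req.NumRows() == 0 {
			return vals, nil
		}
		for row := it.Begin(); row != it.End(); row = it.Next() {
			// Assumes column 0 holds an INT/BIGINT value.
			vals = append(vals, row.GetInt64(0))
		}
	}
}
```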
14 changes: 7 additions & 7 deletions executor/adapter.go
@@ -95,18 +95,18 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
 // The reason we need update is that chunk with 0 rows indicating we already finished current query, we need prepare for
 // next query.
 // If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
-func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
+func (a *recordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("recordSet.Next", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
 	}
 
-	err := a.executor.Next(ctx, chk)
+	err := a.executor.Next(ctx, req)
 	if err != nil {
 		a.lastErr = err
 		return errors.Trace(err)
 	}
-	numRows := chk.NumRows()
+	numRows := req.NumRows()
 	if numRows == 0 {
 		if a.stmt != nil {
 			a.stmt.Ctx.GetSessionVars().LastFoundRows = a.stmt.Ctx.GetSessionVars().StmtCtx.FoundRows()
@@ -119,9 +119,9 @@ func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
 	return nil
 }
 
-// NewChunk create a new chunk using NewChunk function in chunk package.
-func (a *recordSet) NewChunk() *chunk.Chunk {
-	return a.executor.newFirstChunk()
+// NewRecordBatch create a recordBatch base on top-level executor's newFirstChunk().
+func (a *recordSet) NewRecordBatch() *chunk.RecordBatch {
+	return chunk.NewRecordBatch(a.executor.newFirstChunk())
 }
 
 func (a *recordSet) Close() error {
@@ -295,7 +295,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
 		a.LogSlowQuery(txnTS, err == nil)
 	}()
 
-	err = e.Next(ctx, e.newFirstChunk())
+	err = e.Next(ctx, chunk.NewRecordBatch(e.newFirstChunk()))
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
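The Executor interface itself lives in executor/executor.go, which is among the 65 changed files but not shown in this excerpt. Based on the signatures used throughout the diff, the refactored contract presumably reads roughly as follows; the real interface declares additional methods that do not appear here:

```go
// Rough reconstruction for orientation only; the authoritative definition is
// in executor/executor.go, which is not part of the excerpt above.
package executor

import (
	"context"

	"github.com/pingcap/tidb/util/chunk"
)

// Executor is the physical-operator interface. After this commit, Next fills
// the chunk carried by req instead of receiving a bare *chunk.Chunk, so the
// parameter can later carry per-batch metadata as well.
type Executor interface {
	Open(ctx context.Context) error
	// Next reads at most one batch of rows into req; req.NumRows() == 0 signals exhaustion.
	Next(ctx context.Context, req *chunk.RecordBatch) error
	Close() error
}
```

Top-level drivers that used to hand e.newFirstChunk() straight to Next now wrap it once, as handleNoDelayExecutor does above with chunk.NewRecordBatch(e.newFirstChunk()).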
22 changes: 11 additions & 11 deletions executor/admin.go
@@ -60,8 +60,8 @@ type CheckIndexRangeExec struct {
 }
 
 // Next implements the Executor Next interface.
-func (e *CheckIndexRangeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
-	chk.Reset()
+func (e *CheckIndexRangeExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+	req.Reset()
 	handleIdx := e.schema.Len() - 1
 	for {
 		err := e.result.Next(ctx, e.srcChunk)
@@ -76,12 +76,12 @@ func (e *CheckIndexRangeExec) Next(ctx context.Context, chk *chunk.Chunk) error
 			handle := row.GetInt64(handleIdx)
 			for _, hr := range e.handleRanges {
 				if handle >= hr.Begin && handle < hr.End {
-					chk.AppendRow(row)
+					req.AppendRow(row)
 					break
 				}
 			}
 		}
-		if chk.NumRows() > 0 {
+		if req.NumRows() > 0 {
 			return nil
 		}
 	}
@@ -444,8 +444,8 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
 }
 
 // Next implements the Executor Next interface.
-func (e *RecoverIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
-	chk.Reset()
+func (e *RecoverIndexExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+	req.Reset()
 	if e.done {
 		return nil
 	}
@@ -455,8 +455,8 @@ func (e *RecoverIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		return errors.Trace(err)
 	}
 
-	chk.AppendInt64(0, totalAddedCnt)
-	chk.AppendInt64(1, totalScanCnt)
+	req.AppendInt64(0, totalAddedCnt)
+	req.AppendInt64(1, totalScanCnt)
 	e.done = true
 	return nil
 }
@@ -580,8 +580,8 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e
 }
 
 // Next implements the Executor Next interface.
-func (e *CleanupIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
-	chk.Reset()
+func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+	req.Reset()
 	if e.done {
 		return nil
 	}
@@ -614,7 +614,7 @@ func (e *CleanupIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		}
 	}
 	e.done = true
-	chk.AppendUint64(0, e.removeCnt)
+	req.AppendUint64(0, e.removeCnt)
 	return nil
 }
 
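The admin executors above show the pattern for one-shot executors under the new signature: reset the incoming batch, append the result row on the first call, and return an empty batch on every later call. A minimal sketch of that shape; countReportExec is a made-up executor used purely for illustration, and its wiring into the executor builder is omitted:

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/util/chunk"
)

// countReportExec is illustrative only: a one-shot executor in the style of
// RecoverIndexExec and CleanupIndexExec above.
type countReportExec struct {
	done     bool
	rowCount int64
}

// Next implements the Executor Next interface: reset req, emit one result row
// on the first call, and report an empty batch on every later call.
func (e *countReportExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
	req.Reset()
	if e.done {
		return nil
	}
	req.AppendInt64(0, e.rowCount)
	e.done = true
	return nil
}
```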
26 changes: 13 additions & 13 deletions executor/aggregate.go
@@ -518,20 +518,20 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro
 }
 
 // Next implements the Executor Next interface.
-func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+func (e *HashAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("hashagg.Next", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
 	}
 	if e.runtimeStats != nil {
 		start := time.Now()
-		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), req.NumRows()) }()
 	}
-	chk.Reset()
+	req.Reset()
 	if e.isUnparallelExec {
-		return errors.Trace(e.unparallelExec(ctx, chk))
+		return errors.Trace(e.unparallelExec(ctx, req.Chunk))
 	}
-	return errors.Trace(e.parallelExec(ctx, chk))
+	return errors.Trace(e.parallelExec(ctx, req.Chunk))
 }
 
 func (e *HashAggExec) fetchChildData(ctx context.Context) {
@@ -559,7 +559,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) {
 			}
 			chk = input.chk
 		}
-		err = e.children[0].Next(ctx, chk)
+		err = e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
 		if err != nil {
 			e.finalOutputCh <- &AfFinalResult{err: errors.Trace(err)}
 			return
@@ -684,7 +684,7 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
 func (e *HashAggExec) execute(ctx context.Context) (err error) {
 	inputIter := chunk.NewIterator4Chunk(e.childResult)
 	for {
-		err := e.children[0].Next(ctx, e.childResult)
+		err := e.children[0].Next(ctx, chunk.NewRecordBatch(e.childResult))
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -798,18 +798,18 @@ func (e *StreamAggExec) Close() error {
 }
 
 // Next implements the Executor Next interface.
-func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+func (e *StreamAggExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("streamAgg.Next", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
 	}
 	if e.runtimeStats != nil {
 		start := time.Now()
-		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), req.NumRows()) }()
 	}
-	chk.Reset()
-	for !e.executed && chk.NumRows() < e.maxChunkSize {
-		err := e.consumeOneGroup(ctx, chk)
+	req.Reset()
+	for !e.executed && req.NumRows() < e.maxChunkSize {
+		err := e.consumeOneGroup(ctx, req.Chunk)
 		if err != nil {
 			e.executed = true
 			return errors.Trace(err)
@@ -874,7 +874,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
 		return errors.Trace(err)
 	}
 
-	err = e.children[0].Next(ctx, e.childResult)
+	err = e.children[0].Next(ctx, chunk.NewRecordBatch(e.childResult))
 	if err != nil {
 		return errors.Trace(err)
 	}
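Inside the operator tree, a parent keeps owning its child's chunk; only the hand-off changes. Wherever an executor used to call e.children[0].Next(ctx, chk), it now wraps that same chunk on the fly, as HashAggExec and StreamAggExec do above. A sketch of the parent-side fetch loop under that pattern:

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/util/chunk"
)

// nexter is the one method of the child executor this helper needs.
type nexter interface {
	Next(ctx context.Context, req *chunk.RecordBatch) error
}

// fetchAll mirrors the parent-side loop used by the aggregate executors above:
// the parent keeps owning childChunk and wraps it per call, so the child's
// chunk reuse is unchanged by the refactor.
func fetchAll(ctx context.Context, child nexter, childChunk *chunk.Chunk, onBatch func(*chunk.Chunk) error) error {
	for {
		if err := child.Next(ctx, chunk.NewRecordBatch(childChunk)); err != nil {
			return err
		}
		if childChunk.NumRows() == 0 {
			return nil
		}
		if err := onBatch(childChunk); err != nil {
			return err
		}
	}
}
```

Wrapping per call costs only a small allocation per batch; the chunk itself is still reused between calls.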
2 changes: 1 addition & 1 deletion executor/analyze.go
@@ -51,7 +51,7 @@ const (
 )
 
 // Next implements the Executor Next interface.
-func (e *AnalyzeExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 	concurrency, err := getBuildStatsConcurrency(e.ctx)
 	if err != nil {
 		return errors.Trace(err)
14 changes: 7 additions & 7 deletions executor/checksum.go
@@ -83,17 +83,17 @@ func (e *ChecksumTableExec) Open(ctx context.Context) error {
 }
 
 // Next implements the Executor Next interface.
-func (e *ChecksumTableExec) Next(ctx context.Context, chk *chunk.Chunk) error {
-	chk.Reset()
+func (e *ChecksumTableExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
+	req.Reset()
 	if e.done {
 		return nil
 	}
 	for _, t := range e.tables {
-		chk.AppendString(0, t.DBInfo.Name.O)
-		chk.AppendString(1, t.TableInfo.Name.O)
-		chk.AppendUint64(2, t.Response.Checksum)
-		chk.AppendUint64(3, t.Response.TotalKvs)
-		chk.AppendUint64(4, t.Response.TotalBytes)
+		req.AppendString(0, t.DBInfo.Name.O)
+		req.AppendString(1, t.TableInfo.Name.O)
+		req.AppendUint64(2, t.Response.Checksum)
+		req.AppendUint64(3, t.Response.TotalKvs)
+		req.AppendUint64(4, t.Response.TotalBytes)
 	}
 	e.done = true
 	return nil
2 changes: 1 addition & 1 deletion executor/ddl.go
@@ -67,7 +67,7 @@ func (e *DDLExec) toErr(err error) error {
 }
 
 // Next implements the Executor Next interface.
-func (e *DDLExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
+func (e *DDLExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error) {
 	if e.done {
 		return nil
 	}
24 changes: 12 additions & 12 deletions executor/ddl_test.go
@@ -88,12 +88,12 @@ func (s *testSuite3) TestCreateTable(c *C) {
 	rs, err := tk.Exec(`desc issue312_1`)
 	c.Assert(err, IsNil)
 	ctx := context.Background()
-	chk := rs.NewChunk()
-	it := chunk.NewIterator4Chunk(chk)
+	req := rs.NewRecordBatch()
+	it := chunk.NewIterator4Chunk(req.Chunk)
 	for {
-		err1 := rs.Next(ctx, chk)
+		err1 := rs.Next(ctx, req)
 		c.Assert(err1, IsNil)
-		if chk.NumRows() == 0 {
+		if req.NumRows() == 0 {
 			break
 		}
 		for row := it.Begin(); row != it.End(); row = it.Next() {
@@ -102,16 +102,16 @@ func (s *testSuite3) TestCreateTable(c *C) {
 	}
 	rs, err = tk.Exec(`desc issue312_2`)
 	c.Assert(err, IsNil)
-	chk = rs.NewChunk()
-	it = chunk.NewIterator4Chunk(chk)
+	req = rs.NewRecordBatch()
+	it = chunk.NewIterator4Chunk(req.Chunk)
 	for {
-		err1 := rs.Next(ctx, chk)
+		err1 := rs.Next(ctx, req)
 		c.Assert(err1, IsNil)
-		if chk.NumRows() == 0 {
+		if req.NumRows() == 0 {
 			break
 		}
 		for row := it.Begin(); row != it.End(); row = it.Next() {
-			c.Assert(chk.GetRow(0).GetString(1), Equals, "double")
+			c.Assert(req.GetRow(0).GetString(1), Equals, "double")
 		}
 	}
 
@@ -245,10 +245,10 @@ func (s *testSuite3) TestAlterTableAddColumn(c *C) {
 	now := time.Now().Add(-time.Duration(1 * time.Millisecond)).Format(types.TimeFormat)
 	r, err := tk.Exec("select c2 from alter_test")
 	c.Assert(err, IsNil)
-	chk := r.NewChunk()
-	err = r.Next(context.Background(), chk)
+	req := r.NewRecordBatch()
+	err = r.Next(context.Background(), req)
 	c.Assert(err, IsNil)
-	row := chk.GetRow(0)
+	row := req.GetRow(0)
 	c.Assert(row.Len(), Equals, 1)
 	c.Assert(now, GreaterEqual, row.GetTime(0).String())
 	r.Close()
8 changes: 4 additions & 4 deletions executor/delete.go
@@ -44,13 +44,13 @@ type DeleteExec struct {
 }
 
 // Next implements the Executor Next interface.
-func (e *DeleteExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+func (e *DeleteExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("delete.Next", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
 	}
 
-	chk.Reset()
+	req.Reset()
 	if e.IsMultiTable {
 		return errors.Trace(e.deleteMultiTablesByChunk(ctx))
 	}
@@ -106,7 +106,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
 	for {
 		iter := chunk.NewIterator4Chunk(chk)
 
-		err := e.children[0].Next(ctx, chk)
+		err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -188,7 +188,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
 	chk := e.children[0].newFirstChunk()
 	for {
 		iter := chunk.NewIterator4Chunk(chk)
-		err := e.children[0].Next(ctx, chk)
+		err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
 		if err != nil {
 			return errors.Trace(err)
 		}
(Diffs for the remaining changed files did not load on the original page and are not included in this excerpt.)
