Skip to content

Commit

Permalink
Add a function to detach the TableReaderExecutor
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Jul 4, 2024
1 parent 95edc2d commit bfabc0d
Show file tree
Hide file tree
Showing 15 changed files with 384 additions and 124 deletions.
1 change: 0 additions & 1 deletion pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func newReorgExprCtx() exprctx.ExprContext {

return contextstatic.NewStaticExprContext(
contextstatic.WithEvalCtx(evalCtx),
contextstatic.WithUseCache(false),
)
}

Expand Down
29 changes: 23 additions & 6 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/nocopy"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
Expand All @@ -34,11 +33,6 @@ import (

// DistSQLContext provides all information needed by using functions in `distsql`
type DistSQLContext struct {
// TODO: provide a `Clone` to copy this struct.
// The life cycle of some fields in this struct cannot be extended. For example, some fields will be recycled before
// the next execution. They'll need to be handled specially.
_ nocopy.NoCopy

WarnHandler contextutil.WarnAppender

InRestrictedSQL bool
Expand Down Expand Up @@ -95,3 +89,26 @@ type DistSQLContext struct {
func (dctx *DistSQLContext) AppendWarning(warn error) {
dctx.WarnHandler.AppendWarning(warn)
}

// Detach detaches this context from the session context.
//
// NOTE: Though this session context can be used parallely with this context after calling
// it, the `StatementContext` cannot. The session context should create a new `StatementContext`
// before executing another statement.
func (dctx *DistSQLContext) Detach() *DistSQLContext {
newCtx := *dctx

// TODO: this SQLKiller is not connected with the original one, so the user will have no way to kill
// the SQL running with the detached context. The current implementation of `SQLKiller` doesn't support
// tracking a reference which may run across multiple statements, becuase before executing any statement
//, the `SQLKiller` will always be reset.
//
// A simple way to fix it is to use the origianl SQLKiller, and wait for all cursor to be closed after
// receiving a kill signal and before resetting it. For now, it uses a newly created `SQLKiller` to avoid
// affecting the original one and keep safety.
newCtx.SQLKiller = &sqlkiller.SQLKiller{}
newCtx.KVVars = new(tikvstore.Variables)
*newCtx.KVVars = *dctx.KVVars
newCtx.KVVars.Killed = &newCtx.SQLKiller.Signal
return &newCtx
}
9 changes: 6 additions & 3 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,18 @@ func (a *recordSet) OnFetchReturned() {

// Detach creates a new `RecordSet` which doesn't depend on the current session context.
func (a *recordSet) TryDetach() (sqlexec.RecordSet, bool, error) {
// TODO: also detach the executor. Currently, the executor inside may contain the session context. Once
// the executor itself supports detach, we should also detach it here.
e, ok := a.executor.(*TableReaderExecutor)
e, ok := Detach(a.executor)
if !ok {
return nil, false, nil
}
return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false)), true, nil
}

// GetExecutor4Test exports the internal executor for test purpose.
func (a *recordSet) GetExecutor4Test() any {
return a.executor
}

// ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement.
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
Expand Down
74 changes: 74 additions & 0 deletions pkg/executor/detach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression/contextsession"
)

// Detach detaches the current executor from the session context. After detaching, the session context
// can be used to execute another statement while this executor is still running. The returning value
// shows whether this executor is able to be detached.
//
// NOTE: the implementation of `Detach` should guarantee that no matter whether it returns true or false,
// both the original executor and the returning executor should be able to be used correctly. This restriction
// is to make sure that if `Detach(a)` returns `true`, while other children of `a`'s parent returns `false`,
// the caller can still use the original one.
func Detach(originalExecutor exec.Executor) (exec.Executor, bool) {
newExecutor, ok := originalExecutor.Detach()
if !ok {
return nil, false
}

children := originalExecutor.AllChildren()
newChildren := make([]exec.Executor, len(children))
for i, child := range children {
detached, ok := Detach(child)
if !ok {
return nil, false
}
newChildren[i] = detached
}
copy(newExecutor.AllChildren(), newChildren)

return newExecutor, true
}

func (treCtx tableReaderExecutorContext) Detach() tableReaderExecutorContext {
newCtx := treCtx

if ctx, ok := treCtx.ectx.(*contextsession.SessionExprContext); ok {
staticExprCtx := ctx.IntoStatic()

newCtx.dctx = newCtx.dctx.Detach()
newCtx.rctx = newCtx.rctx.Detach(staticExprCtx)
newCtx.buildPBCtx = newCtx.buildPBCtx.Detach(staticExprCtx)
newCtx.ectx = staticExprCtx
return newCtx
}

return treCtx
}

// Detach detaches the current executor from the session context.
func (e *TableReaderExecutor) Detach() (exec.Executor, bool) {
newExec := new(TableReaderExecutor)
*newExec = *e

newExec.tableReaderExecutorContext = newExec.tableReaderExecutorContext.Detach()

return newExec, true
}
70 changes: 70 additions & 0 deletions pkg/executor/detach_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
"context"
"testing"

"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
)

type exportExecutor interface {
GetExecutor4Test() any
}

func TestDetachAllContexts(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.Session().GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
tk.MustExec("create table t (a int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
oldExecutor := rs.(exportExecutor).GetExecutor4Test().(exec.Executor)

drs := rs.(sqlexec.DetachableRecordSet)
srs, ok, err := drs.TryDetach()
require.True(t, ok)
require.NoError(t, err)

require.NotEqual(t, rs, srs)
newExecutor := srs.(exportExecutor).GetExecutor4Test().(exec.Executor)

require.NotEqual(t, oldExecutor, newExecutor)
// Children should be different
for i, child := range oldExecutor.AllChildren() {
require.NotEqual(t, child, newExecutor.AllChildren()[i])
}

// Then execute another statement
tk.MustQuery("select * from t limit 1").Check(testkit.Rows("1"))
// The previous detached record set can still be used
// check data
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.NoError(t, err)
require.Equal(t, 3, chk.NumRows())
require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0))
require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0))
require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0))
}
72 changes: 72 additions & 0 deletions pkg/executor/detach_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"testing"

"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression/contextstatic"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
)

type mockSimpleExecutor struct {
exec.BaseExecutorV2
}

func TestDetachExecutor(t *testing.T) {
// call `Detach` on a mock executor will fail
_, ok := Detach(&mockSimpleExecutor{})
require.False(t, ok)

// call `Detach` on a TableReaderExecutor will succeed
oldExec := &TableReaderExecutor{
tableReaderExecutorContext: tableReaderExecutorContext{
ectx: contextstatic.NewStaticExprContext(),
},
}
newExec, ok := Detach(oldExec)
require.True(t, ok)
require.NotSame(t, oldExec, newExec)

// call `Detach` on a `TableReaderExecutor` with `mockSimpleExecutor` as child will fail
sess := mock.NewContext()
oldExec = &TableReaderExecutor{
tableReaderExecutorContext: tableReaderExecutorContext{
ectx: contextstatic.NewStaticExprContext(),
},
BaseExecutorV2: exec.NewBaseExecutorV2(sess.GetSessionVars(), nil, 0, &mockSimpleExecutor{}),
}
_, ok = Detach(oldExec)
require.False(t, ok)

// call `Detach` on a `TableReaderExecutor` with another `TableReaderExecutor` as child will succeed
child := &TableReaderExecutor{
tableReaderExecutorContext: tableReaderExecutorContext{
ectx: contextstatic.NewStaticExprContext(),
},
}
parent := &TableReaderExecutor{
tableReaderExecutorContext: tableReaderExecutorContext{
ectx: contextstatic.NewStaticExprContext(),
},
BaseExecutorV2: exec.NewBaseExecutorV2(sess.GetSessionVars(), nil, 0, child),
}
newExec, ok = Detach(parent)
require.True(t, ok)
require.NotSame(t, parent, newExec)
require.NotSame(t, child, newExec.AllChildren()[0])
}
11 changes: 11 additions & 0 deletions pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ type Executor interface {
RetFieldTypes() []*types.FieldType
InitCap() int
MaxChunkSize() int

// Detach detaches the current executor from the session context without considering its children.
//
// It has to make sure, no matter whether it returns true or false, both the original executor and the returning executor
// should be able to be used correctly.
Detach() (Executor, bool)
}

var _ Executor = &BaseExecutor{}
Expand Down Expand Up @@ -309,6 +315,11 @@ func (*BaseExecutorV2) Next(_ context.Context, _ *chunk.Chunk) error {
return nil
}

// Detach detaches the current executor from the session context.
func (e *BaseExecutorV2) Detach() (Executor, bool) {
return nil, false
}

// BaseExecutor holds common information for executors.
type BaseExecutor struct {
ctx sessionctx.Context
Expand Down
5 changes: 5 additions & 0 deletions pkg/executor/staticrecordset/cursorrecordset.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (c *cursorRecordSet) Close() error {
return c.recordSet.Close()
}

// GetExecutor4Test exports the internal executor for test purpose.
func (a *cursorRecordSet) GetExecutor4Test() any {
return a.recordSet.(interface{ GetExecutor4Test() any }).GetExecutor4Test()
}

// WrapRecordSetWithCursor wraps a record set with a cursor handle. The cursor handle will be closed
// automatically when the record set is closed
func WrapRecordSetWithCursor(cursor cursor.Handle, recordSet sqlexec.RecordSet) sqlexec.RecordSet {
Expand Down
5 changes: 5 additions & 0 deletions pkg/executor/staticrecordset/recordset.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,8 @@ func (s *staticRecordSet) Close() error {

return err
}

// GetExecutor4Test exports the internal executor for test purpose.
func (a *staticRecordSet) GetExecutor4Test() any {
return a.executor
}
Loading

0 comments on commit bfabc0d

Please sign in to comment.