Skip to content

Commit

Permalink
importinto: check column assignment expressions in import into ... (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored and Benjamin2037 committed Sep 11, 2024
1 parent c4b802e commit d9f0476
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 39 deletions.
2 changes: 2 additions & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ go_library(
"//pkg/infoschema/context",
"//pkg/keyspace",
"//pkg/kv",
"//pkg/lightning/backend/encode",
"//pkg/lightning/backend/kv",
"//pkg/lightning/log",
"//pkg/lightning/mydump",
"//pkg/meta",
Expand Down
47 changes: 47 additions & 0 deletions pkg/executor/import_into.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
"github.com/pingcap/tidb/pkg/disttask/importinto"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
litkv "github.com/pingcap/tidb/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand Down Expand Up @@ -89,6 +92,10 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
return err
}
astArgs := importer.ASTArgsFromImportPlan(e.plan)
if err = ValidateImportIntoColAssignmentsWithEncodeCtx(importPlan, astArgs.ColumnAssignments); err != nil {
return err
}

controller, err := importer.NewLoadDataController(importPlan, e.tbl, astArgs)
if err != nil {
return err
Expand Down Expand Up @@ -134,6 +141,46 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
return e.fillJobInfo(ctx, jobID, req)
}

// ValidateImportIntoColAssignmentsWithEncodeCtx validates the column assignment expressions should be compatible with the
// encoding context (which maybe different with the context in the current session).
// For example, the function `tidb_is_ddl_owner()` requires the optional eval properties which are not
// provided by the encoding context, so we should avoid using it in the column assignment expressions.
func ValidateImportIntoColAssignmentsWithEncodeCtx(plan *importer.Plan, assigns []*ast.Assignment) error {
encodeCtx := litkv.NewSession(&encode.SessionOptions{
SQLMode: plan.SQLMode,
SysVars: plan.ImportantSysVars,
}, log.L())

providedProps := encodeCtx.GetExprCtx().GetEvalCtx().GetOptionalPropSet()
for i, assign := range assigns {
expr, err := expression.BuildSimpleExpr(encodeCtx.GetExprCtx(), assign.Expr)
if err != nil {
return err
}

if err = checkExprWithProvidedProps(i, expr, providedProps); err != nil {
return err
}
}

return nil
}

func checkExprWithProvidedProps(idx int, expr expression.Expression, props expression.OptionalEvalPropKeySet) error {
if e, ok := expr.(*expression.ScalarFunction); ok {
if e.Function.RequiredOptionalEvalProps()|props != props {
return errors.Errorf("FUNCTION %s is not supported in IMPORT INTO column assignment, index %d", e.FuncName.O, idx)
}

for _, arg := range e.GetArgs() {
if err := checkExprWithProvidedProps(idx, arg, props); err != nil {
return err
}
}
}
return nil
}

func (e *ImportIntoExec) fillJobInfo(ctx context.Context, jobID int64, req *chunk.Chunk) error {
e.dataFilled = true
// we use taskManager to get job, user might not have the privilege to system tables.
Expand Down
67 changes: 67 additions & 0 deletions pkg/executor/import_into_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
package executor_test

import (
"fmt"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/sem"
"github.com/stretchr/testify/require"
)

func TestSecurityEnhancedMode(t *testing.T) {
Expand All @@ -33,3 +40,63 @@ func TestSecurityEnhancedMode(t *testing.T) {
// regardless of what privileges they have available.
tk.MustGetErrMsg("IMPORT INTO test.t FROM '/file.csv'", "[planner:8132]Feature 'IMPORT INTO from server disk' is not supported when security enhanced mode is enabled")
}

func TestImportIntoValidateColAssignmentsWithEncodeCtx(t *testing.T) {
cases := []struct {
exprs []string
error string
}{
{
exprs: []string{"'x'", "1+@1", "concat('hello', 'world')", "getvar('var1')"},
},
{
exprs: []string{"setvar('a', 'b')"},
error: "FUNCTION setvar is not supported in IMPORT INTO column assignment, index 0",
},
{
exprs: []string{"current_user()"},
error: "FUNCTION current_user is not supported in IMPORT INTO column assignment, index 0",
},
{
exprs: []string{"current_role()"},
error: "FUNCTION current_role is not supported in IMPORT INTO column assignment, index 0",
},
{
exprs: []string{"connection_id()"},
error: "FUNCTION connection_id is not supported in IMPORT INTO column assignment, index 0",
},
{
exprs: []string{"1", "tidb_is_ddl_owner()"},
error: "FUNCTION tidb_is_ddl_owner is not supported in IMPORT INTO column assignment, index 1",
},
{
exprs: []string{"sleep(1)"},
error: "FUNCTION sleep is not supported in IMPORT INTO column assignment, index 0",
},
{
exprs: []string{"LAST_INSERT_ID()"},
error: "FUNCTION last_insert_id is not supported in IMPORT INTO column assignment, index 0",
},
}

for i, c := range cases {
t.Run(fmt.Sprintf("case-%d-%s", i, strings.Join(c.exprs, ",")), func(t *testing.T) {
assigns := make([]*ast.Assignment, 0, len(c.exprs))
for _, exprStr := range c.exprs {
stmt, err := parser.New().ParseOneStmt("select "+exprStr, "", "")
require.NoError(t, err)
expr := stmt.(*ast.SelectStmt).Fields.Fields[0].Expr
assigns = append(assigns, &ast.Assignment{
Expr: expr,
})
}

err := executor.ValidateImportIntoColAssignmentsWithEncodeCtx(&importer.Plan{}, assigns)
if c.error == "" {
require.NoError(t, err)
} else {
require.EqualError(t, err, c.error)
}
})
}
}
23 changes: 2 additions & 21 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1297,20 +1297,11 @@ func (e *LoadDataController) CreateColAssignExprs(planCtx planctx.PlanContext) (
_ []contextutil.SQLWarn,
retErr error,
) {
var (
i int
assign *ast.Assignment
)
// TODO(lance6716): indeterministic function should also return error
defer tidbutil.Recover("load-data/import-into", "CreateColAssignExprs", func() {
retErr = errors.Errorf("can't use function at SET index %d", i)
}, false)

e.colAssignMu.Lock()
defer e.colAssignMu.Unlock()
res := make([]expression.Expression, 0, len(e.ColumnAssignments))
allWarnings := []contextutil.SQLWarn{}
for i, assign = range e.ColumnAssignments {
for _, assign := range e.ColumnAssignments {
newExpr, err := plannerutil.RewriteAstExprWithPlanCtx(planCtx, assign.Expr, nil, nil, false)
// col assign expr warnings is static, we should generate it for each row processed.
// so we save it and clear it here.
Expand All @@ -1332,21 +1323,11 @@ func (e *LoadDataController) CreateColAssignExprs(planCtx planctx.PlanContext) (
// - Aggregate functions
// - Other special functions used in some specified queries such as `GROUPING`, `VALUES` ...
func (e *LoadDataController) CreateColAssignSimpleExprs(ctx expression.BuildContext) (_ []expression.Expression, _ []contextutil.SQLWarn, retErr error) {
var (
i int
assign *ast.Assignment
)

// TODO(lance6716): indeterministic function should also return error
defer tidbutil.Recover("load-data/import-into", "CreateColAssignExprs", func() {
retErr = errors.Errorf("can't use function at SET index %d", i)
}, false)

e.colAssignMu.Lock()
defer e.colAssignMu.Unlock()
res := make([]expression.Expression, 0, len(e.ColumnAssignments))
var allWarnings []contextutil.SQLWarn
for i, assign = range e.ColumnAssignments {
for _, assign := range e.ColumnAssignments {
newExpr, err := expression.BuildSimpleExpr(ctx, assign.Expr)
// col assign expr warnings is static, we should generate it for each row processed.
// so we save it and clear it here.
Expand Down
40 changes: 38 additions & 2 deletions pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,34 @@ func (*transaction) MayFlush() error {
return nil
}

// sessExprContext implements the ExprContext interface
// It embedded an `ExprContext` and a `sessEvalContext` to provide no optional properties.
type sessExprContext struct {
exprctx.ExprContext
evalCtx *sessEvalContext
}

// GetEvalCtx implements the ExprContext.GetEvalCtx interface
func (ctx *sessExprContext) GetEvalCtx() exprctx.EvalContext {
return ctx.evalCtx
}

// sessEvalContext implements the EvalContext interface
// It embedded an `EvalContext` and provide no optional properties.
type sessEvalContext struct {
exprctx.EvalContext
}

// GetOptionalPropSet returns the optional properties provided by this context.
func (*sessEvalContext) GetOptionalPropSet() exprctx.OptionalEvalPropKeySet {
return 0
}

// GetOptionalPropProvider gets the optional property provider by key
func (*sessEvalContext) GetOptionalPropProvider(exprctx.OptionalEvalPropKey) (exprctx.OptionalEvalPropProvider, bool) {
return nil, false
}

// session is a trimmed down Session type which only wraps our own trimmed-down
// transaction type and provides the session variables to the TiDB library
// optimized for Lightning.
Expand All @@ -294,7 +322,7 @@ type session struct {
planctx.EmptyPlanContextExtended
txn transaction
Vars *variable.SessionVars
exprCtx *exprctximpl.SessionExprContext
exprCtx *sessExprContext
tblctx *tbctximpl.TableContextImpl
// currently, we only set `CommonAddRecordCtx`
values map[fmt.Stringer]any
Expand Down Expand Up @@ -350,7 +378,15 @@ func newSession(options *encode.SessionOptions, logger log.Logger) *session {
}
vars.TxnCtx = nil
s.Vars = vars
s.exprCtx = exprctximpl.NewSessionExprContext(s)
exprCtx := exprctximpl.NewSessionExprContext(s)
// The exprCtx should be an expression context providing no optional properties in `EvalContext`.
// That is to make sure it only allows expressions that require basic context.
s.exprCtx = &sessExprContext{
ExprContext: exprCtx,
evalCtx: &sessEvalContext{
EvalContext: exprCtx.GetEvalCtx(),
},
}
s.tblctx = tbctximpl.NewTableContextImpl(s)
s.txn.kvPairs = &Pairs{}

Expand Down
105 changes: 93 additions & 12 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4329,6 +4329,93 @@ var (
ImportIntoDataSource = "data source"
)

// importIntoCollAssignmentChecker implements ast.Visitor interface.
// It is used to check the column assignment expressions in IMPORT INTO statement.
// Currently, the import into column assignment only supports some simple expressions.
type importIntoCollAssignmentChecker struct {
idx int
err error
neededVars map[string]int
}

func newImportIntoCollAssignmentChecker() *importIntoCollAssignmentChecker {
return &importIntoCollAssignmentChecker{neededVars: make(map[string]int)}
}

// checkImportIntoColAssignments checks the column assignment expressions in IMPORT INTO statement.
func checkImportIntoColAssignments(assignments []*ast.Assignment) (map[string]int, error) {
checker := newImportIntoCollAssignmentChecker()
for i, assign := range assignments {
checker.idx = i
assign.Expr.Accept(checker)
if checker.err != nil {
break
}
}
return checker.neededVars, checker.err
}

// Enter implements ast.Visitor interface.
func (*importIntoCollAssignmentChecker) Enter(node ast.Node) (ast.Node, bool) {
return node, false
}

// Leave implements ast.Visitor interface.
func (v *importIntoCollAssignmentChecker) Leave(node ast.Node) (ast.Node, bool) {
switch n := node.(type) {
case *ast.ColumnNameExpr:
v.err = errors.Errorf("COLUMN reference is not supported in IMPORT INTO column assignment, index %d", v.idx)
return n, false
case *ast.SubqueryExpr:
v.err = errors.Errorf("subquery is not supported in IMPORT INTO column assignment, index %d", v.idx)
return n, false
case *ast.VariableExpr:
if n.IsSystem {
v.err = errors.Errorf("system variable is not supported in IMPORT INTO column assignment, index %d", v.idx)
return n, false
}
if n.Value != nil {
v.err = errors.Errorf("setting a variable in IMPORT INTO column assignment is not supported, index %d", v.idx)
return n, false
}
v.neededVars[strings.ToLower(n.Name)] = v.idx
case *ast.DefaultExpr:
v.err = errors.Errorf("FUNCTION default is not supported in IMPORT INTO column assignment, index %d", v.idx)
return n, false
case *ast.WindowFuncExpr:
v.err = errors.Errorf("window FUNCTION %s is not supported in IMPORT INTO column assignment, index %d", n.Name, v.idx)
return n, false
case *ast.AggregateFuncExpr:
v.err = errors.Errorf("aggregate FUNCTION %s is not supported in IMPORT INTO column assignment, index %d", n.F, v.idx)
return n, false
case *ast.FuncCallExpr:
fnName := n.FnName.L
switch fnName {
case ast.Grouping:
v.err = errors.Errorf("FUNCTION %s is not supported in IMPORT INTO column assignment, index %d", n.FnName.O, v.idx)
return n, false
case ast.GetVar:
if len(n.Args) > 0 {
val, ok := n.Args[0].(*driver.ValueExpr)
if !ok || val.Kind() != types.KindString {
v.err = errors.Errorf("the argument of getvar should be a constant string in IMPORT INTO column assignment, index %d", v.idx)
return n, false
}
v.neededVars[strings.ToLower(val.GetString())] = v.idx
}
default:
if !expression.IsFunctionSupported(fnName) {
v.err = errors.Errorf("FUNCTION %s is not supported in IMPORT INTO column assignment, index %d", n.FnName.O, v.idx)
return n, false
}
}
case *ast.ValuesExpr:
v.err = errors.Errorf("VALUES is not supported in IMPORT INTO column assignment, index %d", v.idx)
return n, false
}
return node, v.err == nil
}

func (b *PlanBuilder) buildImportInto(ctx context.Context, ld *ast.ImportIntoStmt) (base.Plan, error) {
mockTablePlan := logicalop.LogicalTableDual{}.Init(b.ctx, b.getSelectOffset())
var (
Expand Down Expand Up @@ -4358,24 +4445,18 @@ func (b *PlanBuilder) buildImportInto(ctx context.Context, ld *ast.ImportIntoStm
}
options = append(options, &loadDataOpt)
}
// TODO(lance6716): check functions
neededVars := make(map[string]int)
for i, a := range ld.ColumnAssignments {
switch v := a.Expr.(type) {
case *ast.SubqueryExpr:
return nil, errors.Errorf(
"subquery is not supported in IMPORT INTO column assignment, index %d", i,
)
case *ast.VariableExpr:
neededVars[v.Name] = i
}

neededVars, err := checkImportIntoColAssignments(ld.ColumnAssignments)
if err != nil {
return nil, err
}

for _, v := range ld.ColumnsAndUserVars {
userVar := v.UserVar
if userVar == nil {
continue
}
delete(neededVars, userVar.Name)
delete(neededVars, strings.ToLower(userVar.Name))
}
if len(neededVars) > 0 {
valuesStr := make([]string, 0, len(neededVars))
Expand Down
Loading

0 comments on commit d9f0476

Please sign in to comment.