Skip to content

Commit

Permalink
planner: refactor some code related to binding (#59883)
Browse files Browse the repository at this point in the history
ref #51347
  • Loading branch information
qw4990 authored Mar 4, 2025
1 parent b500d9e commit 6b4d85b
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 151 deletions.
2 changes: 1 addition & 1 deletion pkg/bindinfo/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func matchSQLBinding(sctx sessionctx.Context, stmtNode ast.StmtNode, info *Bindi
if globalHandle == nil {
return
}
binding, matched = globalHandle.MatchGlobalBinding(sctx, noDBDigest, tableNames)
binding, matched = globalHandle.MatchingBinding(sctx, noDBDigest, tableNames)
if matched {
return binding, matched, metrics.ScopeGlobal
}
Expand Down
113 changes: 113 additions & 0 deletions pkg/bindinfo/binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,128 @@
package bindinfo

import (
"fmt"
"sync"
"sync/atomic"

"github.com/dgraph-io/ristretto"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"go.uber.org/zap"
)

// BindingCacheUpdater maintains the binding cache and provide update APIs.
type BindingCacheUpdater interface {
BindingCache

// LoadFromStorageToCache loads global bindings from storage to the memory cache.
LoadFromStorageToCache(fullLoad bool) (err error)

// LastUpdateTime returns the last update time.
LastUpdateTime() types.Time
}

type bindingCacheUpdater struct {
BindingCache

sPool util.DestroyableSessionPool

// lastTaskTime records the last update time for the global sql bind cache.
// This value is used to avoid reload duplicated bindings from storage.
lastUpdateTime atomic.Value
}

// LoadFromStorageToCache loads bindings from the storage into the cache.
func (u *bindingCacheUpdater) LoadFromStorageToCache(fullLoad bool) (err error) {
var lastUpdateTime types.Time
var timeCondition string
if fullLoad {
lastUpdateTime = types.ZeroTimestamp
timeCondition = ""
} else {
lastUpdateTime = u.lastUpdateTime.Load().(types.Time)
timeCondition = fmt.Sprintf("WHERE update_time>'%s'", lastUpdateTime.String())
}
condition := fmt.Sprintf(`%s ORDER BY update_time, create_time`, timeCondition)
bindings, err := u.readBindingsFromStorage(condition)
if err != nil {
return err
}

for _, binding := range bindings {
// Update lastUpdateTime to the newest one.
// Even if this one is an invalid bind.
if binding.UpdateTime.Compare(lastUpdateTime) > 0 {
lastUpdateTime = binding.UpdateTime
}

oldBinding := u.GetBinding(binding.SQLDigest)
cachedBinding := pickCachedBinding(oldBinding, binding)
if cachedBinding != nil {
err = u.SetBinding(binding.SQLDigest, cachedBinding)
if err != nil {
bindingLogger().Warn("BindingHandle.Update", zap.Error(err))
}
} else {
u.RemoveBinding(binding.SQLDigest)
}
}

// update last-update-time and metrics
u.lastUpdateTime.Store(lastUpdateTime)
metrics.BindingCacheMemUsage.Set(float64(u.GetMemUsage()))
metrics.BindingCacheMemLimit.Set(float64(u.GetMemCapacity()))
metrics.BindingCacheNumBindings.Set(float64(len(u.GetAllBindings())))
return nil
}

// LastUpdateTime returns the last update time.
func (u *bindingCacheUpdater) LastUpdateTime() types.Time {
return u.lastUpdateTime.Load().(types.Time)
}

func (u *bindingCacheUpdater) readBindingsFromStorage(condition string) (bindings []*Binding, err error) {
selectStmt := fmt.Sprintf(`SELECT original_sql, bind_sql, default_db, status, create_time,
update_time, charset, collation, source, sql_digest, plan_digest FROM mysql.bind_info
%s`, condition)

err = callWithSCtx(u.sPool, false, func(sctx sessionctx.Context) error {
rows, _, err := execRows(sctx, selectStmt)
if err != nil {
return err
}
bindings = make([]*Binding, 0, len(rows))
for _, row := range rows {
// Skip the builtin record which is designed for binding synchronization.
if row.GetString(0) == BuiltinPseudoSQL4BindLock {
continue
}
binding := newBindingFromStorage(row)
if hErr := prepareHints(sctx, binding); hErr != nil {
bindingLogger().Warn("failed to generate bind record from data row", zap.Error(hErr))
continue
}
bindings = append(bindings, binding)
}
return nil
})
return
}

// NewBindingCacheUpdater creates a new BindingCacheUpdater.
func NewBindingCacheUpdater(sPool util.DestroyableSessionPool) BindingCacheUpdater {
u := new(bindingCacheUpdater)
u.lastUpdateTime.Store(types.ZeroTimestamp)
u.sPool = sPool
u.BindingCache = newBindCache()
return u
}

// digestBiMap represents a bidirectional map between noDBDigest and sqlDigest, used to support cross-db binding.
// One noDBDigest can map to multiple sqlDigests, but one sqlDigest can only map to one noDBDigest.
type digestBiMap interface {
Expand Down
135 changes: 7 additions & 128 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ package bindinfo
import (
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand All @@ -38,13 +36,7 @@ import (

// GlobalBindingHandle is used to handle all global sql bind operations.
type GlobalBindingHandle interface {
// Methods for create, get, drop global sql bindings.

// MatchGlobalBinding returns the matched binding for this statement.
MatchGlobalBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool)

// GetAllGlobalBindings returns all bind records in cache.
GetAllGlobalBindings() (bindings []*Binding)
BindingCacheUpdater

// CreateGlobalBinding creates a Bindings to the storage and the cache.
// It replaces all the exists bindings for the same normalized SQL.
Expand All @@ -56,36 +48,16 @@ type GlobalBindingHandle interface {
// SetGlobalBindingStatus set a Bindings's status to the storage and bind cache.
SetGlobalBindingStatus(newStatus, sqlDigest string) (ok bool, err error)

// Methods for load and clear global sql bindings.

// LoadFromStorageToCache loads global bindings from storage to the memory cache.
LoadFromStorageToCache(fullLoad bool) (err error)

// GCGlobalBinding physically removes the deleted bind records in mysql.bind_info.
GCGlobalBinding() (err error)

// Methods for memory control.

// GetMemUsage returns the memory usage for the bind cache.
GetMemUsage() (memUsage int64)

// GetMemCapacity returns the memory capacity for the bind cache.
GetMemCapacity() (memCapacity int64)

// Close closes the binding handler.
Close()

variable.Statistics
}

// globalBindingHandle is used to handle all global sql bind operations.
type globalBindingHandle struct {
sPool util.DestroyableSessionPool
bindingCache BindingCache

// lastTaskTime records the last update time for the global sql bind cache.
// This value is used to avoid reload duplicated bindings from storage.
lastUpdateTime atomic.Value
BindingCacheUpdater
sPool util.DestroyableSessionPool
}

// Lease influences the duration of loading bind info and handling invalid bind.
Expand Down Expand Up @@ -113,85 +85,12 @@ const (

// NewGlobalBindingHandle creates a new GlobalBindingHandle.
func NewGlobalBindingHandle(sPool util.DestroyableSessionPool) GlobalBindingHandle {
h := &globalBindingHandle{sPool: sPool}
h.lastUpdateTime.Store(types.ZeroTimestamp)
h.bindingCache = newBindCache()
cache := NewBindingCacheUpdater(sPool)
h := &globalBindingHandle{sPool: sPool, BindingCacheUpdater: cache}
variable.RegisterStatistics(h)
return h
}

// LoadFromStorageToCache loads bindings from the storage into the cache.
func (h *globalBindingHandle) LoadFromStorageToCache(fullLoad bool) (err error) {
var lastUpdateTime types.Time
var timeCondition string
if fullLoad {
lastUpdateTime = types.ZeroTimestamp
timeCondition = ""
} else {
lastUpdateTime = h.lastUpdateTime.Load().(types.Time)
timeCondition = fmt.Sprintf("WHERE update_time>'%s'", lastUpdateTime.String())
}
condition := fmt.Sprintf(`%s ORDER BY update_time, create_time`, timeCondition)
bindings, err := h.readBindingsFromStorage(condition)
if err != nil {
return err
}

for _, binding := range bindings {
// Update lastUpdateTime to the newest one.
// Even if this one is an invalid bind.
if binding.UpdateTime.Compare(lastUpdateTime) > 0 {
lastUpdateTime = binding.UpdateTime
}

oldBinding := h.bindingCache.GetBinding(binding.SQLDigest)
cachedBinding := pickCachedBinding(oldBinding, binding)
if cachedBinding != nil {
err = h.bindingCache.SetBinding(binding.SQLDigest, cachedBinding)
if err != nil {
bindingLogger().Warn("BindingHandle.Update", zap.Error(err))
}
} else {
h.bindingCache.RemoveBinding(binding.SQLDigest)
}
}

// update last-update-time and metrics
h.lastUpdateTime.Store(lastUpdateTime)
metrics.BindingCacheMemUsage.Set(float64(h.GetMemUsage()))
metrics.BindingCacheMemLimit.Set(float64(h.GetMemCapacity()))
metrics.BindingCacheNumBindings.Set(float64(len(h.bindingCache.GetAllBindings())))
return nil
}

func (h *globalBindingHandle) readBindingsFromStorage(condition string) (bindings []*Binding, err error) {
selectStmt := fmt.Sprintf(`SELECT original_sql, bind_sql, default_db, status, create_time,
update_time, charset, collation, source, sql_digest, plan_digest FROM mysql.bind_info
%s`, condition)

err = callWithSCtx(h.sPool, false, func(sctx sessionctx.Context) error {
rows, _, err := execRows(sctx, selectStmt)
if err != nil {
return err
}
bindings = make([]*Binding, 0, len(rows))
for _, row := range rows {
// Skip the builtin record which is designed for binding synchronization.
if row.GetString(0) == BuiltinPseudoSQL4BindLock {
continue
}
binding := newBindingFromStorage(row)
if hErr := prepareHints(sctx, binding); hErr != nil {
bindingLogger().Warn("failed to generate bind record from data row", zap.Error(hErr))
continue
}
bindings = append(bindings, binding)
}
return nil
})
return
}

// CreateGlobalBinding creates a Bindings to the storage and the cache.
// It replaces all the exists bindings for the same normalized SQL.
func (h *globalBindingHandle) CreateGlobalBinding(sctx sessionctx.Context, bindings []*Binding) (err error) {
Expand Down Expand Up @@ -370,26 +269,6 @@ func lockBindInfoTable(sctx sessionctx.Context) error {
return err
}

// MatchGlobalBinding returns the matched binding for this statement.
func (h *globalBindingHandle) MatchGlobalBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool) {
return h.bindingCache.MatchingBinding(sctx, noDBDigest, tableNames)
}

// GetAllGlobalBindings returns all bind records in cache.
func (h *globalBindingHandle) GetAllGlobalBindings() (bindings []*Binding) {
return h.bindingCache.GetAllBindings()
}

// GetMemUsage returns the memory usage for the bind cache.
func (h *globalBindingHandle) GetMemUsage() (memUsage int64) {
return h.bindingCache.GetMemUsage()
}

// GetMemCapacity returns the memory capacity for the bind cache.
func (h *globalBindingHandle) GetMemCapacity() (memCapacity int64) {
return h.bindingCache.GetMemCapacity()
}

// newBindingFromStorage builds Bindings from a tuple in storage.
func newBindingFromStorage(row chunk.Row) *Binding {
status := row.GetString(3)
Expand Down Expand Up @@ -482,12 +361,12 @@ func (*globalBindingHandle) GetScope(_ string) vardef.ScopeFlag {
// Stats returns the server statistics.
func (h *globalBindingHandle) Stats(_ *variable.SessionVars) (map[string]any, error) {
m := make(map[string]any)
m[lastPlanBindingUpdateTime] = h.lastUpdateTime.Load().(types.Time).String()
m[lastPlanBindingUpdateTime] = h.LastUpdateTime().String()
return m, nil
}

// Close closes the binding handler.
func (h *globalBindingHandle) Close() {
h.bindingCache.Close()
h.BindingCacheUpdater.Close()
h.sPool.Close()
}
Loading

0 comments on commit 6b4d85b

Please sign in to comment.