Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: shard implicit row ID #5559

Merged
merged 1 commit into from
Jan 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ast/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ const (
TableOptionDelayKeyWrite
TableOptionRowFormat
TableOptionStatsPersistent
TableOptionShardRowID
)

// RowFormat types
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
// DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing.
DDLOwnerKey = "/tidb/ddl/fg/owner"
ddlPrompt = "ddl"

shardRowIDBitsMax = 15
)

var (
Expand Down
59 changes: 57 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,8 +814,24 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) {
tbInfo.Charset = op.StrValue
case ast.TableOptionCollate:
tbInfo.Collate = op.StrValue
case ast.TableOptionShardRowID:
if !hasAutoIncrementColumn(tbInfo) {
tbInfo.ShardRowIDBits = op.UintValue
if tbInfo.ShardRowIDBits > shardRowIDBitsMax {
tbInfo.ShardRowIDBits = shardRowIDBitsMax
}
}
}
}
}

func hasAutoIncrementColumn(tbInfo *model.TableInfo) bool {
for _, col := range tbInfo.Columns {
if mysql.HasAutoIncrementFlag(col.Flag) {
return true
}
}
return false
}

func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) {
Expand Down Expand Up @@ -871,9 +887,17 @@ func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.Alte
err = ErrUnsupportedModifyPrimaryKey.GenByArgs("drop")
case ast.AlterTableOption:
for _, opt := range spec.Options {
if opt.Tp == ast.TableOptionAutoIncrement {
switch opt.Tp {
case ast.TableOptionShardRowID:
if opt.UintValue > shardRowIDBitsMax {
opt.UintValue = shardRowIDBitsMax
}
err = d.ShardRowID(ctx, ident, opt.UintValue)
case ast.TableOptionAutoIncrement:
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue))
break
}
if err != nil {
return errors.Trace(err)
}
}
default:
Expand Down Expand Up @@ -915,6 +939,37 @@ func (d *ddl) RebaseAutoID(ctx context.Context, ident ast.Ident, newBase int64)
return errors.Trace(err)
}

// ShardRowID shards the implicit row ID by adding shard value to the row ID's first few bits.
func (d *ddl) ShardRowID(ctx context.Context, tableIdent ast.Ident, uVal uint64) error {
job, err := d.createJobForTable(tableIdent)
if err != nil {
return errors.Trace(err)
}
job.Type = model.ActionShardRowID
job.Args = []interface{}{uVal}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) createJobForTable(tableIdent ast.Ident) (*model.Job, error) {
is := d.GetInformationSchema()
schema, ok := is.SchemaByName(tableIdent.Schema)
if !ok {
return nil, infoschema.ErrDatabaseNotExists.GenByArgs(tableIdent.Schema)
}
t, err := is.TableByName(tableIdent.Schema, tableIdent.Name)
if err != nil {
return nil, infoschema.ErrTableNotExists.GenByArgs(tableIdent.Schema, tableIdent.Name)
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
BinlogInfo: &model.HistoryInfo{},
}
return job, nil
}

func checkColumnConstraint(constraints []*ast.ColumnOption) error {
for _, constraint := range constraints {
switch constraint.Tp {
Expand Down
5 changes: 5 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/inspectkv"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
tmysql "github.com/pingcap/tidb/mysql"
Expand Down Expand Up @@ -66,6 +67,7 @@ type testDBSuite struct {
tk *testkit.TestKit
s tidb.Session
lease time.Duration
autoIDStep int64
}

func (s *testDBSuite) SetUpSuite(c *C) {
Expand All @@ -77,6 +79,8 @@ func (s *testDBSuite) SetUpSuite(c *C) {
tidb.SetSchemaLease(s.lease)
tidb.SetStatsLease(0)
s.schemaName = "test_db"
s.autoIDStep = autoid.GetStep()
autoid.SetStep(5000)

s.store, err = tikv.NewMockTikvStore()
c.Assert(err, IsNil)
Expand All @@ -99,6 +103,7 @@ func (s *testDBSuite) TearDownSuite(c *C) {
s.dom.Close()
s.store.Close()
testleak.AfterTest(c)()
autoid.SetStep(s.autoIDStep)
}

func (s *testDBSuite) testErrorCode(c *C, sql string, errCode int) {
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64) {
ver, err = d.onRenameTable(t, job)
case model.ActionSetDefaultValue:
ver, err = d.onSetDefaultValue(t, job)
case model.ActionShardRowID:
ver, err = d.onShardRowID(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobCancelled
Expand Down
23 changes: 23 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,29 @@ func (d *ddl) onRebaseAutoID(t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, nil
}

func (d *ddl) onShardRowID(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var shardRowIDBits uint64
err := job.DecodeArgs(&shardRowIDBits)
if err != nil {
job.State = model.JobCancelled
return ver, errors.Trace(err)
}
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
job.State = model.JobCancelled
return ver, errors.Trace(err)
}
tblInfo.ShardRowIDBits = shardRowIDBits
job.State = model.JobCancelled
job.BinlogInfo.AddTableInfo(ver, tblInfo)
ver, err = updateTableInfo(t, job, tblInfo, tblInfo.State)
if err != nil {
job.State = model.JobCancelled
return ver, errors.Trace(err)
}
return ver, nil
}

func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var oldSchemaID int64
var tableName model.CIStr
Expand Down
28 changes: 28 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/types"
)
Expand Down Expand Up @@ -283,3 +284,30 @@ func (s *testSuite) TestUnsupportedCharset(c *C) {
}
tk.MustExec("drop database " + dbName)
}

func (s *testSuite) TestShardRowIDBits(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("create table t (a int) shard_row_id_bits = 15")
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert t values (%d)", i))
}
tbl, err := sessionctx.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
var hasShardedID bool
var count int
c.Assert(tk.Se.NewTxn(), IsNil)
err = tbl.IterRecords(tk.Se, tbl.FirstKey(), nil, func(h int64, rec []types.Datum, cols []*table.Column) (more bool, err error) {
c.Assert(h, GreaterEqual, int64(0))
first8bits := h >> 56
if first8bits > 0 {
hasShardedID = true
}
count++
return true, nil
})
c.Assert(err, IsNil)
c.Assert(count, Equals, 100)
c.Assert(hasShardedID, IsTrue)
}
6 changes: 6 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/inspectkv"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/parser"
Expand Down Expand Up @@ -65,11 +66,15 @@ type testSuite struct {
mvccStore *mocktikv.MvccStore
store kv.Storage
*parser.Parser

autoIDStep int64
}

var mockTikv = flag.Bool("mockTikv", true, "use mock tikv store in executor test")

func (s *testSuite) SetUpSuite(c *C) {
s.autoIDStep = autoid.GetStep()
autoid.SetStep(5000)
s.Parser = parser.New()
flag.Lookup("mockTikv")
useMockTikv := *mockTikv
Expand All @@ -96,6 +101,7 @@ func (s *testSuite) SetUpSuite(c *C) {

func (s *testSuite) TearDownSuite(c *C) {
s.store.Close()
autoid.SetStep(s.autoIDStep)
}

func (s *testSuite) SetUpTest(c *C) {
Expand Down
4 changes: 4 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,10 @@ func (e *ShowExec) fetchShowCreateTable() error {
}
}

if tb.Meta().ShardRowIDBits > 0 {
buf.WriteString(fmt.Sprintf("/*!90000 SHARD_ROW_ID_BITS=%d */", tb.Meta().ShardRowIDBits))
}

if len(tb.Meta().Comment) > 0 {
buf.WriteString(fmt.Sprintf(" COMMENT='%s'", format.OutputFormat(tb.Meta().Comment)))
}
Expand Down
2 changes: 1 addition & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(row []types.Datum, i int, c *tab
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if row[i].IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
recordID, err = e.Table.AllocAutoID()
recordID, err = e.Table.AllocAutoID(e.ctx)
if e.filterErr(errors.Trace(err), ignoreErr) != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ func (it *infoschemaTable) UpdateRecord(ctx context.Context, h int64, oldData, n
return table.ErrUnsupportedOp
}

func (it *infoschemaTable) AllocAutoID() (int64, error) {
func (it *infoschemaTable) AllocAutoID(ctx context.Context) (int64, error) {
return 0, table.ErrUnsupportedOp
}

Expand Down
7 changes: 6 additions & 1 deletion meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// Test needs to change it, so it's a variable.
var step = int64(5000)
var step = int64(30000)

var errInvalidTableID = terror.ClassAutoid.New(codeInvalidTableID, "invalid TableID")

Expand Down Expand Up @@ -59,6 +59,11 @@ func GetStep() int64 {
return step
}

// SetStep is only used by tests
func SetStep(s int64) {
step = s
}

// NextGlobalAutoID implements autoid.Allocator NextGlobalAutoID interface.
func (alloc *allocator) NextGlobalAutoID(tableID int64) (int64, error) {
var autoID int64
Expand Down
3 changes: 3 additions & 0 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
ActionRebaseAutoID
ActionRenameTable
ActionSetDefaultValue
ActionShardRowID
)

func (action ActionType) String() string {
Expand Down Expand Up @@ -77,6 +78,8 @@ func (action ActionType) String() string {
return "rename table"
case ActionSetDefaultValue:
return "set default value"
case ActionShardRowID:
return "shard row ID"
default:
return "none"
}
Expand Down
3 changes: 3 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type TableInfo struct {
// TODO: Remove it.
// Now it only uses for compatibility with the old version that already uses this field.
OldSchemaID int64 `json:"old_schema_id,omitempty"`

// ShardRowIDBits specify if the implicit row ID is sharded.
ShardRowIDBits uint64
}

// GetDBID returns the schema ID that is used to create an allocator.
Expand Down
1 change: 1 addition & 0 deletions parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ var tokenMap = map[string]int{
"QUARTER": quarter,
"QUERY": query,
"QUICK": quick,
"SHARD_ROW_ID_BITS": shardRowIDBits,
"RANGE": rangeKwd,
"READ": read,
"REAL": realType,
Expand Down
5 changes: 5 additions & 0 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ import (
precisionType "PRECISION"
primary "PRIMARY"
procedure "PROCEDURE"
shardRowIDBits "SHARD_ROW_ID_BITS"
rangeKwd "RANGE"
read "READ"
realType "REAL"
Expand Down Expand Up @@ -5007,6 +5008,10 @@ TableOption:
{
$$ = &ast.TableOption{Tp: ast.TableOptionStatsPersistent}
}
| "SHARD_ROW_ID_BITS" EqOpt LengthNum
{
$$ = &ast.TableOption{Tp: ast.TableOptionShardRowID, UintValue: $3.(uint64)}
}

StatsPersistentVal:
"DEFAULT"
Expand Down
2 changes: 2 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,7 @@ func (s *testParserSuite) TestDDL(c *C) {
{"create table t (a timestamp default now())", true},
{"create table t (a timestamp default now() on update now)", false},
{"create table t (a timestamp default now() on update now())", true},
{"CREATE TABLE t (c TEXT) shard_row_id_bits = 1;", true},
// Create table with ON UPDATE CURRENT_TIMESTAMP(6), specify fraction part.
{"CREATE TABLE IF NOT EXISTS `general_log` (`event_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),`user_host` mediumtext NOT NULL,`thread_id` bigint(20) unsigned NOT NULL,`server_id` int(10) unsigned NOT NULL,`command_type` varchar(64) NOT NULL,`argument` mediumblob NOT NULL) ENGINE=CSV DEFAULT CHARSET=utf8 COMMENT='General log'", true},

Expand Down Expand Up @@ -1462,6 +1463,7 @@ func (s *testParserSuite) TestDDL(c *C) {
{"ALTER TABLE t ADD UNIQUE (a) COMMENT 'a'", true},
{"ALTER TABLE t ADD UNIQUE KEY (a) COMMENT 'a'", true},
{"ALTER TABLE t ADD UNIQUE INDEX (a) COMMENT 'a'", true},
{"ALTER TABLE t shard_row_id_bits = 1", true},

// For create index statement
{"CREATE INDEX idx ON t (a)", true},
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package binloginfo

import (
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -98,6 +100,7 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery
if client == nil {
return
}
ddlQuery = addSpecialComment(ddlQuery)
info := &BinlogInfo{
Data: &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
Expand All @@ -108,3 +111,19 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery
}
txn.SetOption(kv.BinlogInfo, info)
}

const specialPrefix = `/*!90000 `

func addSpecialComment(ddlQuery string) string {
if strings.Contains(ddlQuery, specialPrefix) {
return ddlQuery
}
upperQuery := strings.ToUpper(ddlQuery)
reg, err := regexp.Compile(`SHARD_ROW_ID_BITS\s*=\s*\d+`)
terror.Log(err)
loc := reg.FindStringIndex(upperQuery)
if len(loc) < 2 {
return ddlQuery
}
return ddlQuery[:loc[0]] + specialPrefix + ddlQuery[loc[0]:loc[1]] + ` */` + ddlQuery[loc[1]:]
}
Loading