Add default value when generating the update rows #1006

Merged
merged 11 commits into from
Nov 19, 2020
2 changes: 1 addition & 1 deletion arbiter/README.md
@@ -7,7 +7,7 @@ Arbiter

The complete import process is as follows:

1. Read Binlog from Kafka in the format of [Protobuf](https://github.com/pingcap/tidb-tools/blob/master/tidb-binlog/slave_binlog_proto/proto/binlog.proto).
1. Read Binlog from Kafka in the format of [Protobuf](https://github.com/pingcap/tidb-tools/blob/master/tidb-binlog/proto/proto/binlog.proto).
2. When the accumulated data reaches the size limit, construct SQL statements according to the Binlog and write them to the downstream concurrently (note: Arbiter will split upstream transactions).
3. Save the checkpoint.

2 changes: 1 addition & 1 deletion arbiter/README_CN.md
@@ -7,7 +7,7 @@ Arbiter

The overall workflow is as follows:

1. Read Binlog in [Protobuf](https://github.com/pingcap/tidb-tools/blob/master/tidb-binlog/slave_binlog_proto/proto/binlog.proto) format from Kafka.
1. Read Binlog in [Protobuf](https://github.com/pingcap/tidb-tools/blob/master/tidb-binlog/proto/proto/binlog.proto) format from Kafka.
2. After a certain amount of data has accumulated, construct the corresponding SQL according to the Binlog and write it to the downstream concurrently (note: Arbiter will split upstream transactions).
3. Save the checkpoint.

2 changes: 1 addition & 1 deletion arbiter/server_test.go
@@ -26,7 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-tools/tidb-binlog/driver/reader"
pb "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
pb "github.com/pingcap/tidb-tools/tidb-binlog/proto/go-binlog"
)

type dummyLoader struct {
2 changes: 1 addition & 1 deletion binlogctl/meta.go
@@ -23,10 +23,10 @@ import (
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/siddontang/go/ioutil2"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

2 changes: 1 addition & 1 deletion binlogctl/meta_test.go
@@ -20,7 +20,7 @@ import (
"strings"

. "github.com/pingcap/check"
pd "github.com/pingcap/pd/v4/client"
pd "github.com/tikv/pd/client"
)

type metaSuite struct{}
2 changes: 1 addition & 1 deletion drainer/relay.go
@@ -7,7 +7,7 @@ import (
"github.com/pingcap/tidb-binlog/drainer/relay"
"github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/loader"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/proto/go-binlog"
"go.uber.org/zap"
)

2 changes: 1 addition & 1 deletion drainer/relay/reader.go
@@ -19,7 +19,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/proto/go-binlog"
"github.com/pingcap/tipb/go-binlog"
)

80 changes: 67 additions & 13 deletions drainer/schema.go
@@ -34,11 +34,13 @@ type Schema struct {
schemaNameToID map[string]int64

schemas map[int64]*model.DBInfo
tables map[int64]*model.TableInfo
tables map[int64][]schemaVersionTableInfo

truncateTableID map[int64]struct{}
tblsDroppingCol map[int64]bool

tableSchemaVersion map[int64]int64

schemaMetaVersion int64

hasImplicitCol bool
@@ -51,20 +53,26 @@ type Schema struct {
// TableName stores the table and schema name
type TableName = filter.TableName

type schemaVersionTableInfo struct {
SchemaVersion int64
TableInfo *model.TableInfo
}

// NewSchema returns the Schema object
func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
s := &Schema{
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
tblsDroppingCol: make(map[int64]bool),
tableSchemaVersion: make(map[int64]int64),
jobs: jobs,
}

s.tableIDToName = make(map[int64]TableName)
s.schemas = make(map[int64]*model.DBInfo)
s.schemaNameToID = make(map[string]int64)
s.tables = make(map[int64]*model.TableInfo)
s.tables = make(map[int64][]schemaVersionTableInfo)

return s, nil
}
@@ -120,8 +128,11 @@ func (s *Schema) SchemaByTableID(tableID int64) (*model.DBInfo, bool) {

// TableByID returns the TableInfo by table id
func (s *Schema) TableByID(id int64) (val *model.TableInfo, ok bool) {
val, ok = s.tables[id]
return
tbls := s.tables[id]
if len(tbls) == 0 {
return nil, false
}
return tbls[len(tbls)-1].TableInfo, true
}

// DropSchema deletes the given DBInfo
@@ -157,10 +168,11 @@ func (s *Schema) CreateSchema(db *model.DBInfo) error {

// DropTable deletes the given TableInfo
func (s *Schema) DropTable(id int64) (string, error) {
table, ok := s.tables[id]
tables, ok := s.tables[id]
if !ok {
return "", errors.NotFoundf("table %d", id)
}
table := tables[len(tables)-1].TableInfo
err := s.removeTable(id)
if err != nil {
return "", errors.Trace(err)
@@ -173,8 +185,33 @@ func (s *Schema) DropTable(id int64) (string, error) {
return table.Name.O, nil
}

func (s *Schema) appendTableInfo(schemaVersion int64, table *model.TableInfo) {
tbls := s.tables[table.ID]
tbls = append(tbls, schemaVersionTableInfo{SchemaVersion: schemaVersion, TableInfo: table})
if len(tbls) > 2 {
tbls = tbls[len(tbls)-2:]
}
s.tables[table.ID] = tbls
}

// TableBySchemaVersion gets the table info according to the schemaVersion and the table id.
func (s *Schema) TableBySchemaVersion(id int64, schemaVersion int64) (table *model.TableInfo, ok bool) {
tbls, ok := s.tables[id]
if !ok {
return nil, false
}

for _, t := range tbls {
if t.SchemaVersion >= schemaVersion {
return t.TableInfo, true
}
}

return nil, false
}

// CreateTable creates a new TableInfo
func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error {
func (s *Schema) CreateTable(schemaVersion int64, schema *model.DBInfo, table *model.TableInfo) error {
_, ok := s.tables[table.ID]
if ok {
return errors.AlreadyExistsf("table %s.%s", schema.Name, table.Name)
@@ -185,15 +222,15 @@ func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error
}

schema.Tables = append(schema.Tables, table)
s.tables[table.ID] = table
s.appendTableInfo(schemaVersion, table)
s.tableIDToName[table.ID] = TableName{Schema: schema.Name.O, Table: table.Name.O}

log.Debug("create table success", zap.String("name", schema.Name.O+"."+table.Name.O), zap.Int64("id", table.ID))
return nil
}

// ReplaceTable replaces the table with the new tableInfo
func (s *Schema) ReplaceTable(table *model.TableInfo) error {
func (s *Schema) ReplaceTable(schemaVersion int64, table *model.TableInfo) error {
_, ok := s.tables[table.ID]
if !ok {
return errors.NotFoundf("table %s(%d)", table.Name, table.ID)
@@ -203,7 +240,7 @@ func (s *Schema) ReplaceTable(table *model.TableInfo) error {
addImplicitColumn(table)
}

s.tables[table.ID] = table
s.appendTableInfo(schemaVersion, table)

return nil
}
@@ -260,6 +297,8 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}

s.tableSchemaVersion[job.TableID] = job.BinlogInfo.SchemaVersion
}

s.jobs = s.jobs[i:]
@@ -349,7 +388,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

err = s.CreateTable(schema, table)
err = s.CreateTable(job.BinlogInfo.SchemaVersion, schema, table)
if err != nil {
return "", "", "", errors.Trace(err)
}
@@ -370,7 +409,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

err := s.CreateTable(schema, table)
err := s.CreateTable(job.BinlogInfo.SchemaVersion, schema, table)
if err != nil {
return "", "", "", errors.Trace(err)
}
@@ -412,7 +451,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return "", "", "", errors.NotFoundf("table %d", job.TableID)
}

err = s.CreateTable(schema, table)
err = s.CreateTable(job.BinlogInfo.SchemaVersion, schema, table)
if err != nil {
return "", "", "", errors.Trace(err)
}
@@ -437,7 +476,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

err := s.ReplaceTable(tbInfo)
err := s.ReplaceTable(job.BinlogInfo.SchemaVersion, tbInfo)
if err != nil {
return "", "", "", errors.Trace(err)
}
@@ -456,6 +495,21 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return
}

// CanAppendDefaultValue reports whether we can safely fill in the default value for a column when the row is missing that value.
func (s *Schema) CanAppendDefaultValue(id int64, schemaVersion int64) bool {
if s.IsDroppingColumn(id) {
return true
}

if v, ok := s.tableSchemaVersion[id]; ok {
if schemaVersion < v {
return true
}
}

return false
}

// IsDroppingColumn returns true if the table is in the middle of dropping a column
func (s *Schema) IsDroppingColumn(id int64) bool {
return s.tblsDroppingCol[id]
2 changes: 1 addition & 1 deletion drainer/server_test.go
@@ -26,11 +26,11 @@ import (

"github.com/gorilla/mux"
. "github.com/pingcap/check"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/tidb-binlog/pkg/etcd"
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tidb-binlog/pkg/security"
"github.com/pingcap/tidb-binlog/pkg/util"
pd "github.com/tikv/pd/client"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
2 changes: 1 addition & 1 deletion drainer/sync/bench_kafka_test.go
@@ -17,7 +17,7 @@ import (
"testing"

"github.com/gogo/protobuf/proto"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/proto/go-binlog"
ti "github.com/pingcap/tipb/go-binlog"
)

2 changes: 1 addition & 1 deletion drainer/sync/kafka.go
@@ -24,7 +24,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/util"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/proto/go-binlog"
"go.uber.org/zap"
)
