Skip to content

Commit

Permalink
mounter(ticdc): calculate raw bytes checksum by using handle (#11720)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 8, 2024
1 parent 8532fab commit ad58c8b
Show file tree
Hide file tree
Showing 23 changed files with 193 additions and 173 deletions.
80 changes: 41 additions & 39 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,20 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID)
}
if bytes.HasPrefix(key, recordPrefix) {
rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo)
recordID, err := tablecodec.DecodeRowKey(raw.Key)
if err != nil {
return nil, errors.Trace(err)
}
baseInfo.RecordID = recordID

rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
if rowKV == nil {
return nil, nil
}
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, checksumKey, raw.ApproximateDataSize())
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, recordID, checksumKey, raw.ApproximateDataSize())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,28 +237,21 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra

func (m *mounter) unmarshalRowKVEntry(
tableInfo *model.TableInfo,
rawKey []byte,
rawValue []byte,
rawOldValue []byte,
base baseKVEntry,
) (*rowKVEntry, error) {
recordID, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return nil, errors.Trace(err)
}
base.RecordID = recordID

var (
row, preRow map[int64]types.Datum
rowExist, preRowExist bool
)

row, rowExist, err = m.decodeRow(rawValue, recordID, tableInfo, false)
row, rowExist, err := m.decodeRow(rawValue, base.RecordID, tableInfo, false)
if err != nil {
return nil, errors.Trace(err)
}

preRow, preRowExist, err = m.decodeRow(rawOldValue, recordID, tableInfo, true)
preRow, preRowExist, err = m.decodeRow(rawOldValue, base.RecordID, tableInfo, true)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -490,33 +489,34 @@ func (m *mounter) verifyColumnChecksum(

checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
log.Error("failed to calculate the checksum",
zap.Uint32("first", first), zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns), zap.Error(err))
return 0, false, err
}

// the first checksum matches in most cases.
if checksum == first {
log.Debug("checksum matched", zap.Uint32("checksum", checksum), zap.Uint32("first", first))
return checksum, true, nil
}

extra, ok := decoder.GetExtraChecksum()
if ok && checksum == extra {
log.Debug("extra checksum matched, this may happen the upstream TiDB is during the DDL execution phase",
zap.Uint32("checksum", checksum), zap.Uint32("extra", extra))
return checksum, true, nil
}

if !skipFail {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
return checksum, false, nil
}

if time.Since(m.lastSkipOldValueTime) > time.Minute {
log.Warn("checksum mismatch on the old value, "+
"this may caused by Add Column / Drop Column executed, skip verification",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
m.lastSkipOldValueTime = time.Now()
}
return checksum, true, nil
Expand Down Expand Up @@ -602,7 +602,7 @@ func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) {

func verifyRawBytesChecksum(
tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
key kv.Key, tz *time.Location,
handle kv.Handle, key kv.Key, tz *time.Location,
) (uint32, bool, error) {
expected, ok := decoder.GetChecksum()
if !ok {
Expand All @@ -621,12 +621,14 @@ func verifyRawBytesChecksum(
columnInfo := tableInfo.ForceGetColumnInfo(columnID)
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
log.Error("build datum for raw checksum calculation failed",
zap.Any("col", col), zap.Any("columnInfo", columnInfo), zap.Error(err))
return 0, false, errors.Trace(err)
}
datums = append(datums, &datum)
columnIDs = append(columnIDs, columnID)
}
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, nil)
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, handle, nil)
if err != nil {
return 0, false, errors.Trace(err)
}
Expand All @@ -635,7 +637,10 @@ func verifyRawBytesChecksum(
}

log.Error("raw bytes checksum mismatch",
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained))
zap.Int("version", decoder.ChecksumVersion()),
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained),
zap.Any("tableInfo", tableInfo), zap.Any("columns", columns),
zap.Any("handle", handle.String()), zap.Any("tz", tz))

return expected, false, nil
}
Expand All @@ -645,7 +650,7 @@ func verifyRawBytesChecksum(
func (m *mounter) verifyChecksum(
tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
columns []*model.ColumnData, rawColumns []types.Datum,
key kv.Key, isPreRow bool,
handle kv.Handle, key kv.Key, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
Expand All @@ -665,17 +670,22 @@ func (m *mounter) verifyChecksum(
// Update / Delete event correctly, after Add Column / Drop column DDL,
// since the table schema does not contain complete column information.
return m.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)
case 1:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
case 1, 2:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, handle, key, m.tz)
if err != nil {
log.Error("calculate raw checksum failed",
zap.Int("version", version), zap.Any("tz", m.tz), zap.Any("handle", handle.String()),
zap.Any("key", key), zap.Any("columns", columns), zap.Error(err))
return 0, false, errors.Trace(err)
}
if !matched {
return expected, matched, err
}
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
log.Error("failed to calculate column-level checksum, after raw checksum verification passed",
zap.Any("columnsInfo", columnInfos), zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz), zap.Error(err))
return 0, false, errors.Trace(err)
}
return columnChecksum, true, nil
Expand All @@ -685,7 +695,7 @@ func (m *mounter) verifyChecksum(
}

func (m *mounter) mountRowKVEntry(
tableInfo *model.TableInfo, row *rowKVEntry, key kv.Key, dataSize int64,
tableInfo *model.TableInfo, row *rowKVEntry, handle kv.Handle, key kv.Key, dataSize int64,
) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
Expand Down Expand Up @@ -719,19 +729,15 @@ func (m *mounter) mountRowKVEntry(
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, handle, key, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
return nil, rawRow, errors.Trace(err)
}

if !matched {
log.Error("previous columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
zap.Uint32("checksum", preChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("preCols", preCols), zap.Any("rawCols", preRawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand All @@ -751,18 +757,14 @@ func (m *mounter) mountRowKVEntry(
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, handle, key, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("current columns checksum mismatch",
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
zap.Uint32("checksum", currentChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("cols", cols), zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand Down
5 changes: 3 additions & 2 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/executor"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/metabuild"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -1553,7 +1554,7 @@ func TestBuildTableInfo(t *testing.T) {
for i, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
originTI, err := ddl.BuildTableInfoFromAST(metabuild.NewContext(), stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz)
Expand Down Expand Up @@ -1623,7 +1624,7 @@ func TestNewDMRowChange(t *testing.T) {
for _, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
originTI, err := ddl.BuildTableInfoFromAST(metabuild.NewContext(), stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols := []*model.Column{
Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error {
}

// GetSchemaVersion returns the schema version of the meta.
func GetSchemaVersion(meta *timeta.Meta) (int64, error) {
func GetSchemaVersion(meta timeta.Reader) (int64, error) {
// After we get the schema version at startTs, if the diff corresponding to that version does not exist,
// it means that the job is not committed yet, so we should subtract one from the version, i.e., version--.
version, err := meta.GetSchemaVersion()
Expand All @@ -130,7 +130,7 @@ func GetSchemaVersion(meta *timeta.Meta) (int64, error) {
// NewSnapshotFromMeta creates a schema snapshot from meta.
func NewSnapshotFromMeta(
id model.ChangeFeedID,
meta *timeta.Meta,
meta timeta.Reader,
currentTs uint64,
forceReplicate bool,
filter filter.Filter,
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ func getAllHistoryDDLJob(storage tidbkv.Storage, f filter.Filter) ([]*timodel.Jo
return nil, errors.Trace(err)
}
defer txn.Rollback() //nolint:errcheck
txnMeta := timeta.NewMeta(txn)
txnMeta := timeta.NewReader(txn)

jobs, err := ddl.GetAllHistoryDDLJobs(txnMeta)
res := make([]*timodel.Job, 0)
Expand Down
10 changes: 5 additions & 5 deletions cdc/entry/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func TestAllTables(t *testing.T) {
require.Len(t, tableInfos, 2)
tableName := tableInfos[1].TableName
require.Equal(t, model.TableName{
Schema: "test",
Table: "t1",
TableID: 108,
Schema: job.SchemaName,
Table: job.TableName,
TableID: job.TableID,
}, tableName)
// add ineligible table
job = helper.DDL2Job("create table test.t2(id int)")
Expand All @@ -148,9 +148,9 @@ func TestAllTables(t *testing.T) {
require.Len(t, tableInfos, 2)
tableName = tableInfos[1].TableName
require.Equal(t, model.TableName{
Schema: "test",
Schema: job.SchemaName,
Table: "t1",
TableID: 108,
TableID: 112,
}, tableName)
}

Expand Down
13 changes: 8 additions & 5 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func NewSchemaTestHelper(t testing.TB) *SchemaTestHelper {
// DDL2Job executes the DDL stmt and returns the DDL job
func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
reader := s.GetCurrentMeta()
jobs, err := tiddl.GetLastNHistoryDDLJobs(reader.(*timeta.Mutator), 1)
require.Nil(s.t, err)
require.Len(s.t, jobs, 1)
// Set State from Synced to Done.
Expand Down Expand Up @@ -151,7 +152,8 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
// DDL statements.
func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), jobCnt)
reader := s.GetCurrentMeta()
jobs, err := tiddl.GetLastNHistoryDDLJobs(reader.(*timeta.Mutator), jobCnt)
require.Nil(s.t, err)
require.Len(s.t, jobs, jobCnt)
// Set State from Synced to Done.
Expand Down Expand Up @@ -209,7 +211,8 @@ func (s *SchemaTestHelper) getLastKeyValue(tableID int64) (key, value []byte) {
// DDL2Event executes the DDL and returns the corresponding event.
func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
meta := s.GetCurrentMeta()
jobs, err := tiddl.GetLastNHistoryDDLJobs(meta.(*timeta.Mutator), 1)
require.NoError(s.t, err)
require.Len(s.t, jobs, 1)
// Set State from Synced to Done.
Expand Down Expand Up @@ -267,10 +270,10 @@ func (s *SchemaTestHelper) Tk() *testkit.TestKit {
}

// GetCurrentMeta returns the current meta snapshot.
func (s *SchemaTestHelper) GetCurrentMeta() *timeta.Meta {
func (s *SchemaTestHelper) GetCurrentMeta() timeta.Reader {
ver, err := s.storage.CurrentVersion(oracle.GlobalTxnScope)
require.Nil(s.t, err)
return timeta.NewSnapshotMeta(s.storage.GetSnapshot(ver))
return timeta.NewReader(s.storage.GetSnapshot(ver))
}

// Close closes the helper
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
)

// GetSnapshotMeta returns tidb meta information
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) *meta.Meta {
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) meta.Reader {
snapshot := tiStore.GetSnapshot(tidbkv.NewVersion(ts))
return meta.NewSnapshotMeta(snapshot)
return meta.NewReader(snapshot)
}

// CreateTiStore creates a tikv storage client
Expand Down
14 changes: 2 additions & 12 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/meta"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -99,7 +99,7 @@ func newMockDDLJobPuller(
if needSchemaStorage {
helper = entry.NewSchemaTestHelper(t)
kvStorage := helper.Storage()
ts := helper.GetCurrentMeta().StartTS
ts := helper.GetCurrentMeta().(*meta.Mutator).StartTS
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
schemaStorage, err := entry.NewSchemaStorage(
Expand Down Expand Up @@ -532,16 +532,6 @@ func TestHandleJob(t *testing.T) {
job := &timodel.Job{
Type: timodel.ActionFlashbackCluster,
BinlogInfo: &timodel.HistoryInfo{},
Args: []interface{}{
998,
map[string]interface{}{},
true, /* tidb_gc_enable */
variable.On, /* tidb_enable_auto_analyze */
variable.Off, /* tidb_super_read_only */
0, /* totalRegions */
0, /* startTS */
0, /* commitTS */
},
}
skip, err := ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit ad58c8b

Please sign in to comment.