Skip to content

Commit

Permalink
dm/relay: fix intermediate meta info written to meta file (#3164)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Nov 1, 2021
1 parent a4662c1 commit 6c6ca9e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
3 changes: 3 additions & 0 deletions dm/pkg/streamer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u
case cmp > 0:
updatePathCh <- r.latestFilePath
default:
log.L().Info("newer relay uuid path is already generated",
zap.String("current path", r.latestRelayLogDir),
zap.Any("new path", switchPath))
switchCh <- *switchPath
}
return
Expand Down
24 changes: 20 additions & 4 deletions dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ func (r *Relay) handleEvents(
}
}

firstEvent := true
for {
// 1. read events from upstream server
readTimer := time.Now()
Expand Down Expand Up @@ -558,6 +559,17 @@ func (r *Relay) handleEvents(
}
}

if firstEvent {
// on the first event we got, which must be a fake rotate event, save and flush meta once to make sure
// meta file exists before binlog file exists(lastPos.Name cannot be empty now)
// when switch from A to B then back to A, meta's been assigned to minCheckpoint, since it's taken as a new server.
// and meta file is not created when relay resumed.
firstEvent = false
if err2 := r.saveAndFlushMeta(lastPos, lastGTID); err2 != nil {
return 0, err2
}
}

// 3. save events into file
writeTimer := time.Now()
r.logger.Debug("writing binlog event", zap.Reflect("header", e.Header))
Expand Down Expand Up @@ -617,16 +629,20 @@ func (r *Relay) handleEvents(
if tResult.NextLogName != "" && !utils.IsFakeRotateEvent(e.Header) {
// if the binlog is rotated, we need to save and flush the next binlog filename to meta
lastPos.Name = tResult.NextLogName
if err := r.SaveMeta(lastPos, lastGTID); err != nil {
return 0, terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
}
if err := r.FlushMeta(); err != nil {
if err := r.saveAndFlushMeta(lastPos, lastGTID); err != nil {
return 0, err
}
}
}
}

func (r *Relay) saveAndFlushMeta(lastPos mysql.Position, lastGTID gtid.Set) error {
if err := r.SaveMeta(lastPos, lastGTID); err != nil {
return terror.Annotatef(err, "save position %s, GTID sets %v into meta", lastPos, lastGTID)
}
return r.FlushMeta()
}

// tryUpdateActiveRelayLog tries to update current active relay log file.
// we should to update after received/wrote a FormatDescriptionEvent because it means switched to a new relay log file.
// NOTE: we can refactor active (writer/read) relay log mechanism later.
Expand Down
40 changes: 36 additions & 4 deletions dm/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/DATA-DOG/go-sqlmock"
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
Expand Down Expand Up @@ -421,9 +422,10 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
Timestamp: uint32(time.Now().Unix()),
ServerID: 11,
}
binlogPos = gmysql.Position{Name: "mysql-bin.666888", Pos: 4}
rotateEv, _ = event.GenRotateEvent(eventHeader, 123, []byte(binlogPos.Name), uint64(binlogPos.Pos))
queryEv, _ = event.GenQueryEvent(eventHeader, 123, 0, 0, 0, nil, nil, []byte("CREATE DATABASE db_relay_test"))
binlogPos = gmysql.Position{Name: "mysql-bin.666888", Pos: 4}
rotateEv, _ = event.GenRotateEvent(eventHeader, 123, []byte(binlogPos.Name), uint64(binlogPos.Pos))
fakeRotateEv, _ = event.GenRotateEvent(eventHeader, 0, []byte(binlogPos.Name), uint64(1234))
queryEv, _ = event.GenQueryEvent(eventHeader, 123, 0, 0, 0, nil, nil, []byte("CREATE DATABASE db_relay_test"))
)
cfg := getDBConfigForTest()
conn.InitMockDB(c)
Expand Down Expand Up @@ -453,14 +455,44 @@ func (t *testRelaySuite) TestHandleEvent(c *C) {
c.Assert(errors.Cause(handleErr), Equals, reader2.err)
}

// reader return fake rotate event
reader2.err = nil
reader2.result.Event = fakeRotateEv
// writer return error to force handleEvents return
writer2.err = errors.New("writer error for testing")
// return with the annotated writer error
_, err = r.handleEvents(ctx, reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, writer2.err)
// after handle rotate event, we save and flush the meta immediately
c.Assert(r.meta.Dirty(), Equals, false)
{
lm := r.meta.(*LocalMeta)
c.Assert(lm.BinLogName, Equals, "mysql-bin.666888")
c.Assert(lm.BinLogPos, Equals, uint32(1234))
filename := filepath.Join(lm.baseDir, lm.currentUUID, utils.MetaFilename)
lm2 := &LocalMeta{}
_, err2 := toml.DecodeFile(filename, lm2)
c.Assert(err2, IsNil)
c.Assert(lm2.BinLogName, Equals, "mysql-bin.666888")
c.Assert(lm2.BinLogPos, Equals, uint32(1234))
}
{
lm := r.meta.(*LocalMeta)
backupUUID := lm.currentUUID
lm.currentUUID = "not exist"
_, err = r.handleEvents(ctx, reader2, transformer2, writer2)
c.Assert(os.IsNotExist(errors.Cause(err)), Equals, true)
lm.currentUUID = backupUUID
}

// reader return valid event
reader2.err = nil
reader2.result.Event = rotateEv

// writer return error
writer2.err = errors.New("writer error for testing")
// return with the annotated writer error
_, err = r.handleEvents(ctx, reader2, transformer2, writer2)
_, err = r.handleEvents(context.Background(), reader2, transformer2, writer2)
c.Assert(errors.Cause(err), Equals, writer2.err)
// after handle rotate event, we save and flush the meta immediately
c.Assert(r.meta.Dirty(), Equals, false)
Expand Down

0 comments on commit 6c6ca9e

Please sign in to comment.