diff --git a/src/cli/callx/callx.go b/src/cli/callx/callx.go index 0826670..a0749fb 100644 --- a/src/cli/callx/callx.go +++ b/src/cli/callx/callx.go @@ -645,6 +645,36 @@ func RaftDisablePurgeBinlogRPC(node string) error { return err } +func RaftEnableCheckSemiSyncRPC(node string) error { + cli, cleanup, err := GetClient(node) + if err != nil { + return err + } + defer cleanup() + + method := model.RPCRaftEnableCheckSemiSync + req := model.NewRaftStatusRPCRequest() + rsp := model.NewRaftStatusRPCResponse(model.OK) + err = cli.Call(method, req, rsp) + + return err +} + +func RaftDisableCheckSemiSyncRPC(node string) error { + cli, cleanup, err := GetClient(node) + if err != nil { + return err + } + defer cleanup() + + method := model.RPCRaftDisableCheckSemiSync + req := model.NewRaftStatusRPCRequest() + rsp := model.NewRaftStatusRPCResponse(model.OK) + err = cli.Call(method, req, rsp) + + return err +} + // mysql func WaitMysqlWorkingRPC(node string) error { cli, cleanup, err := GetClient(node) diff --git a/src/cli/cmd/raft.go b/src/cli/cmd/raft.go index 99e6522..66c75d8 100644 --- a/src/cli/cmd/raft.go +++ b/src/cli/cmd/raft.go @@ -32,6 +32,8 @@ func NewRaftCommand() *cobra.Command { cmd.AddCommand(NewRaftStatusCommand()) cmd.AddCommand(NewRaftEnablePurgeBinlogCommand()) cmd.AddCommand(NewRaftDisablePurgeBinlogCommand()) + cmd.AddCommand(NewRaftEnableCheckSemiSyncCommand()) + cmd.AddCommand(NewRaftDisableCheckSemiSyncCommand()) return cmd } @@ -300,3 +302,57 @@ func raftDisablePurgeBinlogCommandFn(cmd *cobra.Command, args []string) { log.Warning("[%v].disable.purge.binlog.done", self) } } + +func NewRaftEnableCheckSemiSyncCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "enablechecksemisync", + Short: "enable leader to check semi-sync(default)", + Run: raftEnableCheckSemiSyncCommandFn, + } + + return cmd +} + +func raftEnableCheckSemiSyncCommandFn(cmd *cobra.Command, args []string) { + if len(args) > 0 { + ErrorOK(fmt.Errorf("too.many.args")) + } + + // send enable + { + conf, err := GetConfig() + ErrorOK(err) + self := conf.Server.Endpoint + log.Warning("[%v].prepare.to.enable.check.semi-sync", self) + err = callx.RaftEnableCheckSemiSyncRPC(self) + ErrorOK(err) + log.Warning("[%v].enable.check.semi-sync.done", self) + } +} + +func NewRaftDisableCheckSemiSyncCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "disablechecksemisync", + Short: "disable leader to check semi-sync", + Run: raftDisableCheckSemiSyncCommandFn, + } + + return cmd +} + +func raftDisableCheckSemiSyncCommandFn(cmd *cobra.Command, args []string) { + if len(args) > 0 { + ErrorOK(fmt.Errorf("too.many.args")) + } + + // send enable + { + conf, err := GetConfig() + ErrorOK(err) + self := conf.Server.Endpoint + log.Warning("[%v].prepare.to.disable.check.semi-sync", self) + err = callx.RaftDisableCheckSemiSyncRPC(self) + ErrorOK(err) + log.Warning("[%v].disable.check.semi-sync.done", self) + } +} diff --git a/src/cli/cmd/raft_test.go b/src/cli/cmd/raft_test.go index 4d1b36f..3ac686b 100644 --- a/src/cli/cmd/raft_test.go +++ b/src/cli/cmd/raft_test.go @@ -153,5 +153,19 @@ func TestCLIRaftCommand(t *testing.T) { _, err := executeCommand(cmd, "enablepurgebinlog") assert.Nil(t, err) } + + // disable check semi-sync + { + cmd := NewRaftCommand() + _, err := executeCommand(cmd, "disablechecksemisync") + assert.Nil(t, err) + } + + // enable check semi-sync + { + cmd := NewRaftCommand() + _, err := executeCommand(cmd, "enablechecksemisync") + assert.Nil(t, err) + } } } diff --git a/src/model/raft.go b/src/model/raft.go index ad06b86..e5906eb 100644 --- a/src/model/raft.go +++ b/src/model/raft.go @@ -15,12 +15,14 @@ const ( RAFTMYSQL_WAITUNTILAFTERGTID RAFTMYSQL_STATUS = "WaitUntilAfterGTID" ) const ( - RPCRaftPing = "RaftRPC.Ping" - RPCRaftHeartbeat = "RaftRPC.Heartbeat" - RPCRaftRequestVote = "RaftRPC.RequestVote" - RPCRaftStatus = "RaftRPC.Status" - RPCRaftEnablePurgeBinlog = "RaftRPC.EnablePurgeBinlog" - RPCRaftDisablePurgeBinlog = "RaftRPC.DisablePurgeBinlog" + RPCRaftPing = "RaftRPC.Ping" + RPCRaftHeartbeat = "RaftRPC.Heartbeat" + RPCRaftRequestVote = "RaftRPC.RequestVote" + RPCRaftStatus = "RaftRPC.Status" + RPCRaftEnablePurgeBinlog = "RaftRPC.EnablePurgeBinlog" + RPCRaftDisablePurgeBinlog = "RaftRPC.DisablePurgeBinlog" + RPCRaftEnableCheckSemiSync = "RaftRPC.EnableCheckSemiSync" + RPCRaftDisableCheckSemiSync = "RaftRPC.DisableCheckSemiSync" ) // raft diff --git a/src/mysql/mysqlbase.go b/src/mysql/mysqlbase.go index ea9a5e4..dc4bf77 100644 --- a/src/mysql/mysqlbase.go +++ b/src/mysql/mysqlbase.go @@ -271,7 +271,7 @@ func (my *MysqlBase) DisableSemiSyncMaster(db *sql.DB) error { return ExecuteWithTimeout(db, reqTimeout, cmds) } -// SetSemiSyncMasterTimeout useed to set semi-sync master timeout +// SetSemiSyncMasterTimeout used to set semi-sync master timeout func (my *MysqlBase) SetSemiSyncMasterTimeout(db *sql.DB, timeout uint64) error { cmds := fmt.Sprintf("SET GLOBAL rpl_semi_sync_master_timeout=%d", timeout) return ExecuteWithTimeout(db, reqTimeout, cmds) diff --git a/src/raft/leader.go b/src/raft/leader.go index 94fe9e6..3b6f240 100644 --- a/src/raft/leader.go +++ b/src/raft/leader.go @@ -490,6 +490,11 @@ func (r *Leader) checkSemiSyncStop() { // Disable the semi-sync if the nodes number less than 3. func (r *Leader) checkSemiSync() { + if r.skipCheckSemiSync { + r.WARNING("check.semi-sync.skipped[skipCheckSemiSync is true]") + return + } + min := 3 cur := r.getMembers() if cur < min { diff --git a/src/raft/raft.go b/src/raft/raft.go index b137dde..1b497a9 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -73,22 +73,24 @@ type Raft struct { idlePeers map[string]*Peer // all SuperIDLE peers stats model.RaftStats skipPurgeBinlog bool // if true, purge binlog will skipped + skipCheckSemiSync bool // if true, check semi-sync will skipped isBrainSplit bool // if true, follower can upgrade to candidate } // NewRaft creates the new raft. func NewRaft(id string, conf *config.RaftConfig, log *xlog.Log, mysql *mysql.Mysql) *Raft { r := &Raft{ - id: id, - conf: conf, - log: log, - cmd: common.NewLinuxCommand(log), - mysql: mysql, - leader: noLeader, - state: FOLLOWER, - meta: &RaftMeta{}, - peers: make(map[string]*Peer), - idlePeers: make(map[string]*Peer), + id: id, + conf: conf, + log: log, + cmd: common.NewLinuxCommand(log), + mysql: mysql, + leader: noLeader, + state: FOLLOWER, + meta: &RaftMeta{}, + peers: make(map[string]*Peer), + idlePeers: make(map[string]*Peer), + skipCheckSemiSync: false, } // state handler @@ -370,3 +372,8 @@ func (r *Raft) resetCheckVotesTimeout() { func (r *Raft) SetSkipPurgeBinlog(v bool) { r.skipPurgeBinlog = v } + +// SetSkipCheckSemiSync used to set check semi-sync or not. +func (r *Raft) SetSkipCheckSemiSync(v bool) { + r.skipCheckSemiSync = v +} diff --git a/src/raft/raft_test.go b/src/raft/raft_test.go index ee6bedb..0857fd0 100644 --- a/src/raft/raft_test.go +++ b/src/raft/raft_test.go @@ -1407,8 +1407,26 @@ func TestRaftLeaderCheckSemiSync(t *testing.T) { assert.Equal(t, want, got) assert.Equal(t, 2, whoisleader) - // wait for check semi-sync to be invoked + // wait for check semi-sync to be invoked and skipCheckSemiSync changed time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*16)) + assert.Equal(t, false, rafts[2].skipCheckSemiSync) + } + + // disable check semi-sync + { + rafts[2].SetSkipCheckSemiSync(true) + time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*16)) + assert.Equal(t, true, rafts[2].skipCheckSemiSync) + } + + // enable check semi-sync + { + rafts[2].SetSkipCheckSemiSync(false) + + MockWaitLeaderEggs(rafts, 0) + MockWaitLeaderEggs(rafts, 0) + + assert.Equal(t, false, rafts[2].skipCheckSemiSync) } } @@ -1918,14 +1936,14 @@ func TestRaftElectionUnderLearnerInMinority(t *testing.T) { // 1.1 rafts[1] with MockGTID_X3{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123} // 1.2 rafts[2] with MockGTID_X3{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123} // 2. start rafts[0] state as CANDIDATE -// 3. wait 20 times the election timeout +// 3. wait 30 times the election timeout // 4. start rafts[1] as FOLLOWER and rafts[2] as IDLE // InvalidGITD // rafts[0]: C --------------> F --------------> C -> ... -> F // InvalidViewID // rafts[1]: F --------------> C --------------> F -> ... -> C -// 5. wait 4 times the election timeout +// 5. wait 8 times the election timeout // 6. check if rafts[1] is the leader func TestRaftElectionUnderFollowerAndCandidateAlternate(t *testing.T) { log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) @@ -1946,8 +1964,8 @@ func TestRaftElectionUnderFollowerAndCandidateAlternate(t *testing.T) { MockStateTransition(rafts[0], CANDIDATE) } - // 3. wait 20 times the election timeout - time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*20)) + // 3. wait 30 times the election timeout + time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*30)) // 4. start rafts[1] as FOLLOWER and rafts[2] as IDLE { @@ -1958,8 +1976,8 @@ func TestRaftElectionUnderFollowerAndCandidateAlternate(t *testing.T) { MockStateTransition(rafts[2], IDLE) } - // 5. wait 4 times the election timeout - time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*4)) + // 5. wait 8 times the election timeout + time.Sleep(time.Millisecond * time.Duration(rafts[0].getElectionTimeout()*8)) //6. check if rafts[1] is the leader { diff --git a/src/raft/rpc_raft.go b/src/raft/rpc_raft.go index 1586926..a65c6cd 100644 --- a/src/raft/rpc_raft.go +++ b/src/raft/rpc_raft.go @@ -69,3 +69,15 @@ func (r *RaftRPC) DisablePurgeBinlog(req *model.RaftStatusRPCRequest, rsp *model r.raft.SetSkipPurgeBinlog(true) return nil } + +// EnableCheckSemiSync rpc. +func (r *RaftRPC) EnableCheckSemiSync(req *model.RaftStatusRPCRequest, rsp *model.RaftStatusRPCResponse) error { + r.raft.SetSkipCheckSemiSync(false) + return nil +} + +// DisableCheckSemiSync rpc. +func (r *RaftRPC) DisableCheckSemiSync(req *model.RaftStatusRPCRequest, rsp *model.RaftStatusRPCResponse) error { + r.raft.SetSkipCheckSemiSync(true) + return nil +} diff --git a/src/raft/rpc_raft_test.go b/src/raft/rpc_raft_test.go index 15ac743..8be0bf9 100644 --- a/src/raft/rpc_raft_test.go +++ b/src/raft/rpc_raft_test.go @@ -185,3 +185,75 @@ func TestRaftRPCPurgeBinlog(t *testing.T) { assert.NotEqual(t, want, got) } } + +func TestRaftRPCCheckSemiSync(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + port := common.RandomPort(8000, 9000) + names, rafts, scleanup := MockRafts(log, port, 3, -1) + defer scleanup() + whoisleader := 2 + + // 1. set rafts GTID + // 1.0 rafts[0] with MockGTIDB{Master_Log_File = "mysql-bin.000001", Read_Master_Log_Pos = 123} + // 1.1 rafts[1] with MockGTIDB{Master_Log_File = "mysql-bin.000003", Read_Master_Log_Pos = 123} + // 1.2 rafts[2] with MockGTIDC{Master_Log_File = "mysql-bin.000005", Read_Master_Log_Pos = 123} + { + rafts[0].mysql.SetMysqlHandler(mysql.NewMockGTIDX1()) + rafts[1].mysql.SetMysqlHandler(mysql.NewMockGTIDX3()) + rafts[2].mysql.SetMysqlHandler(mysql.NewMockGTIDX5()) + } + + // 2. Start 3 rafts state as FOLLOWER + for _, raft := range rafts { + raft.Start() + } + + // wait leader eggs + { + MockWaitLeaderEggs(rafts, 1) + } + // check(default is enable) + { + MockWaitLeaderEggs(rafts, 0) + MockWaitLeaderEggs(rafts, 0) + check := rafts[whoisleader].skipCheckSemiSync + assert.Equal(t, false, check) + } + + // disable check semi-sync + { + c, cleanup := MockGetClient(t, names[whoisleader]) + defer cleanup() + + method := model.RPCRaftDisableCheckSemiSync + req := model.NewRaftStatusRPCRequest() + rsp := model.NewRaftStatusRPCResponse(model.OK) + err := c.Call(method, req, rsp) + assert.Nil(t, err) + + // check + MockWaitLeaderEggs(rafts, 0) + MockWaitLeaderEggs(rafts, 0) + got := rafts[whoisleader].skipCheckSemiSync + assert.Equal(t, true, got) + } + + // enable check semi-sync + { + MockWaitLeaderEggs(rafts, 1) + c, cleanup := MockGetClient(t, names[whoisleader]) + defer cleanup() + + method := model.RPCRaftEnableCheckSemiSync + req := model.NewRaftStatusRPCRequest() + rsp := model.NewRaftStatusRPCResponse(model.OK) + err := c.Call(method, req, rsp) + assert.Nil(t, err) + + // check + MockWaitLeaderEggs(rafts, 0) + MockWaitLeaderEggs(rafts, 0) + got := rafts[whoisleader].skipCheckSemiSync + assert.Equal(t, false, got) + } +}