Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve VTOrc failure detection to be able to better handle dead primary failures #13190

Merged
merged 8 commits into from
Jun 22, 2023
Merged
6 changes: 3 additions & 3 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestAPIEndpoints(t *testing.T) {

// Before we disable recoveries, let us wait until VTOrc has fixed all the issues (if any).
_, _ = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
return response != "[]"
return response != "null"
})

t.Run("Disable Recoveries API", func(t *testing.T) {
Expand All @@ -112,7 +112,7 @@ func TestAPIEndpoints(t *testing.T) {
// Wait until VTOrc picks up on this issue and verify
// that we see a not null result on the api/replication-analysis page
status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
return response == "[]"
return response == "null"
})
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"AnalyzedInstanceAlias": "%s"`, replica.Alias))
Expand All @@ -134,7 +134,7 @@ func TestAPIEndpoints(t *testing.T) {
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?keyspace=ks&shard=80-")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Equal(t, "[]", resp)
assert.Equal(t, "null", resp)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These `[]`-to-`null` changes come from changing an empty slice to a nil slice: a nil slice serializes to `null` in JSON, while an empty slice serializes to `[]`.


// Check that filtering using just the shard fails
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/replication-analysis?shard=0")
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestDurabilityPolicySetLater(t *testing.T) {
time.Sleep(30 * time.Second)

// Now set the correct durability policy
out, err := newCluster.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy=semi_sync")
out, err := newCluster.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy=semi_sync")
require.NoError(t, err, out)

// VTOrc should promote a new primary after seeing the durability policy change
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtorc/primaryfailure/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestMain(m *testing.M) {
var cellInfos []*utils.CellInfo
cellInfos = append(cellInfos, &utils.CellInfo{
CellName: utils.Cell1,
NumReplicas: 12,
NumReplicas: 13,
deepthi marked this conversation as resolved.
Show resolved Hide resolved
NumRdonly: 3,
UIDBase: 100,
})
Expand Down
54 changes: 54 additions & 0 deletions go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,60 @@ func TestDownPrimary(t *testing.T) {
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 1)
}

// bring down primary before VTOrc has started, let vtorc repair.
func TestDownPrimaryBeforeVTOrc(t *testing.T) {
defer cluster.PanicHandler(t)
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{}, 0, "none")
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
curPrimary := shard0.Vttablets[0]

// Promote the first tablet as the primary
err := clusterInfo.ClusterInstance.VtctlclientProcess.InitializeShard(keyspace.Name, shard0.Name, clusterInfo.ClusterInstance.Cell, curPrimary.TabletUID)
require.NoError(t, err)

// find the replica and rdonly tablets
var replica, rdonly *cluster.Vttablet
for _, tablet := range shard0.Vttablets {
// we know we have only two replcia tablets, so the one not the primary must be the other replica
if tablet.Alias != curPrimary.Alias && tablet.Type == "replica" {
replica = tablet
}
if tablet.Type == "rdonly" {
rdonly = tablet
}
}
assert.NotNil(t, replica, "could not find replica tablet")
assert.NotNil(t, rdonly, "could not find rdonly tablet")

// check that the replication is setup correctly before we failover
utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{rdonly, replica}, 10*time.Second)

// Make the current primary vttablet unavailable.
_ = curPrimary.VttabletProcess.TearDown()
err = curPrimary.MysqlctlProcess.Stop()
require.NoError(t, err)

// Start a VTOrc instance
utils.StartVTOrcs(t, clusterInfo, []string{"--remote_operation_timeout=10s"}, cluster.VTOrcConfiguration{
PreventCrossDataCenterPrimaryFailover: true,
}, 1)

vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0]

defer func() {
deepthi marked this conversation as resolved.
Show resolved Hide resolved
// we remove the tablet from our global list
utils.PermanentlyRemoveVttablet(clusterInfo, curPrimary)
}()

// check that the replica gets promoted
utils.CheckPrimaryTablet(t, clusterInfo, replica, true)

// also check that the replication is working correctly after failover
utils.VerifyWritesSucceed(t, clusterInfo, replica, []*cluster.Vttablet{rdonly}, 10*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.RecoverDeadPrimaryRecoveryName, 1)
}

// TestDeadPrimaryRecoversImmediately tests VTOrc's ability to recover immediately if the primary is dead.
// The reason is that, unlike other recoveries, in DeadPrimary we don't call DiscoverInstance since we know
// that the primary is unreachable. This helps us save a few seconds depending on the value of the `RemoteOperationTimeout` flag.
Expand Down
36 changes: 14 additions & 22 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ type CellInfo struct {

// VTOrcClusterInfo stores the information for a cluster. This is supposed to be used only for VTOrc tests.
type VTOrcClusterInfo struct {
ClusterInstance *cluster.LocalProcessCluster
Ts *topo.Server
CellInfos []*CellInfo
VtctldClientProcess *cluster.VtctldClientProcess
lastUsedValue int
ClusterInstance *cluster.LocalProcessCluster
Ts *topo.Server
CellInfos []*CellInfo
lastUsedValue int
}

// CreateClusterAndStartTopo starts the cluster and topology service
Expand Down Expand Up @@ -100,17 +99,13 @@ func CreateClusterAndStartTopo(cellInfos []*CellInfo) (*VTOrcClusterInfo, error)
return nil, err
}

// store the vtctldclient process
vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)

// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
return &VTOrcClusterInfo{
ClusterInstance: clusterInstance,
Ts: ts,
CellInfos: cellInfos,
lastUsedValue: 100,
VtctldClientProcess: vtctldClientProcess,
ClusterInstance: clusterInstance,
Ts: ts,
CellInfos: cellInfos,
lastUsedValue: 100,
}, err
}

Expand Down Expand Up @@ -307,7 +302,7 @@ func SetupVttabletsAndVTOrcs(t *testing.T, clusterInfo *VTOrcClusterInfo, numRep
if durability == "" {
durability = "none"
}
out, err := clusterInfo.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, fmt.Sprintf("--durability-policy=%s", durability))
out, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, fmt.Sprintf("--durability-policy=%s", durability))
require.NoError(t, err, out)

// start vtorc
Expand Down Expand Up @@ -829,20 +824,17 @@ func SetupNewClusterSemiSync(t *testing.T) *VTOrcClusterInfo {
require.NoError(t, err)
}

vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)

out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
require.NoError(t, err, out)

// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
require.NoError(t, err)
clusterInfo := &VTOrcClusterInfo{
ClusterInstance: clusterInstance,
Ts: ts,
CellInfos: nil,
lastUsedValue: 100,
VtctldClientProcess: vtctldClientProcess,
ClusterInstance: clusterInstance,
Ts: ts,
CellInfos: nil,
lastUsedValue: 100,
}
return clusterInfo
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtorc/inst/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type StructureAnalysisCode string
const (
NoProblem AnalysisCode = "NoProblem"
ClusterHasNoPrimary AnalysisCode = "ClusterHasNoPrimary"
InvalidPrimary AnalysisCode = "InvalidPrimary"
InvalidReplica AnalysisCode = "InvalidReplica"
DeadPrimaryWithoutReplicas AnalysisCode = "DeadPrimaryWithoutReplicas"
DeadPrimary AnalysisCode = "DeadPrimary"
DeadPrimaryAndReplicas AnalysisCode = "DeadPrimaryAndReplicas"
Expand Down
88 changes: 73 additions & 15 deletions go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,20 @@ func initializeAnalysisDaoPostConfiguration() {

// clusterAnalysis holds per-keyspace/shard state accumulated while iterating
// over the replication-analysis rows for that shard.
type clusterAnalysis struct {
	// hasClusterwideAction is set once a cluster-level recovery action has been
	// chosen, so only one such action is taken per keyspace/shard at a time.
	hasClusterwideAction bool
	// totalTablets counts all tablets seen for this keyspace/shard.
	totalTablets int
	// primaryAlias is the alias of the tablet considered the shard's primary.
	primaryAlias string
	// durability is the shard's durability policy, used for promotion decisions.
	durability reparentutil.Durabler
}

// GetReplicationAnalysis will check for replication problems (dead primary; unreachable primary; etc)
func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) {
result := []ReplicationAnalysis{}
func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAnalysisHints) ([]*ReplicationAnalysis, error) {
var result []*ReplicationAnalysis
appendAnalysis := func(analysis *ReplicationAnalysis) {
if analysis.Analysis == NoProblem && len(analysis.StructureAnalysis) == 0 {
return
}
result = append(result, analysis)
}

// TODO(sougou); deprecate ReduceReplicationAnalysisCount
args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, keyspace, shard)
Expand Down Expand Up @@ -262,6 +269,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
)
LEFT JOIN database_instance primary_instance ON (
vitess_tablet.alias = primary_instance.alias
AND vitess_tablet.hostname = primary_instance.hostname
AND vitess_tablet.port = primary_instance.port
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
)
LEFT JOIN vitess_tablet primary_tablet ON (
primary_tablet.hostname = primary_instance.source_host
Expand All @@ -286,7 +295,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna

clusters := make(map[string]*clusterAnalysis)
err := db.Db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error {
a := ReplicationAnalysis{
a := &ReplicationAnalysis{
Analysis: NoProblem,
ProcessingNodeHostname: process.ThisHostname,
ProcessingNodeToken: util.ProcessToken.Hash,
Expand Down Expand Up @@ -406,6 +415,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
}
// ca has clusterwide info
ca := clusters[keyspaceShard]
// Increment the total number of tablets seen for this keyspace/shard.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
ca.totalTablets += 1
if ca.hasClusterwideAction {
// We can only take one cluster level action at a time.
return nil
Expand All @@ -415,10 +426,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
return nil
}
isInvalid := m.GetBool("is_invalid")
if isInvalid {
return nil
}
if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 {
if a.IsClusterPrimary && isInvalid {
a.Analysis = InvalidPrimary
a.Description = "VTOrc hasn't been able to reach the primary even once since restart/shutdown"
} else if isInvalid {
a.Analysis = InvalidReplica
a.Description = "VTOrc hasn't been able to reach the replica even once since restart/shutdown"
} else if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 {
a.Analysis = DeadPrimaryWithoutReplicas
a.Description = "Primary cannot be reached by vtorc and has no replica"
ca.hasClusterwideAction = true
Expand Down Expand Up @@ -532,13 +546,6 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
// a.Description = "Primary has no replicas"
// }

appendAnalysis := func(analysis *ReplicationAnalysis) {
if a.Analysis == NoProblem && len(a.StructureAnalysis) == 0 {
return
}
result = append(result, a)
}

{
// Moving on to structure analysis
// We also do structural checks. See if there's potential danger in promotions
Expand Down Expand Up @@ -579,7 +586,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.StructureAnalysis = append(a.StructureAnalysis, NotEnoughValidSemiSyncReplicasStructureWarning)
}
}
appendAnalysis(&a)
appendAnalysis(a)

if a.CountReplicas > 0 && hints.AuditAnalysis {
// Interesting enough for analysis
Expand All @@ -590,13 +597,64 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
return nil
})

for _, analysis := range result {
log.Errorf("Analysis - Instance - %v, Code - %v, LastCheckValid - %v, ReplStopped - %v", analysis.AnalyzedInstanceAlias, analysis.Analysis, analysis.LastCheckValid, analysis.ReplicationStopped)
}

result = postProcessAnalyses(result, clusters)

if err != nil {
log.Error(err)
}
// TODO: result, err = getConcensusReplicationAnalysis(result)
return result, err
}

// postProcessAnalyses is used to update different analyses based on the information gleaned from looking at all the analyses together instead of individual data.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
func postProcessAnalyses(result []*ReplicationAnalysis, clusters map[string]*clusterAnalysis) []*ReplicationAnalysis {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
for {
// Store whether we have changed the result of replication analysis or not.
resultChanged := false

// Go over all the analysis.
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
for _, analysis := range result {
// If one of them is an InvalidPrimary, then we see if all the other tablets in this keyspace shard are
// unable to replicate or not.
if analysis.Analysis == InvalidPrimary {
keyspaceName := analysis.ClusterDetails.Keyspace
shardName := analysis.ClusterDetails.Shard
keyspaceShard := getKeyspaceShardName(keyspaceName, shardName)
totalReplicas := clusters[keyspaceShard].totalTablets - 1
var notReplicatingReplicas []int
for idx, replicaAnalysis := range result {
if replicaAnalysis.ClusterDetails.Keyspace == keyspaceName &&
replicaAnalysis.ClusterDetails.Shard == shardName && topo.IsReplicaType(replicaAnalysis.TabletType) {
// If the replica's last check is invalid or its replication is stopped, then we consider as not replicating.
if !replicaAnalysis.LastCheckValid || replicaAnalysis.ReplicationStopped {
notReplicatingReplicas = append(notReplicatingReplicas, idx)
}
}
}
// If none of the other tablets are able to replicate, then we conclude that this primary is not just Invalid, but also Dead.
// In this case, we update the analysis for the primary tablet and remove all the analyses of the replicas.
if totalReplicas > 0 && len(notReplicatingReplicas) == totalReplicas {
resultChanged = true
analysis.Analysis = DeadPrimary
for i := len(notReplicatingReplicas) - 1; i >= 0; i-- {
idxToRemove := notReplicatingReplicas[i]
result = append(result[0:idxToRemove], result[idxToRemove+1:]...)
}
break
}
}
}
if !resultChanged {
break
}
}
return result
}

// auditInstanceAnalysisInChangelog will write down an instance's analysis in the database_instance_analysis_changelog table.
// To not repeat recurring analysis code, the database_instance_last_analysis table is used, so that only changes to
// analysis codes are written.
Expand Down
Loading