Skip to content

Commit

Permalink
Improve VTOrc failure detection to be able to better handle dead primary failures (vitessio#13190)
Browse files Browse the repository at this point in the history

* test: add a failing test

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix the problem

Signed-off-by: Manan Gupta <[email protected]>

* feat: read vttablet records for instances that have no mysql port too

Signed-off-by: Manan Gupta <[email protected]>

* feat: refactor the code

Signed-off-by: Manan Gupta <[email protected]>

* feat: add tests for the newly introduced function

Signed-off-by: Manan Gupta <[email protected]>

* test: fix test expectations

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix flakiness in tests

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix comments

Signed-off-by: Manan Gupta <[email protected]>

---------

Co-authored-by: Manan Gupta <[email protected]>
Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
2 people authored and maxenglander committed Jul 6, 2023
1 parent 6512ed1 commit 21cbfdb
Show file tree
Hide file tree
Showing 10 changed files with 386 additions and 54 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/vtorc/primaryfailure/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestMain(m *testing.M) {
var cellInfos []*utils.CellInfo
cellInfos = append(cellInfos, &utils.CellInfo{
CellName: utils.Cell1,
NumReplicas: 12,
NumReplicas: 13,
NumRdonly: 2,
UIDBase: 100,
})
Expand Down
51 changes: 51 additions & 0 deletions go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,57 @@ func TestDownPrimary(t *testing.T) {
utils.VerifyWritesSucceed(t, clusterInfo, replica, []*cluster.Vttablet{rdonly}, 10*time.Second)
}

// TestDownPrimaryBeforeVTOrc brings the primary down before any VTOrc
// instance exists, then starts VTOrc and verifies it repairs the shard by
// promoting a replica. This covers failure detection for a primary that
// VTOrc has never been able to reach.
func TestDownPrimaryBeforeVTOrc(t *testing.T) {
	defer cluster.PanicHandler(t)
	// Deliberately start with zero VTOrc instances ("0" below); one is started later.
	utils.SetupVttabletsAndVtorc(t, clusterInfo, 2, 1, nil, cluster.VtorcConfiguration{}, 0, "none")
	ks := &clusterInfo.ClusterInstance.Keyspaces[0]
	shard := &ks.Shards[0]
	primary := shard.Vttablets[0]

	// Promote the first tablet as the primary.
	err := clusterInfo.ClusterInstance.VtctlclientProcess.InitializeShard(ks.Name, shard.Name, clusterInfo.ClusterInstance.Cell, primary.TabletUID)
	require.NoError(t, err)

	// Locate the remaining replica and the rdonly tablet. With only two
	// replica-type tablets, the non-primary one must be the replica.
	var replicaTablet, rdonlyTablet *cluster.Vttablet
	for _, tab := range shard.Vttablets {
		switch {
		case tab.Type == "rdonly":
			rdonlyTablet = tab
		case tab.Type == "replica" && tab.Alias != primary.Alias:
			replicaTablet = tab
		}
	}
	assert.NotNil(t, replicaTablet, "could not find replica tablet")
	assert.NotNil(t, rdonlyTablet, "could not find rdonly tablet")

	// Sanity-check replication before inducing the failure.
	utils.CheckReplication(t, clusterInfo, primary, []*cluster.Vttablet{rdonlyTablet, replicaTablet}, 10*time.Second)

	// Make the current primary vttablet (and its mysqld) unavailable.
	_ = primary.VttabletProcess.TearDown()
	err = primary.MysqlctlProcess.Stop()
	require.NoError(t, err)

	// Only now start a VTOrc instance, so it has never seen the primary alive.
	utils.StartVtorcs(t, clusterInfo, []string{"--remote_operation_timeout=10s"}, cluster.VtorcConfiguration{
		PreventCrossDataCenterPrimaryFailover: true,
	}, 1)

	defer func() {
		// Remove the dead tablet from our global list.
		utils.PermanentlyRemoveVttablet(clusterInfo, primary)
	}()

	// The replica should get promoted by VTOrc.
	utils.CheckPrimaryTablet(t, clusterInfo, replicaTablet, true)

	// And replication should work correctly after the failover.
	utils.VerifyWritesSucceed(t, clusterInfo, replicaTablet, []*cluster.Vttablet{rdonlyTablet}, 10*time.Second)
}

// Failover should not be cross data centers, according to the configuration file
// covers part of the test case master-failover-lost-replicas from orchestrator
func TestCrossDataCenterFailure(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/orchestrator/inst/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type StructureAnalysisCode string
const (
NoProblem AnalysisCode = "NoProblem"
ClusterHasNoPrimary AnalysisCode = "ClusterHasNoPrimary"
InvalidPrimary AnalysisCode = "InvalidPrimary"
InvalidReplica AnalysisCode = "InvalidReplica"
DeadPrimaryWithoutReplicas AnalysisCode = "DeadPrimaryWithoutReplicas"
DeadPrimary AnalysisCode = "DeadPrimary"
DeadPrimaryAndReplicas AnalysisCode = "DeadPrimaryAndReplicas"
Expand Down Expand Up @@ -122,6 +124,7 @@ type ReplicationAnalysis struct {
AnalyzedInstancePrimaryAlias string
TabletType topodatapb.TabletType
PrimaryTimeStamp time.Time
AnalyzedShard string
SuggestedClusterAlias string
ClusterDetails ClusterInfo
AnalyzedInstanceDataCenter string
Expand Down
90 changes: 77 additions & 13 deletions go/vt/orchestrator/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ func initializeAnalysisDaoPostConfiguration() {

// clusterAnalysis accumulates shard-wide state while the replication-analysis
// rows are processed one tablet at a time; entries are keyed by keyspace/shard.
type clusterAnalysis struct {
	// hasClusterwideAction is set once a cluster-level recovery action has
	// been chosen, so that only one such action is taken per shard at a time.
	hasClusterwideAction bool
	// totalTablets counts every tablet row seen for this shard, the primary included.
	totalTablets int
	// primaryAlias is the alias of the tablet whose type is PRIMARY, if any.
	primaryAlias string
	// durability is the keyspace's parsed durability policy.
	durability reparentutil.Durabler
}

// GetReplicationAnalysis will check for replication problems (dead primary; unreachable primary; etc)
func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) ([]ReplicationAnalysis, error) {
result := []ReplicationAnalysis{}
func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints) ([]*ReplicationAnalysis, error) {
result := []*ReplicationAnalysis{}

// TODO(sougou); deprecate ReduceReplicationAnalysisCount
args := sqlutils.Args(config.Config.ReasonableReplicationLagSeconds, ValidSecondsFromSeenToLastAttemptedCheck(), config.Config.ReasonableReplicationLagSeconds, clusterName)
Expand All @@ -76,6 +77,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
vitess_tablet.port,
vitess_tablet.tablet_type,
vitess_tablet.primary_timestamp,
vitess_tablet.shard,
vitess_keyspace.keyspace AS keyspace,
vitess_keyspace.keyspace_type AS keyspace_type,
vitess_keyspace.durability_policy AS durability_policy,
Expand Down Expand Up @@ -309,6 +311,8 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
)
LEFT JOIN database_instance primary_instance ON (
vitess_tablet.alias = primary_instance.alias
AND vitess_tablet.hostname = primary_instance.hostname
AND vitess_tablet.port = primary_instance.port
)
LEFT JOIN vitess_tablet primary_tablet ON (
primary_tablet.hostname = primary_instance.source_host
Expand Down Expand Up @@ -354,7 +358,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)

clusters := make(map[string]*clusterAnalysis)
err := db.Db.QueryOrchestrator(query, args, func(m sqlutils.RowMap) error {
a := ReplicationAnalysis{
a := &ReplicationAnalysis{
Analysis: NoProblem,
ProcessingNodeHostname: process.ThisHostname,
ProcessingNodeToken: util.ProcessToken.Hash,
Expand All @@ -376,6 +380,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)

a.TabletType = tablet.Type
a.AnalyzedKeyspace = m.GetString("keyspace")
a.AnalyzedShard = m.GetString("shard")
a.PrimaryTimeStamp = m.GetTime("primary_timestamp")

if keyspaceType := topodatapb.KeyspaceType(m.GetInt("keyspace_type")); keyspaceType == topodatapb.KeyspaceType_SNAPSHOT {
Expand All @@ -401,6 +406,8 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
Type: BinaryLog,
}
isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates")
a.ClusterDetails.Keyspace = m.GetString("keyspace")
a.ClusterDetails.Shard = m.GetString("shard")
a.SuggestedClusterAlias = m.GetString("suggested_cluster_alias")
a.ClusterDetails.ClusterName = m.GetString("cluster_name")
a.ClusterDetails.ClusterAlias = m.GetString("cluster_alias")
Expand Down Expand Up @@ -458,11 +465,12 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
log.Debugf(analysisMessage)
}
}
if clusters[a.SuggestedClusterAlias] == nil {
clusters[a.SuggestedClusterAlias] = &clusterAnalysis{}
keyspaceShard := getKeyspaceShardName(a.ClusterDetails.Keyspace, a.ClusterDetails.Shard)
if clusters[keyspaceShard] == nil {
clusters[keyspaceShard] = &clusterAnalysis{}
if a.TabletType == topodatapb.TabletType_PRIMARY {
a.IsClusterPrimary = true
clusters[a.SuggestedClusterAlias].primaryAlias = a.AnalyzedInstanceAlias
clusters[keyspaceShard].primaryAlias = a.AnalyzedInstanceAlias
}
durabilityPolicy := m.GetString("durability_policy")
if durabilityPolicy == "" {
Expand All @@ -474,10 +482,11 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
log.Errorf("can't get the durability policy %v - %v. Skipping keyspace - %v.", durabilityPolicy, err, a.AnalyzedKeyspace)
return nil
}
clusters[a.SuggestedClusterAlias].durability = durability
clusters[keyspaceShard].durability = durability
}
// ca has clusterwide info
ca := clusters[a.SuggestedClusterAlias]
ca := clusters[keyspaceShard]
ca.totalTablets++
if ca.hasClusterwideAction {
// We can only take one cluster level action at a time.
return nil
Expand All @@ -487,10 +496,13 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
return nil
}
isInvalid := m.GetBool("is_invalid")
if isInvalid {
return nil
}
if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 {
if a.IsClusterPrimary && isInvalid {
a.Analysis = InvalidPrimary
a.Description = "VTOrc hasn't been able to reach the primary even once since restart/shutdown"
} else if isInvalid {
a.Analysis = InvalidReplica
a.Description = "VTOrc hasn't been able to reach the replica even once since restart/shutdown"
} else if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 {
a.Analysis = DeadPrimaryWithoutReplicas
a.Description = "Primary cannot be reached by orchestrator and has no replica"
ca.hasClusterwideAction = true
Expand Down Expand Up @@ -671,7 +683,7 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
a.StructureAnalysis = append(a.StructureAnalysis, NotEnoughValidSemiSyncReplicasStructureWarning)
}
}
appendAnalysis(&a)
appendAnalysis(a)

if a.CountReplicas > 0 && hints.AuditAnalysis {
// Interesting enough for analysis
Expand All @@ -680,13 +692,65 @@ func GetReplicationAnalysis(clusterName string, hints *ReplicationAnalysisHints)
return nil
})

for _, analysis := range result {
log.Errorf("Analysis - Instance - %v, Code - %v, LastCheckValid - %v, ReplStopped - %v", analysis.AnalyzedInstanceAlias, analysis.Analysis, analysis.LastCheckValid, analysis.ReplicationStopped)
}

result = postProcessAnalyses(result, clusters)

if err != nil {
return result, log.Errore(err)
}
// TODO: result, err = getConcensusReplicationAnalysis(result)
return result, log.Errore(err)
}

// postProcessAnalyses revisits the per-tablet analyses as a group, upgrading
// conclusions that can only be drawn by looking at all analyses together
// rather than at individual rows. Specifically, an InvalidPrimary becomes a
// DeadPrimary when every other tablet in the same keyspace/shard is also
// unable to replicate, in which case the replicas' analyses are dropped.
// NOTE(review): the replica count is derived as totalTablets-1, while only
// replica-type tablets are checked for being stuck — confirm shards never
// contain other non-primary tablet types here.
func postProcessAnalyses(result []*ReplicationAnalysis, clusters map[string]*clusterAnalysis) []*ReplicationAnalysis {
	// Repeat until a full pass makes no change, since removing entries
	// invalidates the indices collected during the pass.
	for changed := true; changed; {
		changed = false
		for _, primaryAnalysis := range result {
			if primaryAnalysis.Analysis != InvalidPrimary {
				continue
			}
			keyspace := primaryAnalysis.ClusterDetails.Keyspace
			shard := primaryAnalysis.ClusterDetails.Shard
			// Everything in the shard apart from the primary counts as a replica.
			replicaCount := clusters[getKeyspaceShardName(keyspace, shard)].totalTablets - 1

			// Collect the indices of this shard's replicas that are not replicating:
			// either their last check is invalid or replication has stopped.
			var stuck []int
			for idx, other := range result {
				if other.ClusterDetails.Keyspace != keyspace ||
					other.ClusterDetails.Shard != shard ||
					!topo.IsReplicaType(other.TabletType) {
					continue
				}
				if !other.LastCheckValid || other.ReplicationStopped {
					stuck = append(stuck, idx)
				}
			}

			if replicaCount <= 0 || len(stuck) != replicaCount {
				continue
			}
			// No replica can replicate either: the primary is not merely
			// Invalid, it is Dead. Update it and drop the replica analyses,
			// removing highest index first so earlier indices stay valid.
			primaryAnalysis.Analysis = DeadPrimary
			for i := len(stuck) - 1; i >= 0; i-- {
				result = append(result[:stuck[i]], result[stuck[i]+1:]...)
			}
			changed = true
			break
		}
	}
	return result
}

// auditInstanceAnalysisInChangelog will write down an instance's analysis in the database_instance_analysis_changelog table.
// To not repeat recurring analysis code, the database_instance_last_analysis table is used, so that only changes to
// analysis codes are written.
Expand Down
Loading

0 comments on commit 21cbfdb

Please sign in to comment.