-
Notifications
You must be signed in to change notification settings - Fork 912
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add verification of replication tasks in force replication #4630
Conversation
case *serviceerror.NotFound: | ||
return nil | ||
default: | ||
if err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
surface NotFound
so verification can skipped such WF.
we := request.Executions[i] | ||
tags := []tag.Tag{tag.WorkflowType(forceReplicationWorkflowName), tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)} | ||
|
||
resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a.historyClient
-> a.localHistoryClient
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or source history client
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think historyClient
implicitly mean local
as other where in the codebase. I can change it if you feel strong about it.
|
||
switch err.(type) { | ||
case nil: | ||
if resp.GetCacheMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if resp.GetCacheMutableState()
this may return nil, use GetDatabaseMutableState
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
applied. what are the difference between 2? GetDatabaseMutableState
also just read from the object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetCacheMutableState contains the cached version of mutable state, which can be nil (not cached)
GetDatabaseMutableState directly load mutable state from DB
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall idea LGTM
implementation can be improved, by e.g. break giant for loop into for loop and function invocations
27300fd
to
f138cb9
Compare
replicationTasksHeartbeatDetails struct { | ||
Results []VerifyResult | ||
CheckPoint time.Time | ||
LastNotFoundWorkflowExecution commonpb.WorkflowExecution |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of this? Should we use a fixed length slice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
VerifyResult keeps the status of replication tasks for each execution task from input, which is a variable-length array:
@@ -1563,6 +1563,12 @@ var ( | |||
ScheduleCancelWorkflowErrors = NewCounterDef("schedule_cancel_workflow_errors") | |||
ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors") | |||
|
|||
// Force replication |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we care about verification failure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we can use activity failure metrics. we can always add later if needed.
a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1) | ||
a.logger.Info("createReplicationTasks skip Zombie workflow", tags...) | ||
|
||
r.Status = VERIFY_SKIPPED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seem like we skip generate replication task, not just skip verify. And we should we filter this in the force replication API?
ae0f352
to
bf5dfa0
Compare
<!-- Describe what has changed in this PR --> **What changed?** Add a verification step to check if generated workflow executions exist on target cluster. <!-- Tell your future self why have you made these changes --> **Why?** To ensure all generated replication tasks have been successfully applied on target cluster. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests + cluster tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) --> **Is hotfix candidate?** No
<!-- Describe what has changed in this PR --> **What changed?** Add a verification step to check if generated workflow executions exist on target cluster. <!-- Tell your future self why have you made these changes --> **Why?** To ensure all generated replication tasks have been successfully applied on target cluster. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests + cluster tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) --> **Is hotfix candidate?** No
<!-- Describe what has changed in this PR --> **What changed?** Add a verification step to check if generated workflow executions exist on target cluster. <!-- Tell your future self why have you made these changes --> **Why?** To ensure all generated replication tasks have been successfully applied on target cluster. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests + cluster tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) --> **Is hotfix candidate?** No
<!-- Describe what has changed in this PR --> **What changed?** Add a verification step to check if generated workflow executions exist on target cluster. <!-- Tell your future self why have you made these changes --> **Why?** To ensure all generated replication tasks have been successfully applied on target cluster. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests + cluster tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) --> **Is hotfix candidate?** No
What changed?
Add a verification step to check if generated workflow executions exist on target cluster.
Why?
To ensure all generated replication tasks have been successfully applied on target cluster.
How did you test it?
Unit tests + cluster tests
Potential risks
Is hotfix candidate?
No