Execute VerifyReplicationTasks as an individual activity #4656

Merged: 2 commits, Jul 21, 2023
common/metrics/metric_defs.go (7 additions, 4 deletions)

@@ -1564,10 +1564,13 @@ var (
 	ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors")
 
 	// Force replication
-	EncounterZombieWorkflowCount  = NewCounterDef("encounter_zombie_workflow_count")
-	CreateReplicationTasksLatency = NewTimerDef("create_replication_tasks_latency")
-	VerifyReplicationTaskSuccess  = NewCounterDef("verify_replication_task_success")
-	VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
+	EncounterZombieWorkflowCount      = NewCounterDef("encounter_zombie_workflow_count")
+	GenerateReplicationTasksLatency   = NewTimerDef("generate_replication_tasks_latency")
+	VerifyReplicationTaskSuccess      = NewCounterDef("verify_replication_task_success")
+	VerifyReplicationTaskNotFound     = NewCounterDef("verify_replication_task_not_found")
+	VerifyReplicationTaskFailed       = NewCounterDef("verify_replication_task_failed")
+	VerifyReplicationTasksLatency     = NewTimerDef("verify_replication_tasks_latency")
+	VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency")
 
 	// Replication
 	NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
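As a point of reference (not part of this diff), these defs are consumed through the force-replication metrics handler in activities.go below. A minimal sketch of that pattern; "handler" and "ns" stand in for the metrics.Handler and namespace name available in the caller:

	// Sketch only: mirrors the Timer/Counter usage visible in activities.go.
	start := time.Now()
	defer func() {
		handler.Timer(metrics.VerifyReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
	}()
	handler.WithTags(metrics.NamespaceTag(ns)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)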
service/worker/migration/activities.go (62 additions, 87 deletions)

@@ -75,30 +75,22 @@ type (

 // State Diagram
 //
-//  NOT_CREATED
-//      │
-//      │
-//  CREATED_TO_BE_VERIFIED
+//  NOT_VERIFIED
 //      │
 // ┌────────┴─────────┐
 // │                  │
 // VERIFIED     VERIFIED_SKIPPED
 const (
-	NOT_CREATED            VerifyStatus = 0
-	CREATED_TO_BE_VERIFIED VerifyStatus = 1
-	VERIFIED               VerifyStatus = 2
-	VERIFY_SKIPPED         VerifyStatus = 3
+	NOT_VERIFIED   VerifyStatus = 0
+	VERIFIED       VerifyStatus = 1
+	VERIFY_SKIPPED VerifyStatus = 2
 
 	reasonZombieWorkflow   = "Zombie workflow"
 	reasonWorkflowNotFound = "Workflow not found"
 )
 
-func (r VerifyResult) isNotCreated() bool {
-	return r.Status == NOT_CREATED
-}
-
-func (r VerifyResult) isCreatedToBeVerified() bool {
-	return r.Status == CREATED_TO_BE_VERIFIED
+func (r VerifyResult) isNotVerified() bool {
+	return r.Status == NOT_VERIFIED
 }
 
 func (r VerifyResult) isVerified() bool {
@@ -436,6 +428,11 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene
 	ctx = a.setCallerInfoForGenReplicationTask(ctx, namespace.ID(request.NamespaceID))
 	rateLimiter := quotas.NewRateLimiter(request.RPS, int(math.Ceil(request.RPS)))
 
+	start := time.Now()
+	defer func() {
+		a.forceReplicationMetricsHandler.Timer(metrics.GenerateReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
+	}()
+
 	startIndex := 0
 	if activity.HasHeartbeatDetails(ctx) {
 		var finishedIndex int
@@ -447,11 +444,12 @@
 	for i := startIndex; i < len(request.Executions); i++ {
 		we := request.Executions[i]
 		if err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId)); err != nil {
-			if _, isNotFound := err.(*serviceerror.NotFound); !isNotFound {
+			if !isNotFoundServiceError(err) {
 				a.logger.Error("force-replication failed to generate replication task", tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId), tag.Error(err))
 				return err
 			}
 		}
+
 		activity.RecordHeartbeat(ctx, i)
 	}
 
@@ -550,71 +548,47 @@ func (a *activities) SeedReplicationQueueWithUserDataEntries(ctx context.Context
 	}
 }
 
-func (a *activities) createReplicationTasks(ctx context.Context, request *genearteAndVerifyReplicationTasksRequest, detail *replicationTasksHeartbeatDetails) error {
-	start := time.Now()
-	defer func() {
-		a.forceReplicationMetricsHandler.Timer(metrics.CreateReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
-	}()
-
-	rateLimiter := quotas.NewRateLimiter(request.RPS, int(math.Ceil(request.RPS)))
-
-	for i := 0; i < len(request.Executions); i++ {
-		r := &detail.Results[i]
-		if r.isCompleted() {
-			continue
-		}
-
-		we := request.Executions[i]
-		tags := []tag.Tag{tag.WorkflowType(forceReplicationWorkflowName), tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)}
-
-		resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{
-			NamespaceId: request.NamespaceID,
-			Execution:   &we,
-		})
-
-		switch err.(type) {
-		case nil:
-			if resp.GetDatabaseMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
-				a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1)
-				a.logger.Info("createReplicationTasks skip Zombie workflow", tags...)
-
-				r.Status = VERIFY_SKIPPED
-				r.Reason = reasonZombieWorkflow
-				continue
-			}
-
-			// Only create replication task if it hasn't been already created
-			if r.isNotCreated() {
-				err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))
-
-				switch err.(type) {
-				case nil:
-					r.Status = CREATED_TO_BE_VERIFIED
-				case *serviceerror.NotFound:
-					// rare case but in case if execution was deleted after above DescribeMutableState
-					r.Status = VERIFY_SKIPPED
-					r.Reason = reasonWorkflowNotFound
-				default:
-					a.logger.Error(fmt.Sprintf("createReplicationTasks failed to generate replication task. Error: %v", err), tags...)
-					return err
-				}
-			}
-
-		case *serviceerror.NotFound:
-			r.Status = VERIFY_SKIPPED
-			r.Reason = reasonWorkflowNotFound
-
-		default:
-			return err
-		}
-	}
-
-	return nil
-}
+func isNotFoundServiceError(err error) bool {
+	_, ok := err.(*serviceerror.NotFound)
+	return ok
+}
+
+func (a *activities) verifyHandleNotFoundWorkflow(
+	ctx context.Context,
+	namespaceID string,
+	we *commonpb.WorkflowExecution,
+	result *VerifyResult,
+) error {
+	tags := []tag.Tag{tag.WorkflowType(forceReplicationWorkflowName), tag.WorkflowNamespaceID(namespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)}
+	resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{
+		NamespaceId: namespaceID,
+		Execution:   we,
+	})
+
+	if err != nil {
+		if isNotFoundServiceError(err) {
+			// Workflow could be deleted due to retention.
+			result.Status = VERIFY_SKIPPED
+			result.Reason = reasonWorkflowNotFound
+			return nil
+		}
+
+		return err
+	}
+
+	if resp.GetDatabaseMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
+		a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1)
+		a.logger.Info("createReplicationTasks skip Zombie workflow", tags...)
+		result.Status = VERIFY_SKIPPED
+		result.Reason = reasonZombieWorkflow
+	}
+
+	return nil
+}

 func (a *activities) verifyReplicationTasks(
 	ctx context.Context,
-	request *genearteAndVerifyReplicationTasksRequest,
+	request *verifyReplicationTasksRequest,
 	detail *replicationTasksHeartbeatDetails,
 	remoteClient adminservice.AdminServiceClient,
 ) (verified bool, progress bool, err error) {
@@ -627,32 +601,41 @@
 	for i := 0; i < len(request.Executions); i++ {
 		r := &detail.Results[i]
 		we := request.Executions[i]
-		if r.isNotCreated() {
-			// invalid state
-			return false, progress, temporal.NewNonRetryableApplicationError(fmt.Sprintf("verifyReplicationTasks: replication task for %v was not created", we), "", nil)
-		}
-
 		if r.isCompleted() {
 			continue
 		}
 
+		s := time.Now()
 		// Check if execution exists on remote cluster
 		_, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
 			Namespace: request.Namespace,
 			Execution: &we,
 		})
+		a.forceReplicationMetricsHandler.Timer(metrics.VerifyDescribeMutableStateLatency.GetMetricName()).Record(time.Since(s))
 
 		switch err.(type) {
 		case nil:
-			a.forceReplicationMetricsHandler.Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)
+			a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)
 			r.Status = VERIFIED
 			progress = true
 
 		case *serviceerror.NotFound:
-			detail.LastNotFoundWorkflowExecution = we
-			return false, progress, nil
+			a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskNotFound.GetMetricName()).Record(1)
+			if err := a.verifyHandleNotFoundWorkflow(ctx, request.NamespaceID, &we, r); err != nil {
+				return false, progress, err
+			}
+
+			if r.isNotVerified() {
+				detail.LastNotFoundWorkflowExecution = we
+				return false, progress, nil
+			}
+
+			progress = true
 
 		default:
+			a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace), metrics.ServiceErrorTypeTag(err)).
+				Counter(metrics.VerifyReplicationTaskFailed.GetMetricName()).Record(1)
+
 			return false, progress, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed")
 		}
 	}
@@ -665,7 +648,7 @@ const (
 	defaultNoProgressNotRetryableTimeout = 15 * time.Minute
 )
 
-func (a *activities) GenerateAndVerifyReplicationTasks(ctx context.Context, request *genearteAndVerifyReplicationTasksRequest) error {
+func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verifyReplicationTasksRequest) error {
 	ctx = headers.SetCallerInfo(ctx, headers.NewPreemptableCallerInfo(request.Namespace))
 	remoteClient := a.clientFactory.NewRemoteAdminClientWithTimeout(
 		request.TargetClusterEndpoint,
@@ -684,12 +667,6 @@ func (a *activities) GenerateAndVerifyReplicationTasks(ctx context.Context, requ
 		activity.RecordHeartbeat(ctx, details)
 	}
 
-	if err := a.createReplicationTasks(ctx, request, &details); err != nil {
-		return err
-	}
-
-	activity.RecordHeartbeat(ctx, details)
-
 	// Verify if replication tasks exist on target cluster. There are several cases where execution was not found on target cluster.
 	// 1. replication lag
 	// 2. Zombie workflow execution
@@ -704,10 +681,8 @@
 	// - more than NonRetryableTimeout, it means potentially we encountered #4. The activity returns
 	// non-retryable error and force-replication workflow will restarted.
 	for {
-		var verified, progress bool
-		var err error
-
-		if verified, progress, err = a.verifyReplicationTasks(ctx, request, &details, remoteClient); err != nil {
+		verified, progress, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient)
+		if err != nil {
 			return err
 		}
 
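For readers of the diff above: the VerifyResult value that verifyReplicationTasks and verifyHandleNotFoundWorkflow mutate is only partially visible here. A minimal sketch of the shapes implied by its usage (fields inferred from r.Status / r.Reason; the literal declarations live in an elided part of activities.go):

	// Inferred sketch, not the exact declarations from this PR.
	type VerifyStatus int

	type VerifyResult struct {
		Status VerifyStatus // NOT_VERIFIED, VERIFIED, or VERIFY_SKIPPED
		Reason string       // e.g. reasonWorkflowNotFound, reasonZombieWorkflow
	}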
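Per the PR title, VerifyReplicationTasks now runs as its own activity rather than inside GenerateAndVerifyReplicationTasks. A hypothetical sketch of scheduling it from the force-replication workflow with the Temporal Go SDK; only the activity name and request type come from this diff, and the function name, options, and timeout values are illustrative:

	// Assumed caller inside the force-replication workflow.
	func verifyBatch(ctx workflow.Context, a *activities, req *verifyReplicationTasksRequest) error {
		ao := workflow.ActivityOptions{
			StartToCloseTimeout: time.Hour,   // illustrative
			HeartbeatTimeout:    time.Minute, // the activity heartbeats replicationTasksHeartbeatDetails
		}
		ctx = workflow.WithActivityOptions(ctx, ao)
		return workflow.ExecuteActivity(ctx, a.VerifyReplicationTasks, req).Get(ctx, nil)
	}

Running verification as a separate activity lets it heartbeat, retry, and time out independently of task generation, which is the usual reason to split a long-running step in Temporal.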