From b9a9029e761dbd06bcfb7424f7415700187443b0 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Tue, 16 May 2023 15:52:51 -0700 Subject: [PATCH] Use the correct event version for task refresh (#4349) * Use the correct event version for task refresh --- service/history/api/refreshworkflow/api.go | 1 - service/history/ndc/history_replicator.go | 2 +- service/history/ndc/state_rebuilder.go | 1 - service/history/workflow/task_refresher.go | 86 ++-------------------- 4 files changed, 6 insertions(+), 84 deletions(-) diff --git a/service/history/api/refreshworkflow/api.go b/service/history/api/refreshworkflow/api.go index f21cb33b2d4..a1440094c70 100644 --- a/service/history/api/refreshworkflow/api.go +++ b/service/history/api/refreshworkflow/api.go @@ -63,7 +63,6 @@ func Invoke( shard, shard.GetConfig(), shard.GetNamespaceRegistry(), - shard.GetEventsCache(), shard.GetLogger(), ) diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 5bbe7dadfe9..e8dc3e6aaa0 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -370,7 +370,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( return err } - taskRefresh := workflow.NewTaskRefresher(r.shard, r.shard.GetConfig(), r.namespaceRegistry, r.shard.GetEventsCache(), r.logger) + taskRefresh := workflow.NewTaskRefresher(r.shard, r.shard.GetConfig(), r.namespaceRegistry, r.logger) err = taskRefresh.RefreshTasks(ctx, mutableState) if err != nil { return err diff --git a/service/history/ndc/state_rebuilder.go b/service/history/ndc/state_rebuilder.go index 8fa5cb1155e..80b5a84176f 100644 --- a/service/history/ndc/state_rebuilder.go +++ b/service/history/ndc/state_rebuilder.go @@ -99,7 +99,6 @@ func NewStateRebuilder( shard, shard.GetConfig(), shard.GetNamespaceRegistry(), - shard.GetEventsCache(), logger, ), rebuiltHistorySize: 0, diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 43740360b04..6d0166502f4 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -38,7 +38,6 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" - "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/shard" ) @@ -51,7 +50,6 @@ type ( shard shard.Context config *configs.Config namespaceRegistry namespace.Registry - eventsCache events.Cache logger log.Logger } ) @@ -60,7 +58,6 @@ func NewTaskRefresher( shard shard.Context, config *configs.Config, namespaceRegistry namespace.Registry, - eventsCache events.Cache, logger log.Logger, ) *TaskRefresherImpl { @@ -68,7 +65,6 @@ func NewTaskRefresher( shard: shard, config: config, namespaceRegistry: namespaceRegistry, - eventsCache: eventsCache, logger: logger, } } @@ -269,15 +265,8 @@ func (r *TaskRefresherImpl) refreshTasksForActivity( taskGenerator TaskGenerator, ) error { - executionInfo := mutableState.GetExecutionInfo() - executionState := mutableState.GetExecutionState() pendingActivityInfos := mutableState.GetPendingActivityInfos() - currentBranchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return err - } - Loop: for _, activityInfo := range pendingActivityInfos { // clear all activity timer task mask for later activity timer task re-generation @@ -294,18 +283,7 @@ Loop: continue Loop } - scheduleEvent, err := r.eventsCache.GetEvent( - ctx, - events.EventKey{ - NamespaceID: namespace.ID(executionInfo.NamespaceId), - WorkflowID: executionInfo.WorkflowId, - RunID: executionState.RunId, - EventID: activityInfo.ScheduledEventId, - Version: activityInfo.Version, - }, - activityInfo.ScheduledEventBatchId, - currentBranchToken, - ) + scheduleEvent, err := mutableState.GetActivityScheduledEvent(ctx, activityInfo.ScheduledEventId) if err != nil { return err } @@ -359,33 +337,14 @@ func (r *TaskRefresherImpl) refreshTasksForChildWorkflow( taskGenerator TaskGenerator, ) error { - executionInfo := mutableState.GetExecutionInfo() - executionState := mutableState.GetExecutionState() pendingChildWorkflowInfos := mutableState.GetPendingChildExecutionInfos() - currentBranchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return err - } - Loop: for _, childWorkflowInfo := range pendingChildWorkflowInfos { if childWorkflowInfo.StartedEventId != common.EmptyEventID { continue Loop } - - scheduleEvent, err := r.eventsCache.GetEvent( - ctx, - events.EventKey{ - NamespaceID: namespace.ID(executionInfo.NamespaceId), - WorkflowID: executionInfo.WorkflowId, - RunID: executionState.RunId, - EventID: childWorkflowInfo.InitiatedEventId, - Version: childWorkflowInfo.Version, - }, - childWorkflowInfo.InitiatedEventBatchId, - currentBranchToken, - ) + scheduleEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, childWorkflowInfo.InitiatedEventId) if err != nil { return err } @@ -406,28 +365,10 @@ func (r *TaskRefresherImpl) refreshTasksForRequestCancelExternalWorkflow( taskGenerator TaskGenerator, ) error { - executionInfo := mutableState.GetExecutionInfo() - executionState := mutableState.GetExecutionState() pendingRequestCancelInfos := mutableState.GetPendingRequestCancelExternalInfos() - currentBranchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return err - } - for _, requestCancelInfo := range pendingRequestCancelInfos { - initiateEvent, err := r.eventsCache.GetEvent( - ctx, - events.EventKey{ - NamespaceID: namespace.ID(executionInfo.NamespaceId), - WorkflowID: executionInfo.WorkflowId, - RunID: executionState.RunId, - EventID: requestCancelInfo.GetInitiatedEventId(), - Version: requestCancelInfo.GetVersion(), - }, - requestCancelInfo.GetInitiatedEventBatchId(), - currentBranchToken, - ) + initiateEvent, err := mutableState.GetRequesteCancelExternalInitiatedEvent(ctx, requestCancelInfo.GetInitiatedEventId()) if err != nil { return err } @@ -448,28 +389,11 @@ func (r *TaskRefresherImpl) refreshTasksForSignalExternalWorkflow( taskGenerator TaskGenerator, ) error { - executionInfo := mutableState.GetExecutionInfo() - executionState := mutableState.GetExecutionState() pendingSignalInfos := mutableState.GetPendingSignalExternalInfos() - currentBranchToken, err := mutableState.GetCurrentBranchToken() - if err != nil { - return err - } - for _, signalInfo := range pendingSignalInfos { - initiateEvent, err := r.eventsCache.GetEvent( - ctx, - events.EventKey{ - NamespaceID: namespace.ID(executionInfo.NamespaceId), - WorkflowID: executionInfo.WorkflowId, - RunID: executionState.RunId, - EventID: signalInfo.GetInitiatedEventId(), - Version: signalInfo.GetVersion(), - }, - signalInfo.GetInitiatedEventBatchId(), - currentBranchToken, - ) + + initiateEvent, err := mutableState.GetSignalExternalInitiatedEvent(ctx, signalInfo.GetInitiatedEventId()) if err != nil { return err }