From 86440bbf7fc6884f5e60cbc476fb7b361a0b6b7a Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Thu, 3 Nov 2022 11:39:48 -0700 Subject: [PATCH] Add an archival queue processor --- service/history/archivalQueueFactory.go | 108 +++++++++++ service/history/archivalQueueTaskExecutor.go | 139 ++++++++++++++ .../history/archivalQueueTaskExecutor_test.go | 177 ++++++++++++++++++ service/history/configs/config.go | 3 + service/history/queueFactoryBase.go | 4 +- 5 files changed, 429 insertions(+), 2 deletions(-) create mode 100644 service/history/archivalQueueFactory.go create mode 100644 service/history/archivalQueueTaskExecutor.go create mode 100644 service/history/archivalQueueTaskExecutor_test.go diff --git a/service/history/archivalQueueFactory.go b/service/history/archivalQueueFactory.go new file mode 100644 index 000000000000..39ab1d9f44c8 --- /dev/null +++ b/service/history/archivalQueueFactory.go @@ -0,0 +1,108 @@ +package history + +import ( + "go.uber.org/fx" + + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" +) + +const ( + archivalQueuePersistenceMaxRPSRatio = 0.15 +) + +type ( + archivalQueueFactoryParams struct { + fx.In + + QueueFactoryBaseParams + } + + archivalQueueFactory struct { + *archivalQueueFactoryParams + QueueFactoryBase + archiver archival.Archiver + } +) + +func NewArchivalQueueFactory( + params *archivalQueueFactoryParams, +) QueueFactory { + hostScheduler := queues.NewNamespacePriorityScheduler( + params.ClusterMetadata.GetCurrentClusterName(), + queues.NamespacePrioritySchedulerOptions{ + WorkerCount: params.Config.ArchivalProcessorSchedulerWorkerCount, + // we don't need standby weights because we only run in the active cluster 
+ ActiveNamespaceWeights: params.Config.ArchivalProcessorSchedulerRoundRobinWeights, + }, + params.NamespaceRegistry, + params.TimeSource, + params.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationArchivalQueueProcessor)), + params.Logger, + ) + return &archivalQueueFactory{ + archivalQueueFactoryParams: params, + QueueFactoryBase: QueueFactoryBase{ + HostScheduler: hostScheduler, + HostPriorityAssigner: queues.NewPriorityAssigner(), + HostRateLimiter: NewQueueHostRateLimiter( + params.Config.ArchivalProcessorMaxPollHostRPS, + params.Config.PersistenceMaxQPS, + archivalQueuePersistenceMaxRPSRatio, + ), + HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter( + NewHostRateLimiterRateFn( + params.Config.ArchivalProcessorMaxPollHostRPS, + params.Config.PersistenceMaxQPS, + archivalQueuePersistenceMaxRPSRatio, + ), + params.Config.QueueMaxReaderCount(), + ), + }, + } +} + +func (f *archivalQueueFactory) CreateQueue( + shard shard.Context, + workflowCache workflow.Cache, +) queues.Queue { + logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue) + + executor := newArchivalQueueTaskExecutor(f.archiver, shard, workflowCache, f.MetricsHandler, f.Logger) + + return queues.NewImmediateQueue( + shard, + tasks.CategoryArchival, + f.HostScheduler, + f.HostPriorityAssigner, + executor, + &queues.Options{ + ReaderOptions: queues.ReaderOptions{ + BatchSize: f.Config.ArchivalTaskBatchSize, + MaxPendingTasksCount: f.Config.QueuePendingTaskMaxCount, + PollBackoffInterval: f.Config.ArchivalProcessorPollBackoffInterval, + }, + MonitorOptions: queues.MonitorOptions{ + PendingTasksCriticalCount: f.Config.QueuePendingTaskCriticalCount, + ReaderStuckCriticalAttempts: f.Config.QueueReaderStuckCriticalAttempts, + SliceCountCriticalThreshold: f.Config.QueueCriticalSlicesCount, + }, + MaxPollRPS: f.Config.ArchivalProcessorMaxPollRPS, + MaxPollInterval: f.Config.ArchivalProcessorMaxPollInterval, + MaxPollIntervalJitterCoefficient: 
f.Config.ArchivalProcessorMaxPollIntervalJitterCoefficient, + CheckpointInterval: f.Config.ArchivalProcessorUpdateAckInterval, + CheckpointIntervalJitterCoefficient: f.Config.ArchivalProcessorUpdateAckIntervalJitterCoefficient, + MaxReaderCount: f.Config.QueueMaxReaderCount, + TaskMaxRetryCount: f.Config.ArchivalTaskMaxRetryCount, + }, + f.HostReaderRateLimiter, + logger, + f.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationArchivalQueueProcessor)), + ) +} diff --git a/service/history/archivalQueueTaskExecutor.go b/service/history/archivalQueueTaskExecutor.go new file mode 100644 index 000000000000..106907daf189 --- /dev/null +++ b/service/history/archivalQueueTaskExecutor.go @@ -0,0 +1,139 @@ +package history + +import ( + "context" + "errors" + "fmt" + "time" + + common2 "go.temporal.io/server/common" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" +) + +type archivalQueueTaskExecutor struct { + archiver archival.Archiver + shardContext shard.Context + workflowCache workflow.Cache + logger log.Logger + metricsClient metrics.MetricsHandler +} + +func newArchivalQueueTaskExecutor(archiver archival.Archiver, shardContext shard.Context, workflowCache workflow.Cache, + metricsHandler metrics.MetricsHandler, logger log.Logger) *archivalQueueTaskExecutor { + return &archivalQueueTaskExecutor{ + archiver: archiver, + shardContext: shardContext, + workflowCache: workflowCache, + logger: logger, + metricsClient: metricsHandler, + } +} + +func (e *archivalQueueTaskExecutor) Execute(ctx context.Context, executable queues.Executable) (tags []metrics.Tag, + isActive bool, err error) { + task := executable.GetTask() + taskType := queues.GetArchivalTaskTypeTagValue(task) + tags = []metrics.Tag{ 
+ getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()), + metrics.TaskTypeTag(taskType), + metrics.OperationTag(taskType), // for backward compatibility + } + switch task := task.(type) { + case *tasks.ArchiveExecutionTask: + err = e.processArchiveExecutionTask(ctx, task) + default: + err = fmt.Errorf("task with invalid type sent to archival queue: %+v", task) + } + return tags, true, err +} + +func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Context, + task *tasks.ArchiveExecutionTask) (err error) { + weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task) + if err != nil { + return err + } + defer func() { release(err) }() + // TODO: verify task version? + + mutableState, err := weContext.LoadMutableState(ctx) + if err != nil { + return err + } + if mutableState == nil || mutableState.IsWorkflowExecutionRunning() { + return errors.New("cannot archive workflow which is running") + } + branchToken, err := mutableState.GetCurrentBranchToken() + if err != nil { + return err + } + + namespaceEntry := mutableState.GetNamespaceEntry() + namespaceName := namespaceEntry.Name() + nextEventID := mutableState.GetNextEventID() + closeFailoverVersion, err := mutableState.GetLastWriteVersion() + if err != nil { + return err + } + + executionInfo := mutableState.GetExecutionInfo() + workflowTypeName := executionInfo.GetWorkflowTypeName() + startTime := executionInfo.GetStartTime() + if startTime == nil { + return errors.New("can't archive workflow with nil start time") + } + executionTime := executionInfo.GetExecutionTime() + if executionTime == nil { + return errors.New("can't archive workflow with nil execution time") + } + closeTime := executionInfo.GetCloseTime() + if closeTime == nil { + return errors.New("can't archive workflow with nil close time") + } + executionState := mutableState.GetExecutionState() + memo := getWorkflowMemo(copyMemo(executionInfo.Memo)) + _, err = 
e.archiver.Archive(ctx, &archival.Request{ + ShardID: e.shardContext.GetShardID(), + NamespaceID: task.NamespaceID, + Namespace: namespaceName.String(), + WorkflowID: task.WorkflowID, + RunID: task.RunID, + BranchToken: branchToken, + NextEventID: nextEventID, + CloseFailoverVersion: closeFailoverVersion, + HistoryURI: namespaceEntry.HistoryArchivalState().URI, + WorkflowTypeName: workflowTypeName, + StartTime: *startTime, + ExecutionTime: *executionTime, + CloseTime: *closeTime, + Status: executionState.Status, + HistoryLength: nextEventID - 1, + Memo: memo, + SearchAttributes: getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)), + VisibilityURI: namespaceEntry.VisibilityArchivalState().URI, + Targets: []archival.Target{archival.TargetHistory, archival.TargetVisibility}, + CallerService: common2.HistoryServiceName, + }) + if err != nil { + return err + } + retention := namespaceEntry.Retention() + if retention == 0 { + retention = 7 * 24 * time.Hour + } + deleteTime := closeTime.Add(retention) + mutableState.AddTasks(&tasks.DeleteHistoryEventTask{ + WorkflowKey: task.WorkflowKey, + VisibilityTimestamp: deleteTime, + Version: task.Version, + BranchToken: branchToken, + WorkflowDataAlreadyArchived: true, + }) + return err +} diff --git a/service/history/archivalQueueTaskExecutor_test.go b/service/history/archivalQueueTaskExecutor_test.go new file mode 100644 index 000000000000..17659aa21f0f --- /dev/null +++ b/service/history/archivalQueueTaskExecutor_test.go @@ -0,0 +1,177 @@ +package history + +import ( + "context" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + 
"go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + shard "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/tests" + "go.temporal.io/server/service/history/workflow" +) + +func TestArchivalQueueTaskExecutor(t *testing.T) { + startTime := time.Unix(1, 0) + executionTime := startTime.Add(time.Second) + closeTime := executionTime.Add(time.Minute) + hourRetention := time.Hour + workflowKey := definition.NewWorkflowKey(tests.NamespaceID.String(), tests.WorkflowID, tests.RunID) + version := 52 + for _, c := range []struct { + Name string + IsWorkflowExecutionRunning bool + Retention *time.Duration + Task tasks.Task + ExpectedDeleteTime time.Time + ExpectedErrorSubstrings []string + ExpectArchive bool + ExpectAddTask bool + }{ + { + Name: "Success", + IsWorkflowExecutionRunning: false, + Retention: &hourRetention, + Task: &tasks.ArchiveExecutionTask{ + WorkflowKey: workflowKey, + Version: int64(version), + }, + ExpectedDeleteTime: closeTime.Add(hourRetention), + ExpectArchive: true, + ExpectAddTask: true, + }, + { + Name: "Running execution", + IsWorkflowExecutionRunning: true, + Retention: &hourRetention, + Task: &tasks.ArchiveExecutionTask{ + WorkflowKey: workflowKey, + Version: int64(version), + }, + ExpectedDeleteTime: closeTime.Add(hourRetention), + ExpectedErrorSubstrings: []string{"cannot archive workflow which is running"}, + ExpectArchive: false, + ExpectAddTask: false, + }, + { + Name: "Default retention", + IsWorkflowExecutionRunning: false, + Retention: nil, + Task: &tasks.ArchiveExecutionTask{ + WorkflowKey: workflowKey, + Version: int64(version), + }, + ExpectedDeleteTime: closeTime.Add(24 * time.Hour * 7), + ExpectArchive: true, + ExpectAddTask: true, + }, + { + Name: "Wrong task type", + Task: &tasks.CloseExecutionTask{ + WorkflowKey: workflowKey, + }, + ExpectedErrorSubstrings: []string{"task with invalid type"}, + }, + } { 
+ c := c // store c in closure to prevent loop from changing it when a parallel task is accessing it + t.Run(c.Name, func(t *testing.T) { + t.Parallel() + controller := gomock.NewController(t) + namespaceRegistry := namespace.NewMockRegistry(controller) + task := c.Task + shardContext := shard.NewMockContext(controller) + workflowCache := workflow.NewMockCache(controller) + workflowContext := workflow.NewMockContext(controller) + mutableState := workflow.NewMockMutableState(controller) + branchToken := []byte{42} + metricsHandler := metrics.NoopMetricsHandler + logger := log.NewNoopLogger() + timeSource := clock.NewRealTimeSource() + archiver := archival.NewMockArchiver(controller) + + namespaceRegistry.EXPECT().GetNamespaceName(tests.NamespaceID).Return(tests.Namespace, nil).AnyTimes() + mutableState.EXPECT().IsWorkflowExecutionRunning().Return(c.IsWorkflowExecutionRunning).AnyTimes() + shardContext.EXPECT().GetNamespaceRegistry().Return(namespaceRegistry).AnyTimes() + workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(mutableState, nil).AnyTimes() + workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
+ Return(workflowContext, workflow.ReleaseCacheFunc(func(err error) {}), nil).AnyTimes() + + if c.ExpectArchive { + namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(c.Retention)) + mutableState.EXPECT().GetCurrentBranchToken().Return(branchToken, nil) + mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry) + mutableState.EXPECT().GetNextEventID().Return(int64(100)) + mutableState.EXPECT().GetLastWriteVersion().Return(int64(200), nil) + executionInfo := &persistence.WorkflowExecutionInfo{ + StartTime: &startTime, + ExecutionTime: &executionTime, + CloseTime: &closeTime, + } + mutableState.EXPECT().GetExecutionInfo().Return(executionInfo) + executionState := &persistence.WorkflowExecutionState{ + State: 0, + Status: 0, + } + mutableState.EXPECT().GetExecutionState().Return(executionState) + shardContext.EXPECT().GetShardID().Return(int32(1)) + archiver.EXPECT().Archive(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, + request *archival.Request) (*archival.Response, error) { + assert.Equal(t, startTime, request.StartTime) + assert.Equal(t, executionTime, request.ExecutionTime) + assert.Equal(t, closeTime, request.CloseTime) + + return &archival.Response{}, nil + }) + } + + if c.ExpectAddTask { + mutableState.EXPECT().AddTasks(&tasks.DeleteHistoryEventTask{ + WorkflowKey: workflowKey, + VisibilityTimestamp: c.ExpectedDeleteTime, + TaskID: 0, + Version: int64(version), + BranchToken: branchToken, + WorkflowDataAlreadyArchived: true, + }) + } + + executor := newArchivalQueueTaskExecutor(archiver, shardContext, workflowCache, metricsHandler, logger) + executable := queues.NewExecutable( + queues.DefaultReaderId, + task, + nil, + executor, + nil, + nil, + queues.NewNoopPriorityAssigner(), + timeSource, + namespaceRegistry, + nil, + metrics.NoopMetricsHandler, + nil, + nil, + ) + err := executable.Execute() + if len(c.ExpectedErrorSubstrings) > 0 { + require.Error(t, err) + for _, s := range c.ExpectedErrorSubstrings { 
+ assert.ErrorContains(t, err, s) + } + } else { + assert.Nil(t, err) + } + }) + } +} diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 3376753a4364..3bd3bf1ad59d 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -25,6 +25,7 @@ package configs import ( + "math" "time" enumspb "go.temporal.io/api/enums/v1" @@ -284,6 +285,7 @@ type Config struct { ArchivalProcessorSchedulerRoundRobinWeights dynamicconfig.MapPropertyFnWithNamespaceFilter ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn ArchivalTaskBatchSize dynamicconfig.IntPropertyFn + ArchivalTaskMaxRetryCount dynamicconfig.IntPropertyFn ArchivalProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn ArchivalProcessorMaxPollRPS dynamicconfig.IntPropertyFn ArchivalProcessorMaxPollInterval dynamicconfig.DurationPropertyFn @@ -506,6 +508,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis // Archival related ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100), + ArchivalTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ArchivalTaskMaxRetryCount, math.MaxInt32), ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20), ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0), ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512), diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index 20a0aecd2438..2daf284d33f5 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -52,9 +52,9 @@ type ( common.Daemon // TODO: - // 1. Remove the cache parameter after workflow cache become a host level component + // 1. 
Remove the workflowCache parameter after the workflow cache becomes a host level component // and it can be provided as a parameter when creating a QueueFactory instance. - // Currently, workflow cache is shard level, but we can't get it from shard or engine interface, + // Currently, the workflow cache is shard level, but we can't get it from shard or engine interface, // as that will lead to a cycle dependency issue between shard and workflow package. // 2. Move this interface to queues package after 1 is done so that there's no cycle dependency // between workflow and queues package.