From 02ccc815cfe78f95608816946d99ae1819e71115 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Mon, 7 Nov 2022 13:30:59 -0800 Subject: [PATCH] Add random delay to ArchiveExecutionTask --- common/backoff/jitter.go | 3 + common/dynamicconfig/constants.go | 2 + service/history/configs/config.go | 11 +- service/history/tasks/category.go | 2 +- service/history/workflow/task_generator.go | 15 +- .../history/workflow/task_generator_test.go | 250 ++++++++++++------ 6 files changed, 188 insertions(+), 95 deletions(-) diff --git a/common/backoff/jitter.go b/common/backoff/jitter.go index c968aa5f67d0..f4060e433b7d 100644 --- a/common/backoff/jitter.go +++ b/common/backoff/jitter.go @@ -43,6 +43,9 @@ func JitInt64(input int64, coefficient float64) int64 { if input == 0 { return 0 } + if coefficient == 0 { + return input + } base := int64(float64(input) * (1 - coefficient)) addon := rand.Int63n(2 * (input - base)) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 6dc69cfff6cb..db63b86fcce1 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -550,6 +550,8 @@ const ( // ArchivalProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for // archivalQueueProcessor ArchivalProcessorPollBackoffInterval = "history.archivalProcessorPollBackoffInterval" + // ArchivalProcessorArchiveDelay is the upper bound of the random delay before archivalQueueProcessor starts to process archival tasks + ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay" // ReplicatorTaskBatchSize is batch size for ReplicatorProcessor ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize" diff --git a/service/history/configs/config.go b/service/history/configs/config.go index fa2687b0840b..1d9710838e74 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -186,10 +186,11 @@ type Config struct { NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn 
// Archival settings - NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn - ArchiveRequestRPS dynamicconfig.IntPropertyFn - ArchiveSignalTimeout dynamicconfig.DurationPropertyFn - DurableArchivalEnabled dynamicconfig.BoolPropertyFn + NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn + ArchiveRequestRPS dynamicconfig.IntPropertyFn + ArchiveSignalTimeout dynamicconfig.DurationPropertyFn + DurableArchivalEnabled dynamicconfig.BoolPropertyFn + RandomArchiveExecutionDelayUpperBound dynamicconfig.DurationPropertyFn // Size limit related settings BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -303,6 +304,7 @@ type Config struct { ArchivalProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn ArchivalProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn } const ( @@ -541,6 +543,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis ArchivalProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig. 
ArchivalProcessorUpdateAckIntervalJitterCoefficient, 0.15), ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second), + ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute), } return cfg diff --git a/service/history/tasks/category.go b/service/history/tasks/category.go index bf788b271b9d..d18216d173a7 100644 --- a/service/history/tasks/category.go +++ b/service/history/tasks/category.go @@ -94,7 +94,7 @@ var ( CategoryArchival = Category{ id: CategoryIDArchival, - cType: CategoryTypeImmediate, + cType: CategoryTypeScheduled, name: CategoryNameArchival, } ) diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 1eb8229dcffe..fe416f05878f 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -142,6 +142,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks( return nil } +var archivalDelayJitterCoefficient = 1.0 + func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( closeEvent *historypb.HistoryEvent, deleteAfterClose bool, @@ -198,10 +200,17 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( }, ) if r.config.DurableArchivalEnabled() { + delay := backoff.JitDuration(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2 + if delay > retention { + delay = retention + } + + archiveTime := closeEvent.GetEventTime().Add(delay) closeTasks = append(closeTasks, &tasks.ArchiveExecutionTask{ - // TaskID and VisibilityTimestamp are set by the shard - WorkflowKey: r.mutableState.GetWorkflowKey(), - Version: currentVersion, + // TaskID is set by the shard + WorkflowKey: r.mutableState.GetWorkflowKey(), + VisibilityTimestamp: archiveTime, + Version: currentVersion, }) } else { closeTime := timestamp.TimeValue(closeEvent.GetEventTime()) diff --git a/service/history/workflow/task_generator_test.go 
b/service/history/workflow/task_generator_test.go index 7801871d4916..da1936b12e83 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -59,6 +59,7 @@ import ( "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" @@ -66,59 +67,122 @@ import ( "go.temporal.io/server/service/history/tests" ) +type testConfig struct { + Name string + ConfigFn func(config *testParams) +} + +type testParams struct { + DurableArchivalEnabled bool + DeleteAfterClose bool + CloseEventTime time.Time + Retention time.Duration + Logger *log.MockLogger + ArchivalProcessorArchiveDelay time.Duration + + ExpectCloseExecutionVisibilityTask bool + ExpectArchiveExecutionTask bool + ExpectDeleteHistoryEventTask bool + ExpectedArchiveExecutionTaskVisibilityTimestamp time.Time +} + func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { - for _, c := range []struct { - Name string - DurableArchivalEnabled bool - DeleteAfterClose bool - ExpectCloseExecutionVisibilityTask bool - ExpectArchiveExecutionTask bool - ExpectDeleteHistoryEventTask bool - }{ + // we need to set the jitter coefficient to 0 to remove the randomness in the test + archivalDelayJitterCoefficient = 0.0 + for _, c := range []testConfig{ + { + Name: "delete after retention", + ConfigFn: func(p *testParams) { + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectDeleteHistoryEventTask = true + }, + }, { - Name: "Delete after retention", - DurableArchivalEnabled: false, - DeleteAfterClose: false, + Name: "use archival queue", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true - ExpectCloseExecutionVisibilityTask: true, - ExpectDeleteHistoryEventTask: true, - ExpectArchiveExecutionTask: false, + p.ExpectCloseExecutionVisibilityTask = 
true + p.ExpectArchiveExecutionTask = true + }, }, { - Name: "Use archival queue", - DurableArchivalEnabled: true, - DeleteAfterClose: false, + Name: "delete after close", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true - ExpectCloseExecutionVisibilityTask: true, - ExpectDeleteHistoryEventTask: false, - ExpectArchiveExecutionTask: true, + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectArchiveExecutionTask = true + }, }, { - Name: "DeleteAfterClose", - DurableArchivalEnabled: false, - DeleteAfterClose: true, + Name: "delete after close ignores durable execution flag", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true + p.DeleteAfterClose = true + }, + }, + { + Name: "delay is zero", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true + p.CloseEventTime = time.Unix(0, 0) + p.Retention = 24 * time.Hour + p.ArchivalProcessorArchiveDelay = 0 - ExpectCloseExecutionVisibilityTask: false, - ExpectDeleteHistoryEventTask: false, - ExpectArchiveExecutionTask: false, + p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0) + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectArchiveExecutionTask = true + }, }, { - Name: "DeleteAfterClose ignores durable execution flag", - DurableArchivalEnabled: true, - DeleteAfterClose: true, + Name: "delay exceeds retention", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true + p.CloseEventTime = time.Unix(0, 0) + p.Retention = 24 * time.Hour + p.ArchivalProcessorArchiveDelay = 48*time.Hour + time.Second - ExpectCloseExecutionVisibilityTask: false, - ExpectDeleteHistoryEventTask: false, - ExpectArchiveExecutionTask: false, + p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0).Add(24 * time.Hour) + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectArchiveExecutionTask = true + }, + }, + { + Name: "delay is less than retention", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true + p.CloseEventTime = time.Unix(0, 0) 
+ p.Retention = 24 * time.Hour + p.ArchivalProcessorArchiveDelay = 12 * time.Hour + + p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0).Add(6 * time.Hour) + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectArchiveExecutionTask = true + }, }, } { c := c t.Run(c.Name, func(t *testing.T) { - t.Parallel() + // t.Parallel() + now := time.Unix(0, 0).UTC() ctrl := gomock.NewController(t) + mockLogger := log.NewMockLogger(ctrl) + p := testParams{ + DurableArchivalEnabled: false, + DeleteAfterClose: false, + CloseEventTime: now, + Retention: time.Hour * 24 * 7, + Logger: mockLogger, + + ExpectCloseExecutionVisibilityTask: false, + ExpectArchiveExecutionTask: false, + ExpectDeleteHistoryEventTask: false, + ExpectedArchiveExecutionTaskVisibilityTimestamp: now, + } + c.ConfigFn(&p) namespaceRegistry := namespace.NewMockRegistry(ctrl) - retention := 24 * time.Hour - namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&retention)) + namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&p.Retention)) namespaceRegistry.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceEntry.ID(), nil).AnyTimes() namespaceRegistry.EXPECT().GetNamespaceByID(namespaceEntry.ID()).Return(namespaceEntry, nil).AnyTimes() @@ -131,71 +195,83 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { namespaceEntry.ID().String(), tests.WorkflowID, tests.RunID, )).AnyTimes() mutableState.EXPECT().GetCurrentBranchToken().Return(nil, nil) - taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, &configs.Config{ + retentionTimerDelay := time.Second + cfg := &configs.Config{ DurableArchivalEnabled: func() bool { - return c.DurableArchivalEnabled + return p.DurableArchivalEnabled }, RetentionTimerJitterDuration: func() time.Duration { - return time.Second + return retentionTimerDelay }, - }) - + ArchivalProcessorArchiveDelay: func() time.Duration { + return p.ArchivalProcessorArchiveDelay + }, + } closeTime := 
time.Unix(0, 0) - + var allTasks []tasks.Task mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...tasks.Task) { - var ( - closeExecutionTask *tasks.CloseExecutionTask - deleteHistoryEventTask *tasks.DeleteHistoryEventTask - closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask - archiveExecutionTask *tasks.ArchiveExecutionTask - ) - for _, task := range ts { - switch t := task.(type) { - case *tasks.CloseExecutionTask: - closeExecutionTask = t - case *tasks.DeleteHistoryEventTask: - deleteHistoryEventTask = t - case *tasks.CloseExecutionVisibilityTask: - closeExecutionVisibilityTask = t - case *tasks.ArchiveExecutionTask: - archiveExecutionTask = t - } - } - require.NotNil(t, closeExecutionTask) - assert.Equal(t, c.DeleteAfterClose, closeExecutionTask.DeleteAfterClose) - - if c.ExpectCloseExecutionVisibilityTask { - assert.NotNil(t, closeExecutionVisibilityTask) - } else { - assert.Nil(t, closeExecutionVisibilityTask) - } - if c.ExpectArchiveExecutionTask { - require.NotNil(t, archiveExecutionTask) - assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String()) - assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID) - assert.Equal(t, archiveExecutionTask.RunID, tests.RunID) - } else { - assert.Nil(t, archiveExecutionTask) - } - if c.ExpectDeleteHistoryEventTask { - require.NotNil(t, deleteHistoryEventTask) - assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String()) - assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID) - assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID) - assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.After(closeTime.Add(retention))) - assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.Before(closeTime.Add(retention).Add(time.Second*2))) - } else { - assert.Nil(t, deleteHistoryEventTask) - } + allTasks = append(allTasks, ts...) 
}) + taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg) err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{ Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{ WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{}, }, - EventTime: timestamp.TimePtr(closeTime), - }, c.DeleteAfterClose) + EventTime: timestamp.TimePtr(p.CloseEventTime), + }, p.DeleteAfterClose) require.NoError(t, err) + + var ( + closeExecutionTask *tasks.CloseExecutionTask + deleteHistoryEventTask *tasks.DeleteHistoryEventTask + closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask + archiveExecutionTask *tasks.ArchiveExecutionTask + ) + for _, task := range allTasks { + switch t := task.(type) { + case *tasks.CloseExecutionTask: + closeExecutionTask = t + case *tasks.DeleteHistoryEventTask: + deleteHistoryEventTask = t + case *tasks.CloseExecutionVisibilityTask: + closeExecutionVisibilityTask = t + case *tasks.ArchiveExecutionTask: + archiveExecutionTask = t + } + } + require.NotNil(t, closeExecutionTask) + assert.Equal(t, p.DeleteAfterClose, closeExecutionTask.DeleteAfterClose) + + if p.ExpectCloseExecutionVisibilityTask { + assert.NotNil(t, closeExecutionVisibilityTask) + } else { + assert.Nil(t, closeExecutionVisibilityTask) + } + if p.ExpectArchiveExecutionTask { + require.NotNil(t, archiveExecutionTask) + assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String()) + assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID) + assert.Equal(t, archiveExecutionTask.RunID, tests.RunID) + assert.Equal( + t, + p.ExpectedArchiveExecutionTaskVisibilityTimestamp, + archiveExecutionTask.VisibilityTimestamp, + ) + } else { + assert.Nil(t, archiveExecutionTask) + } + if p.ExpectDeleteHistoryEventTask { + require.NotNil(t, deleteHistoryEventTask) + assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String()) + assert.Equal(t, 
deleteHistoryEventTask.WorkflowID, tests.WorkflowID) + assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID) + assert.GreaterOrEqual(t, deleteHistoryEventTask.VisibilityTimestamp, closeTime.Add(p.Retention)) + assert.LessOrEqual(t, deleteHistoryEventTask.VisibilityTimestamp, + closeTime.Add(p.Retention).Add(retentionTimerDelay*2)) + } else { + assert.Nil(t, deleteHistoryEventTask) + } }) } }