Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add random delay to ArchiveExecutionTask #3565

Merged
merged 1 commit into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/backoff/jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func JitInt64(input int64, coefficient float64) int64 {
if input == 0 {
return 0
}
if coefficient == 0 {
return input
}

base := int64(float64(input) * (1 - coefficient))
addon := rand.Int63n(2 * (input - base))
Expand Down
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,8 @@ const (
// ArchivalProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for
// archivalQueueProcessor
ArchivalProcessorPollBackoffInterval = "history.archivalProcessorPollBackoffInterval"
// ArchivalProcessorArchiveDelay is the delay before archivalQueueProcessor starts to process archival tasks
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ type Config struct {
ArchivalProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
}

const (
Expand Down Expand Up @@ -541,6 +542,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ArchivalProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
ArchivalProcessorUpdateAckIntervalJitterCoefficient, 0.15),
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
}

return cfg
Expand Down
2 changes: 1 addition & 1 deletion service/history/tasks/category.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ var (

CategoryArchival = Category{
id: CategoryIDArchival,
cType: CategoryTypeImmediate,
cType: CategoryTypeScheduled,
name: CategoryNameArchival,
}
)
Expand Down
15 changes: 12 additions & 3 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks(
return nil
}

var archivalDelayJitterCoefficient = 1.0

func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
closeEvent *historypb.HistoryEvent,
deleteAfterClose bool,
Expand Down Expand Up @@ -198,10 +200,17 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
},
)
if r.config.DurableArchivalEnabled() {
delay := backoff.JitDuration(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2
if delay > retention {
delay = retention
}

archiveTime := closeEvent.GetEventTime().Add(delay)
closeTasks = append(closeTasks, &tasks.ArchiveExecutionTask{
// TaskID and VisibilityTimestamp are set by the shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
Version: currentVersion,
// TaskID is set by the shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
VisibilityTimestamp: archiveTime,
Version: currentVersion,
})
} else {
closeTime := timestamp.TimeValue(closeEvent.GetEventTime())
Expand Down
250 changes: 163 additions & 87 deletions service/history/workflow/task_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,66 +59,130 @@ import (

"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
)

type testConfig struct {
Name string
ConfigFn func(config *testParams)
}

type testParams struct {
DurableArchivalEnabled bool
DeleteAfterClose bool
CloseEventTime time.Time
Retention time.Duration
Logger *log.MockLogger
ArchivalProcessorArchiveDelay time.Duration

ExpectCloseExecutionVisibilityTask bool
ExpectArchiveExecutionTask bool
ExpectDeleteHistoryEventTask bool
ExpectedArchiveExecutionTaskVisibilityTimestamp time.Time
}

func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
for _, c := range []struct {
Name string
DurableArchivalEnabled bool
DeleteAfterClose bool
ExpectCloseExecutionVisibilityTask bool
ExpectArchiveExecutionTask bool
ExpectDeleteHistoryEventTask bool
}{
// we need to set the jitter coefficient to 0 to remove the randomness in the test
archivalDelayJitterCoefficient = 0.0
for _, c := range []testConfig{
{
Name: "delete after retention",
ConfigFn: func(p *testParams) {
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectDeleteHistoryEventTask = true
},
},
{
Name: "Delete after retention",
DurableArchivalEnabled: false,
DeleteAfterClose: false,
Name: "use archival queue",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true

ExpectCloseExecutionVisibilityTask: true,
ExpectDeleteHistoryEventTask: true,
ExpectArchiveExecutionTask: false,
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
{
Name: "Use archival queue",
DurableArchivalEnabled: true,
DeleteAfterClose: false,
Name: "delete after close",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true

ExpectCloseExecutionVisibilityTask: true,
ExpectDeleteHistoryEventTask: false,
ExpectArchiveExecutionTask: true,
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
{
Name: "DeleteAfterClose",
DurableArchivalEnabled: false,
DeleteAfterClose: true,
Name: "delete after close ignores durable execution flag",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true
p.DeleteAfterClose = true
},
},
{
Name: "delay is zero",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true
p.CloseEventTime = time.Unix(0, 0)
p.Retention = 24 * time.Hour
p.ArchivalProcessorArchiveDelay = 0

ExpectCloseExecutionVisibilityTask: false,
ExpectDeleteHistoryEventTask: false,
ExpectArchiveExecutionTask: false,
p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0)
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
{
Name: "DeleteAfterClose ignores durable execution flag",
DurableArchivalEnabled: true,
DeleteAfterClose: true,
Name: "delay exceeds retention",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true
p.CloseEventTime = time.Unix(0, 0)
p.Retention = 24 * time.Hour
p.ArchivalProcessorArchiveDelay = 48*time.Hour + time.Second

ExpectCloseExecutionVisibilityTask: false,
ExpectDeleteHistoryEventTask: false,
ExpectArchiveExecutionTask: false,
p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0).Add(24 * time.Hour)
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
{
Name: "delay is less than retention",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true
p.CloseEventTime = time.Unix(0, 0)
p.Retention = 24 * time.Hour
p.ArchivalProcessorArchiveDelay = 12 * time.Hour

p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0).Add(6 * time.Hour)
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
} {
c := c
t.Run(c.Name, func(t *testing.T) {
t.Parallel()
// t.Parallel()
now := time.Unix(0, 0).UTC()
ctrl := gomock.NewController(t)
mockLogger := log.NewMockLogger(ctrl)
p := testParams{
DurableArchivalEnabled: false,
DeleteAfterClose: false,
CloseEventTime: now,
Retention: time.Hour * 24 * 7,
Logger: mockLogger,

ExpectCloseExecutionVisibilityTask: false,
ExpectArchiveExecutionTask: false,
ExpectDeleteHistoryEventTask: false,
ExpectedArchiveExecutionTaskVisibilityTimestamp: now,
}
c.ConfigFn(&p)
namespaceRegistry := namespace.NewMockRegistry(ctrl)
retention := 24 * time.Hour
namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&retention))
namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&p.Retention))
namespaceRegistry.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceEntry.ID(), nil).AnyTimes()
namespaceRegistry.EXPECT().GetNamespaceByID(namespaceEntry.ID()).Return(namespaceEntry, nil).AnyTimes()

Expand All @@ -131,71 +195,83 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
namespaceEntry.ID().String(), tests.WorkflowID, tests.RunID,
)).AnyTimes()
mutableState.EXPECT().GetCurrentBranchToken().Return(nil, nil)
taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, &configs.Config{
retentionTimerDelay := time.Second
cfg := &configs.Config{
DurableArchivalEnabled: func() bool {
return c.DurableArchivalEnabled
return p.DurableArchivalEnabled
},
RetentionTimerJitterDuration: func() time.Duration {
return time.Second
return retentionTimerDelay
},
})

ArchivalProcessorArchiveDelay: func() time.Duration {
return p.ArchivalProcessorArchiveDelay
},
}
closeTime := time.Unix(0, 0)

var allTasks []tasks.Task
mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...tasks.Task) {
var (
closeExecutionTask *tasks.CloseExecutionTask
deleteHistoryEventTask *tasks.DeleteHistoryEventTask
closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask
archiveExecutionTask *tasks.ArchiveExecutionTask
)
for _, task := range ts {
switch t := task.(type) {
case *tasks.CloseExecutionTask:
closeExecutionTask = t
case *tasks.DeleteHistoryEventTask:
deleteHistoryEventTask = t
case *tasks.CloseExecutionVisibilityTask:
closeExecutionVisibilityTask = t
case *tasks.ArchiveExecutionTask:
archiveExecutionTask = t
}
}
require.NotNil(t, closeExecutionTask)
assert.Equal(t, c.DeleteAfterClose, closeExecutionTask.DeleteAfterClose)

if c.ExpectCloseExecutionVisibilityTask {
assert.NotNil(t, closeExecutionVisibilityTask)
} else {
assert.Nil(t, closeExecutionVisibilityTask)
}
if c.ExpectArchiveExecutionTask {
require.NotNil(t, archiveExecutionTask)
assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, archiveExecutionTask.RunID, tests.RunID)
} else {
assert.Nil(t, archiveExecutionTask)
}
if c.ExpectDeleteHistoryEventTask {
require.NotNil(t, deleteHistoryEventTask)
assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID)
assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.After(closeTime.Add(retention)))
assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.Before(closeTime.Add(retention).Add(time.Second*2)))
} else {
assert.Nil(t, deleteHistoryEventTask)
}
allTasks = append(allTasks, ts...)
})

taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg)
err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{
Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{
WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{},
},
EventTime: timestamp.TimePtr(closeTime),
}, c.DeleteAfterClose)
EventTime: timestamp.TimePtr(p.CloseEventTime),
}, p.DeleteAfterClose)
require.NoError(t, err)

var (
closeExecutionTask *tasks.CloseExecutionTask
deleteHistoryEventTask *tasks.DeleteHistoryEventTask
closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask
archiveExecutionTask *tasks.ArchiveExecutionTask
)
for _, task := range allTasks {
switch t := task.(type) {
case *tasks.CloseExecutionTask:
closeExecutionTask = t
case *tasks.DeleteHistoryEventTask:
deleteHistoryEventTask = t
case *tasks.CloseExecutionVisibilityTask:
closeExecutionVisibilityTask = t
case *tasks.ArchiveExecutionTask:
archiveExecutionTask = t
}
}
require.NotNil(t, closeExecutionTask)
assert.Equal(t, p.DeleteAfterClose, closeExecutionTask.DeleteAfterClose)

if p.ExpectCloseExecutionVisibilityTask {
assert.NotNil(t, closeExecutionVisibilityTask)
} else {
assert.Nil(t, closeExecutionVisibilityTask)
}
if p.ExpectArchiveExecutionTask {
require.NotNil(t, archiveExecutionTask)
assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, archiveExecutionTask.RunID, tests.RunID)
assert.Equal(
t,
p.ExpectedArchiveExecutionTaskVisibilityTimestamp,
archiveExecutionTask.VisibilityTimestamp,
)
} else {
assert.Nil(t, archiveExecutionTask)
}
if p.ExpectDeleteHistoryEventTask {
require.NotNil(t, deleteHistoryEventTask)
assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID)
assert.GreaterOrEqual(t, deleteHistoryEventTask.VisibilityTimestamp, closeTime.Add(p.Retention))
assert.LessOrEqual(t, deleteHistoryEventTask.VisibilityTimestamp,
closeTime.Add(p.Retention).Add(retentionTimerDelay*2))
} else {
assert.Nil(t, deleteHistoryEventTask)
}
})
}
}