diff --git a/common/persistence/nosql/nosql_task_store.go b/common/persistence/nosql/nosql_task_store.go index 4b713afa04c..b1816ee0a6c 100644 --- a/common/persistence/nosql/nosql_task_store.go +++ b/common/persistence/nosql/nosql_task_store.go @@ -30,6 +30,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" "github.com/uber/cadence/common/types" @@ -288,19 +289,36 @@ func (t *nosqlTaskStore) CreateTasks( ) (*persistence.CreateTasksResponse, error) { now := time.Now() var tasks []*nosqlplugin.TaskRowForInsert - for _, t := range request.Tasks { + for _, taskRequest := range request.Tasks { task := &nosqlplugin.TaskRow{ DomainID: request.TaskListInfo.DomainID, TaskListName: request.TaskListInfo.Name, TaskListType: request.TaskListInfo.TaskType, - TaskID: t.TaskID, - WorkflowID: t.Data.WorkflowID, - RunID: t.Data.RunID, - ScheduledID: t.Data.ScheduleID, + TaskID: taskRequest.TaskID, + WorkflowID: taskRequest.Data.WorkflowID, + RunID: taskRequest.Data.RunID, + ScheduledID: taskRequest.Data.ScheduleID, CreatedTime: now, - PartitionConfig: t.Data.PartitionConfig, + PartitionConfig: taskRequest.Data.PartitionConfig, } - ttl := int(t.Data.ScheduleToStartTimeoutSeconds) + + var ttl int + // If the Data has a non-zero Expiry value, means that the ask is being re-added to the tasks table. + // If that's the case, use the Expiry value to calculate the new TTL value to match history's timeout value. + if !taskRequest.Data.Expiry.IsZero() { + scheduleToStartTimeoutSeconds := int(taskRequest.Data.Expiry.Sub(now).Seconds()) + + if scheduleToStartTimeoutSeconds > 0 { + ttl = scheduleToStartTimeoutSeconds + } else { + logger := t.GetLogger() + logger.Warn("Async task not created. Task is expired", tag.WorkflowID(taskRequest.Data.WorkflowID), tag.WorkflowRunID(taskRequest.Data.RunID), tag.WorkflowScheduleID(taskRequest.Data.ScheduleID)) + continue + } + } else { + ttl = int(taskRequest.Data.ScheduleToStartTimeoutSeconds) + } + tasks = append(tasks, &nosqlplugin.TaskRowForInsert{ TaskRow: *task, TTLSeconds: ttl, diff --git a/common/persistence/nosql/nosql_task_store_test.go b/common/persistence/nosql/nosql_task_store_test.go index 9b0eca2202e..51628c30f9e 100644 --- a/common/persistence/nosql/nosql_task_store_test.go +++ b/common/persistence/nosql/nosql_task_store_test.go @@ -73,6 +73,7 @@ func setupNoSQLStoreMocks(t *testing.T) (*nosqlTaskStore, *nosqlplugin.MockDB) { TestTaskType). Return(&nosqlSt, nil). AnyTimes() + shardedNosqlStoreMock.EXPECT().GetLogger().Return(log.NewNoop()).AnyTimes() store := &nosqlTaskStore{ shardedNosqlStore: shardedNosqlStoreMock, @@ -446,6 +447,8 @@ func TestCompleteTasksLessThan(t *testing.T) { } func TestCreateTasks(t *testing.T) { + now := time.Now() + testCases := []struct { name string setupMock func(*nosqlplugin.MockDB) @@ -495,6 +498,82 @@ func TestCreateTasks(t *testing.T) { }, expectError: false, }, + { + name: "success - adding task with Expiry not expired", + setupMock: func(dbMock *nosqlplugin.MockDB) { + dbMock.EXPECT().InsertTasks(gomock.Any(), gomock.Any(), &nosqlplugin.TaskListRow{ + DomainID: TestDomainID, + TaskListName: TestTaskListName, + TaskListType: TestTaskType, + RangeID: 1, + }).Do(func(_ context.Context, tasks []*nosqlplugin.TaskRowForInsert, _ *nosqlplugin.TaskListRow) { + assert.Len(t, tasks, 1) + assert.Equal(t, TestDomainID, tasks[0].DomainID) + assert.Equal(t, "workflow1", tasks[0].WorkflowID) + assert.Equal(t, "run1", tasks[0].RunID) + assert.Equal(t, int64(100), tasks[0].TaskID) + assert.Equal(t, int64(10), tasks[0].ScheduledID) + assert.Equal(t, TestTaskType, tasks[0].TaskListType) + assert.Equal(t, TestTaskListName, tasks[0].TaskListName) + assert.Equal(t, int(now.Add(100*time.Second).Sub(tasks[0].CreatedTime).Seconds()), tasks[0].TTLSeconds) + }).Return(nil).Times(1) + }, + request: &persistence.CreateTasksRequest{ + TaskListInfo: &persistence.TaskListInfo{ + DomainID: TestDomainID, + Name: TestTaskListName, + TaskType: TestTaskType, + RangeID: 1, + }, + Tasks: []*persistence.CreateTaskInfo{ + { + TaskID: 100, + Data: &persistence.TaskInfo{ + WorkflowID: "workflow1", + RunID: "run1", + ScheduleID: 10, + PartitionConfig: nil, + Expiry: now.Add(100 * time.Second), + }, + }, + }, + }, + expectError: false, + }, + { + name: "success - skipping task with Expiry expired", + setupMock: func(dbMock *nosqlplugin.MockDB) { + dbMock.EXPECT().InsertTasks(gomock.Any(), gomock.Any(), &nosqlplugin.TaskListRow{ + DomainID: TestDomainID, + TaskListName: TestTaskListName, + TaskListType: TestTaskType, + RangeID: 1, + }).Do(func(_ context.Context, tasks []*nosqlplugin.TaskRowForInsert, _ *nosqlplugin.TaskListRow) { + assert.Len(t, tasks, 0) + }).Return(nil).Times(1) + }, + request: &persistence.CreateTasksRequest{ + TaskListInfo: &persistence.TaskListInfo{ + DomainID: TestDomainID, + Name: TestTaskListName, + TaskType: TestTaskType, + RangeID: 1, + }, + Tasks: []*persistence.CreateTaskInfo{ + { + TaskID: 100, + Data: &persistence.TaskInfo{ + WorkflowID: "workflow1", + RunID: "run1", + ScheduleID: 10, + PartitionConfig: nil, + Expiry: now, + }, + }, + }, + }, + expectError: false, + }, { name: "condition failure", setupMock: func(dbMock *nosqlplugin.MockDB) {