Skip to content

Commit

Permalink
Add TTL to task when re-adding it if task has non-zero Expiry value (#…
Browse files Browse the repository at this point in the history
…6631)

* Add ttl to task when readding task if task has Expiry
  • Loading branch information
fimanishi authored Jan 22, 2025
1 parent 3ab9c05 commit 5d1d2e9
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 7 deletions.
32 changes: 25 additions & 7 deletions common/persistence/nosql/nosql_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -288,19 +289,36 @@ func (t *nosqlTaskStore) CreateTasks(
) (*persistence.CreateTasksResponse, error) {
now := time.Now()
var tasks []*nosqlplugin.TaskRowForInsert
for _, t := range request.Tasks {
for _, taskRequest := range request.Tasks {
task := &nosqlplugin.TaskRow{
DomainID: request.TaskListInfo.DomainID,
TaskListName: request.TaskListInfo.Name,
TaskListType: request.TaskListInfo.TaskType,
TaskID: t.TaskID,
WorkflowID: t.Data.WorkflowID,
RunID: t.Data.RunID,
ScheduledID: t.Data.ScheduleID,
TaskID: taskRequest.TaskID,
WorkflowID: taskRequest.Data.WorkflowID,
RunID: taskRequest.Data.RunID,
ScheduledID: taskRequest.Data.ScheduleID,
CreatedTime: now,
PartitionConfig: t.Data.PartitionConfig,
PartitionConfig: taskRequest.Data.PartitionConfig,
}
ttl := int(t.Data.ScheduleToStartTimeoutSeconds)

var ttl int
// If the Data has a non-zero Expiry value, means that the ask is being re-added to the tasks table.
// If that's the case, use the Expiry value to calculate the new TTL value to match history's timeout value.
if !taskRequest.Data.Expiry.IsZero() {
scheduleToStartTimeoutSeconds := int(taskRequest.Data.Expiry.Sub(now).Seconds())

if scheduleToStartTimeoutSeconds > 0 {
ttl = scheduleToStartTimeoutSeconds
} else {
logger := t.GetLogger()
logger.Warn("Async task not created. Task is expired", tag.WorkflowID(taskRequest.Data.WorkflowID), tag.WorkflowRunID(taskRequest.Data.RunID), tag.WorkflowScheduleID(taskRequest.Data.ScheduleID))
continue
}
} else {
ttl = int(taskRequest.Data.ScheduleToStartTimeoutSeconds)
}

tasks = append(tasks, &nosqlplugin.TaskRowForInsert{
TaskRow: *task,
TTLSeconds: ttl,
Expand Down
79 changes: 79 additions & 0 deletions common/persistence/nosql/nosql_task_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func setupNoSQLStoreMocks(t *testing.T) (*nosqlTaskStore, *nosqlplugin.MockDB) {
TestTaskType).
Return(&nosqlSt, nil).
AnyTimes()
shardedNosqlStoreMock.EXPECT().GetLogger().Return(log.NewNoop()).AnyTimes()

store := &nosqlTaskStore{
shardedNosqlStore: shardedNosqlStoreMock,
Expand Down Expand Up @@ -446,6 +447,8 @@ func TestCompleteTasksLessThan(t *testing.T) {
}

func TestCreateTasks(t *testing.T) {
now := time.Now()

testCases := []struct {
name string
setupMock func(*nosqlplugin.MockDB)
Expand Down Expand Up @@ -495,6 +498,82 @@ func TestCreateTasks(t *testing.T) {
},
expectError: false,
},
{
name: "success - adding task with Expiry not expired",
setupMock: func(dbMock *nosqlplugin.MockDB) {
dbMock.EXPECT().InsertTasks(gomock.Any(), gomock.Any(), &nosqlplugin.TaskListRow{
DomainID: TestDomainID,
TaskListName: TestTaskListName,
TaskListType: TestTaskType,
RangeID: 1,
}).Do(func(_ context.Context, tasks []*nosqlplugin.TaskRowForInsert, _ *nosqlplugin.TaskListRow) {
assert.Len(t, tasks, 1)
assert.Equal(t, TestDomainID, tasks[0].DomainID)
assert.Equal(t, "workflow1", tasks[0].WorkflowID)
assert.Equal(t, "run1", tasks[0].RunID)
assert.Equal(t, int64(100), tasks[0].TaskID)
assert.Equal(t, int64(10), tasks[0].ScheduledID)
assert.Equal(t, TestTaskType, tasks[0].TaskListType)
assert.Equal(t, TestTaskListName, tasks[0].TaskListName)
assert.Equal(t, int(now.Add(100*time.Second).Sub(tasks[0].CreatedTime).Seconds()), tasks[0].TTLSeconds)
}).Return(nil).Times(1)
},
request: &persistence.CreateTasksRequest{
TaskListInfo: &persistence.TaskListInfo{
DomainID: TestDomainID,
Name: TestTaskListName,
TaskType: TestTaskType,
RangeID: 1,
},
Tasks: []*persistence.CreateTaskInfo{
{
TaskID: 100,
Data: &persistence.TaskInfo{
WorkflowID: "workflow1",
RunID: "run1",
ScheduleID: 10,
PartitionConfig: nil,
Expiry: now.Add(100 * time.Second),
},
},
},
},
expectError: false,
},
{
name: "success - skipping task with Expiry expired",
setupMock: func(dbMock *nosqlplugin.MockDB) {
dbMock.EXPECT().InsertTasks(gomock.Any(), gomock.Any(), &nosqlplugin.TaskListRow{
DomainID: TestDomainID,
TaskListName: TestTaskListName,
TaskListType: TestTaskType,
RangeID: 1,
}).Do(func(_ context.Context, tasks []*nosqlplugin.TaskRowForInsert, _ *nosqlplugin.TaskListRow) {
assert.Len(t, tasks, 0)
}).Return(nil).Times(1)
},
request: &persistence.CreateTasksRequest{
TaskListInfo: &persistence.TaskListInfo{
DomainID: TestDomainID,
Name: TestTaskListName,
TaskType: TestTaskType,
RangeID: 1,
},
Tasks: []*persistence.CreateTaskInfo{
{
TaskID: 100,
Data: &persistence.TaskInfo{
WorkflowID: "workflow1",
RunID: "run1",
ScheduleID: 10,
PartitionConfig: nil,
Expiry: now,
},
},
},
},
expectError: false,
},
{
name: "condition failure",
setupMock: func(dbMock *nosqlplugin.MockDB) {
Expand Down

0 comments on commit 5d1d2e9

Please sign in to comment.