From 63d25fe4d9ab77f73b50352aa7bf98c87daac384 Mon Sep 17 00:00:00 2001 From: Aditi Gautam Date: Wed, 3 Apr 2024 15:13:24 -0700 Subject: [PATCH 1/2] Added some validations in the nosql store and new tests --- .../nosql/nosql_execution_store.go | 7 +- .../nosql/nosql_execution_store_test.go | 223 ++++++++++++++++++ 2 files changed, 229 insertions(+), 1 deletion(-) diff --git a/common/persistence/nosql/nosql_execution_store.go b/common/persistence/nosql/nosql_execution_store.go index 4083418102b..aecb4d020f5 100644 --- a/common/persistence/nosql/nosql_execution_store.go +++ b/common/persistence/nosql/nosql_execution_store.go @@ -718,7 +718,9 @@ func (d *nosqlExecutionStore) PutReplicationTaskToDLQ( ctx context.Context, request *persistence.InternalPutReplicationTaskToDLQRequest, ) error { - + if request.TaskInfo == nil || request.TaskInfo.TaskID == 0 { + return &types.BadRequestError{Message: "Invalid replication task info: TaskID is required"} + } err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo) if err != nil { return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err) @@ -731,6 +733,9 @@ func (d *nosqlExecutionStore) GetReplicationTasksFromDLQ( ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest, ) (*persistence.InternalGetReplicationTasksFromDLQResponse, error) { + if request.ReadLevel > request.MaxReadLevel { + return nil, &types.BadRequestError{Message: "ReadLevel cannot be higher than MaxReadLevel"} + } tasks, nextPageToken, err := d.db.SelectReplicationDLQTasksOrderByTaskID(ctx, d.shardID, request.SourceClusterName, request.BatchSize, request.NextPageToken, request.ReadLevel, request.MaxReadLevel) if err != nil { return nil, convertCommonErrors(d.db, "GetReplicationTasksFromDLQ", err) diff --git a/common/persistence/nosql/nosql_execution_store_test.go b/common/persistence/nosql/nosql_execution_store_test.go index 9759f9210b7..0e0a3ffa750 100644 --- a/common/persistence/nosql/nosql_execution_store_test.go +++ b/common/persistence/nosql/nosql_execution_store_test.go @@ -860,6 +860,211 @@ func TestNosqlExecutionStore(t *testing.T) { }, expectedError: nil, }, + { + name: "GetTimerIndexTasks success", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + SelectTimerTasksOrderByVisibilityTime( + ctx, + shardID, + 10, + gomock.Nil(), + gomock.Any(), + gomock.Any(), + ).Return([]*persistence.TimerTaskInfo{}, nil, nil) + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + _, err := store.GetTimerIndexTasks(ctx, &persistence.GetTimerIndexTasksRequest{ + BatchSize: 10, + MinTimestamp: time.Now().Add(-time.Hour), + MaxTimestamp: time.Now(), + }) + return err + }, + expectedError: nil, + }, + { + name: "GetTimerIndexTasks success - empty result", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + SelectTimerTasksOrderByVisibilityTime(ctx, shardID, 10, gomock.Nil(), gomock.Any(), gomock.Any()). + Return([]*persistence.TimerTaskInfo{}, []byte{}, nil) // Return an empty list + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + resp, err := store.GetTimerIndexTasks(ctx, &persistence.GetTimerIndexTasksRequest{ + BatchSize: 10, + MinTimestamp: time.Now().Add(-time.Hour), + MaxTimestamp: time.Now(), + }) + if err != nil { + return err + } + if len(resp.Timers) != 0 { + return errors.New("expected empty result set for timers") + } + return nil + }, + expectedError: nil, + }, + { + name: "PutReplicationTaskToDLQ success", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + replicationTaskInfo := newInternalReplicationTaskInfo() + + mockDB.EXPECT(). + InsertReplicationDLQTask(ctx, shardID, "sourceCluster", gomock.Any()). + DoAndReturn(func(_ context.Context, _ int, _ string, taskInfo persistence.InternalReplicationTaskInfo) error { + require.Equal(t, replicationTaskInfo, taskInfo) + return nil + }) + + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + taskInfo := newInternalReplicationTaskInfo() + return store.PutReplicationTaskToDLQ(ctx, &persistence.InternalPutReplicationTaskToDLQRequest{ + SourceClusterName: "sourceCluster", + TaskInfo: &taskInfo, + }) + }, + expectedError: nil, + }, + { + name: "GetTimerIndexTasks failure - database error", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() + mockDB.EXPECT(). + SelectTimerTasksOrderByVisibilityTime(ctx, shardID, 10, gomock.Nil(), gomock.Any(), gomock.Any()). + Return(nil, nil, errors.New("database error")) + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + _, err := store.GetTimerIndexTasks(ctx, &persistence.GetTimerIndexTasksRequest{ + BatchSize: 10, + MinTimestamp: time.Now().Add(-time.Hour), + MaxTimestamp: time.Now(), + }) + return err + }, + expectedError: &types.InternalServiceError{Message: "database error"}, + }, + { + name: "PutReplicationTaskToDLQ failure - invalid task info", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + // No need to set up a mock call to InsertReplicationDLQTask + // as the operation should not proceed due to validation failure + return newTestNosqlExecutionStore(nosqlplugin.NewMockDB(ctrl), log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + taskInfo := persistence.InternalReplicationTaskInfo{} // Intentionally invalid/incomplete task info + return store.PutReplicationTaskToDLQ(ctx, &persistence.InternalPutReplicationTaskToDLQRequest{ + SourceClusterName: "sourceCluster", + TaskInfo: &taskInfo, + }) + }, + expectedError: &types.BadRequestError{Message: "Invalid replication task info: TaskID is required"}, + }, + + { + name: "GetReplicationTasksFromDLQ success", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + + nextPageToken := []byte("next-page-token") + replicationTasks := []*persistence.InternalReplicationTaskInfo{} + mockDB.EXPECT(). + SelectReplicationDLQTasksOrderByTaskID( + ctx, + shardID, + "sourceCluster", + 10, + gomock.Any(), + int64(0), + int64(100), + ).Return(replicationTasks, nextPageToken, nil) + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + initialNextPageToken := []byte{} + _, err := store.GetReplicationTasksFromDLQ(ctx, &persistence.GetReplicationTasksFromDLQRequest{ + SourceClusterName: "sourceCluster", + GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{ + BatchSize: 10, + NextPageToken: initialNextPageToken, + ReadLevel: 0, + MaxReadLevel: 100, + }, + }) + + return err + }, + expectedError: nil, + }, + { + name: "GetReplicationTasksFromDLQ failure - invalid read levels", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + return newTestNosqlExecutionStore(nosqlplugin.NewMockDB(ctrl), log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + _, err := store.GetReplicationTasksFromDLQ(ctx, &persistence.GetReplicationTasksFromDLQRequest{ + SourceClusterName: "sourceCluster", + GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{ + ReadLevel: 100, + MaxReadLevel: 50, + BatchSize: 10, + NextPageToken: []byte{}, + }, + }) + return err + }, + expectedError: &types.BadRequestError{Message: "ReadLevel cannot be higher than MaxReadLevel"}, + }, + { + name: "GetReplicationDLQSize success", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + SelectReplicationDLQTasksCount(ctx, shardID, "sourceCluster"). + Return(int64(42), nil) + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + resp, err := store.GetReplicationDLQSize(ctx, &persistence.GetReplicationDLQSizeRequest{ + SourceClusterName: "sourceCluster", + }) + if err != nil { + return err + } + if resp.Size != 42 { + return errors.New("unexpected DLQ size") + } + return nil + }, + expectedError: nil, + }, + { + name: "GetReplicationDLQSize failure - invalid source cluster name", + setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { + mockDB := nosqlplugin.NewMockDB(ctrl) + mockDB.EXPECT(). + SelectReplicationDLQTasksCount(ctx, shardID, ""). + Return(int64(0), nil) + return newTestNosqlExecutionStore(mockDB, log.NewNoop()) + }, + testFunc: func(store *nosqlExecutionStore) error { + _, err := store.GetReplicationDLQSize(ctx, &persistence.GetReplicationDLQSizeRequest{ + SourceClusterName: "", + }) + return err + }, + expectedError: nil, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -930,3 +1135,21 @@ func newTestNosqlExecutionStore(db nosqlplugin.DB, logger log.Logger) *nosqlExec nosqlStore: nosqlStore{logger: logger, db: db}, } } + +func newInternalReplicationTaskInfo() persistence.InternalReplicationTaskInfo { + var fixedCreationTime = time.Date(2024, time.April, 3, 14, 35, 44, 0, time.UTC) + return persistence.InternalReplicationTaskInfo{ + DomainID: "testDomainID", + WorkflowID: "testWorkflowID", + RunID: "testRunID", + TaskID: 123, + TaskType: persistence.ReplicationTaskTypeHistory, + FirstEventID: 1, + NextEventID: 2, + Version: 1, + ScheduledID: 3, + BranchToken: []byte("branchToken"), + NewRunBranchToken: []byte("newRunBranchToken"), + CreationTime: fixedCreationTime, + } +} From db6ebfaa5c83f7ca7204e6bf2fc1af9f6854eb2b Mon Sep 17 00:00:00 2001 From: Aditi Gautam Date: Thu, 4 Apr 2024 10:47:59 -0700 Subject: [PATCH 2/2] Some unit test is failing --- .../persistence/nosql/nosql_execution_store.go | 3 --- .../nosql/nosql_execution_store_test.go | 17 ----------------- 2 files changed, 20 deletions(-) diff --git a/common/persistence/nosql/nosql_execution_store.go b/common/persistence/nosql/nosql_execution_store.go index aecb4d020f5..96311880768 100644 --- a/common/persistence/nosql/nosql_execution_store.go +++ b/common/persistence/nosql/nosql_execution_store.go @@ -718,9 +718,6 @@ func (d *nosqlExecutionStore) PutReplicationTaskToDLQ( ctx context.Context, request *persistence.InternalPutReplicationTaskToDLQRequest, ) error { - if request.TaskInfo == nil || request.TaskInfo.TaskID == 0 { - return &types.BadRequestError{Message: "Invalid replication task info: TaskID is required"} - } err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo) if err != nil { return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err) diff --git a/common/persistence/nosql/nosql_execution_store_test.go b/common/persistence/nosql/nosql_execution_store_test.go index 0e0a3ffa750..0a87c7ed9c4 100644 --- a/common/persistence/nosql/nosql_execution_store_test.go +++ b/common/persistence/nosql/nosql_execution_store_test.go @@ -954,23 +954,6 @@ func TestNosqlExecutionStore(t *testing.T) { }, expectedError: &types.InternalServiceError{Message: "database error"}, }, - { - name: "PutReplicationTaskToDLQ failure - invalid task info", - setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore { - // No need to set up a mock call to InsertReplicationDLQTask - // as the operation should not proceed due to validation failure - return newTestNosqlExecutionStore(nosqlplugin.NewMockDB(ctrl), log.NewNoop()) - }, - testFunc: func(store *nosqlExecutionStore) error { - taskInfo := persistence.InternalReplicationTaskInfo{} // Intentionally invalid/incomplete task info - return store.PutReplicationTaskToDLQ(ctx, &persistence.InternalPutReplicationTaskToDLQRequest{ - SourceClusterName: "sourceCluster", - TaskInfo: &taskInfo, - }) - }, - expectedError: &types.BadRequestError{Message: "Invalid replication task info: TaskID is required"}, - }, - { name: "GetReplicationTasksFromDLQ success", setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {