Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added some validations in the nosql store and new tests #5853

Merged
merged 8 commits into from
Apr 4, 2024
7 changes: 6 additions & 1 deletion common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,9 @@ func (d *nosqlExecutionStore) PutReplicationTaskToDLQ(
ctx context.Context,
request *persistence.InternalPutReplicationTaskToDLQRequest,
) error {

if request.TaskInfo == nil || request.TaskInfo.TaskID == 0 {
return &types.BadRequestError{Message: "Invalid replication task info: TaskID is required"}
}
err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo)
if err != nil {
return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err)
Expand All @@ -731,6 +733,9 @@ func (d *nosqlExecutionStore) GetReplicationTasksFromDLQ(
ctx context.Context,
request *persistence.GetReplicationTasksFromDLQRequest,
) (*persistence.InternalGetReplicationTasksFromDLQResponse, error) {
if request.ReadLevel > request.MaxReadLevel {
return nil, &types.BadRequestError{Message: "ReadLevel cannot be higher than MaxReadLevel"}
}
tasks, nextPageToken, err := d.db.SelectReplicationDLQTasksOrderByTaskID(ctx, d.shardID, request.SourceClusterName, request.BatchSize, request.NextPageToken, request.ReadLevel, request.MaxReadLevel)
if err != nil {
return nil, convertCommonErrors(d.db, "GetReplicationTasksFromDLQ", err)
Expand Down
223 changes: 223 additions & 0 deletions common/persistence/nosql/nosql_execution_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,211 @@ func TestNosqlExecutionStore(t *testing.T) {
},
expectedError: nil,
},
{
name: "GetTimerIndexTasks success",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
SelectTimerTasksOrderByVisibilityTime(
ctx,
shardID,
10,
gomock.Nil(),
gomock.Any(),
gomock.Any(),
).Return([]*persistence.TimerTaskInfo{}, nil, nil)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
_, err := store.GetTimerIndexTasks(ctx, &persistence.GetTimerIndexTasksRequest{
BatchSize: 10,
MinTimestamp: time.Now().Add(-time.Hour),
MaxTimestamp: time.Now(),
})
return err
},
expectedError: nil,
},
{
name: "GetTimerIndexTasks success - empty result",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
SelectTimerTasksOrderByVisibilityTime(ctx, shardID, 10, gomock.Nil(), gomock.Any(), gomock.Any()).
Return([]*persistence.TimerTaskInfo{}, []byte{}, nil) // Return an empty list
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
resp, err := store.GetTimerIndexTasks(ctx, &persistence.GetTimerIndexTasksRequest{
BatchSize: 10,
MinTimestamp: time.Now().Add(-time.Hour),
MaxTimestamp: time.Now(),
})
if err != nil {
return err
}
if len(resp.Timers) != 0 {
return errors.New("expected empty result set for timers")
}
return nil
},
expectedError: nil,
},
{
name: "PutReplicationTaskToDLQ success",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
replicationTaskInfo := newInternalReplicationTaskInfo()

mockDB.EXPECT().
InsertReplicationDLQTask(ctx, shardID, "sourceCluster", gomock.Any()).
DoAndReturn(func(_ context.Context, _ int, _ string, taskInfo persistence.InternalReplicationTaskInfo) error {
require.Equal(t, replicationTaskInfo, taskInfo)
return nil
})

return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
taskInfo := newInternalReplicationTaskInfo()
return store.PutReplicationTaskToDLQ(ctx, &persistence.InternalPutReplicationTaskToDLQRequest{
SourceClusterName: "sourceCluster",
TaskInfo: &taskInfo,
})
},
expectedError: nil,
},
{
name: "GetTimerIndexTasks failure - database error",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
mockDB.EXPECT().
SelectTimerTasksOrderByVisibilityTime(ctx, shardID, 10, gomock.Nil(), gomock.Any(), gomock.Any()).
Return(nil, nil, errors.New("database error"))
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
_, err := store.GetTimerIndexTasks(ctx, &persistence.GetTimerIndexTasksRequest{
BatchSize: 10,
MinTimestamp: time.Now().Add(-time.Hour),
MaxTimestamp: time.Now(),
})
return err
},
expectedError: &types.InternalServiceError{Message: "database error"},
},
{
name: "PutReplicationTaskToDLQ failure - invalid task info",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
// No need to set up a mock call to InsertReplicationDLQTask
// as the operation should not proceed due to validation failure
return newTestNosqlExecutionStore(nosqlplugin.NewMockDB(ctrl), log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
taskInfo := persistence.InternalReplicationTaskInfo{} // Intentionally invalid/incomplete task info
return store.PutReplicationTaskToDLQ(ctx, &persistence.InternalPutReplicationTaskToDLQRequest{
SourceClusterName: "sourceCluster",
TaskInfo: &taskInfo,
})
},
expectedError: &types.BadRequestError{Message: "Invalid replication task info: TaskID is required"},
},

{
name: "GetReplicationTasksFromDLQ success",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)

nextPageToken := []byte("next-page-token")
replicationTasks := []*persistence.InternalReplicationTaskInfo{}
mockDB.EXPECT().
SelectReplicationDLQTasksOrderByTaskID(
ctx,
shardID,
"sourceCluster",
10,
gomock.Any(),
int64(0),
int64(100),
).Return(replicationTasks, nextPageToken, nil)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
initialNextPageToken := []byte{}
_, err := store.GetReplicationTasksFromDLQ(ctx, &persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: "sourceCluster",
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
BatchSize: 10,
NextPageToken: initialNextPageToken,
ReadLevel: 0,
MaxReadLevel: 100,
},
})

return err
},
expectedError: nil,
},
{
name: "GetReplicationTasksFromDLQ failure - invalid read levels",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
return newTestNosqlExecutionStore(nosqlplugin.NewMockDB(ctrl), log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
_, err := store.GetReplicationTasksFromDLQ(ctx, &persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: "sourceCluster",
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: 100,
MaxReadLevel: 50,
BatchSize: 10,
NextPageToken: []byte{},
},
})
return err
},
expectedError: &types.BadRequestError{Message: "ReadLevel cannot be higher than MaxReadLevel"},
},
{
name: "GetReplicationDLQSize success",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
SelectReplicationDLQTasksCount(ctx, shardID, "sourceCluster").
Return(int64(42), nil)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
resp, err := store.GetReplicationDLQSize(ctx, &persistence.GetReplicationDLQSizeRequest{
agautam478 marked this conversation as resolved.
Show resolved Hide resolved
SourceClusterName: "sourceCluster",
})
if err != nil {
return err
}
if resp.Size != 42 {
return errors.New("unexpected DLQ size")
}
return nil
},
expectedError: nil,
},
{
name: "GetReplicationDLQSize failure - invalid source cluster name",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
SelectReplicationDLQTasksCount(ctx, shardID, "").
Return(int64(0), nil)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
_, err := store.GetReplicationDLQSize(ctx, &persistence.GetReplicationDLQSizeRequest{
SourceClusterName: "",
})
return err
},
expectedError: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down Expand Up @@ -930,3 +1135,21 @@ func newTestNosqlExecutionStore(db nosqlplugin.DB, logger log.Logger) *nosqlExec
nosqlStore: nosqlStore{logger: logger, db: db},
}
}

func newInternalReplicationTaskInfo() persistence.InternalReplicationTaskInfo {
var fixedCreationTime = time.Date(2024, time.April, 3, 14, 35, 44, 0, time.UTC)
return persistence.InternalReplicationTaskInfo{
DomainID: "testDomainID",
WorkflowID: "testWorkflowID",
RunID: "testRunID",
TaskID: 123,
TaskType: persistence.ReplicationTaskTypeHistory,
FirstEventID: 1,
NextEventID: 2,
Version: 1,
ScheduledID: 3,
BranchToken: []byte("branchToken"),
NewRunBranchToken: []byte("newRunBranchToken"),
CreationTime: fixedCreationTime,
}
}
Loading