Skip to content

Commit

Permalink
Merge branch 'master' into history-client-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng authored Mar 6, 2024
2 parents e49ddbb + 8f5bc04 commit c714e5a
Show file tree
Hide file tree
Showing 9 changed files with 1,869 additions and 709 deletions.
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ignore:
- "bench/**"
- "canary/**"
- "common/persistence/persistence-tests/**"
- "common/domain/errors.go"
- "common/log/**"
- "common/metrics/**"
- "common/persistence/nosql/nosqlplugin/dynamodb/**"
Expand Down
117 changes: 111 additions & 6 deletions common/domain/replication_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
package domain

import (
"bytes"
"context"
"encoding/binary"
"errors"
"testing"

Expand All @@ -32,6 +34,10 @@ import (
"github.com/uber/cadence/common/types"
)

const (
preambleVersion0 byte = 0x59
)

func TestReplicationQueueImpl_Publish(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -69,7 +75,6 @@ func TestReplicationQueueImpl_Publish(t *testing.T) {
} else {
assert.NoError(t, err)
}
ctrl.Finish()
})
}
}
Expand Down Expand Up @@ -111,7 +116,6 @@ func TestReplicationQueueImpl_PublishToDLQ(t *testing.T) {
} else {
assert.NoError(t, err)
}
ctrl.Finish()
})
}
}
Expand All @@ -122,15 +126,13 @@ func TestGetReplicationMessages(t *testing.T) {
name string
lastID int64
maxCount int
task *types.ReplicationTask
wantErr bool
setupMock func(q *persistence.MockQueueManager)
}{
{
name: "successful message retrieval",
lastID: 100,
maxCount: 10,
task: &types.ReplicationTask{},
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().ReadMessages(gomock.Any(), gomock.Eq(int64(100)), gomock.Eq(10)).Return(persistence.QueueMessageList{}, nil)
Expand Down Expand Up @@ -160,7 +162,6 @@ func TestGetReplicationMessages(t *testing.T) {
} else {
assert.NoError(t, err)
}
ctrl.Finish()
})
}
}
Expand Down Expand Up @@ -206,7 +207,111 @@ func TestUpdateAckLevel(t *testing.T) {
} else {
assert.NoError(t, err)
}
ctrl.Finish()
})
}
}

func TestReplicationQueueImpl_GetAckLevels(t *testing.T) {
tests := []struct {
name string
want map[string]int64
wantErr bool
setupMock func(q *persistence.MockQueueManager)
}{
{
name: "successful ack levels retrieval",
want: map[string]int64{"testCluster": 100},
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{"testCluster": 100}, nil)
},
},
{
name: "ack levels retrieval fails",
wantErr: true,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().GetAckLevels(gomock.Any()).Return(nil, errors.New("retrieval error"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
tt.setupMock(mockQueue)
got, err := rq.GetAckLevels(context.Background())
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
}
})
}
}

func mockEncodeReplicationTask(sourceTaskID int64) ([]byte, error) {
var buf bytes.Buffer
buf.WriteByte(preambleVersion0)
binary.Write(&buf, binary.BigEndian, sourceTaskID)
return buf.Bytes(), nil
}

func TestGetMessagesFromDLQ(t *testing.T) {
tests := []struct {
name string
firstID int64
lastID int64
pageSize int
pageToken []byte
taskID int64
wantErr bool
}{
{
name: "successful message retrieval",
firstID: 100,
lastID: 200,
pageSize: 10,
pageToken: []byte("token"),
taskID: 12345,
wantErr: false,
},
{
name: "read messages fails",
firstID: 100,
lastID: 200,
pageSize: 10,
pageToken: []byte("token"),
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)

if !tt.wantErr {
encodedData, _ := mockEncodeReplicationTask(tt.taskID)
messages := []*persistence.QueueMessage{
{ID: 1, Payload: encodedData},
}
mockQueue.EXPECT().ReadMessagesFromDLQ(gomock.Any(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken).Return(messages, []byte("nextToken"), nil)
} else {
mockQueue.EXPECT().ReadMessagesFromDLQ(gomock.Any(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken).Return(nil, nil, errors.New("read error"))
}

replicationTasks, token, err := rq.GetMessagesFromDLQ(context.Background(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Len(t, replicationTasks, 1, "Expected one replication task to be returned")
assert.Equal(t, []byte("nextToken"), token, "Expected token to match 'nextToken'")
}
})
}
}
111 changes: 105 additions & 6 deletions common/persistence/nosql/nosql_execution_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ func TestNosqlExecutionStore(t *testing.T) {
SelectWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, &types.EntityNotExistsError{}).Times(1)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes()
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
Expand Down Expand Up @@ -224,16 +221,118 @@ func TestNosqlExecutionStore(t *testing.T) {
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.New("database is unavailable")).Times(1)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes()
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
return store.UpdateWorkflowExecution(ctx, newUpdateWorkflowExecutionRequest())
},
expectedError: &types.InternalServiceError{Message: "database is unavailable"},
},
{
name: "DeleteWorkflowExecution success",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
DeleteWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
return store.DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{
DomainID: "domainID",
WorkflowID: "workflowID",
RunID: "runID",
})
},
expectedError: nil,
},
{
name: "DeleteWorkflowExecution failure - workflow does not exist",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
DeleteWorkflowExecution(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()).
Return(&types.EntityNotExistsError{Message: "workflow does not exist"})
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
return store.DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{
DomainID: "domainID",
WorkflowID: "workflowID",
RunID: "runID",
})
},
expectedError: &types.EntityNotExistsError{Message: "workflow does not exist"},
},
{
name: "DeleteCurrentWorkflowExecution success",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
DeleteCurrentWorkflow(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
return store.DeleteCurrentWorkflowExecution(ctx, &persistence.DeleteCurrentWorkflowExecutionRequest{
DomainID: "domainID",
WorkflowID: "workflowID",
RunID: "runID",
})
},
expectedError: nil,
},
{
name: "DeleteCurrentWorkflowExecution failure - current workflow does not exist",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
DeleteCurrentWorkflow(ctx, shardID, gomock.Any(), gomock.Any(), gomock.Any()).
Return(&types.EntityNotExistsError{Message: "current workflow does not exist"})
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
return store.DeleteCurrentWorkflowExecution(ctx, &persistence.DeleteCurrentWorkflowExecutionRequest{
DomainID: "domainID",
WorkflowID: "workflowID",
RunID: "runID",
})
},
expectedError: &types.EntityNotExistsError{Message: "current workflow does not exist"},
},
{
name: "ListCurrentExecutions success",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
SelectAllCurrentWorkflows(ctx, shardID, gomock.Any(), gomock.Any()).
Return([]*persistence.CurrentWorkflowExecution{}, nil, nil)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
_, err := store.ListCurrentExecutions(ctx, &persistence.ListCurrentExecutionsRequest{})
return err
},
expectedError: nil,
},
{
name: "ListCurrentExecutions failure - database error",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
SelectAllCurrentWorkflows(ctx, shardID, gomock.Any(), gomock.Any()).
Return(nil, nil, errors.New("database error"))
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
_, err := store.ListCurrentExecutions(ctx, &persistence.ListCurrentExecutionsRequest{})
return err
},
expectedError: &types.InternalServiceError{Message: "database error"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
Loading

0 comments on commit c714e5a

Please sign in to comment.