Skip to content

Commit

Permalink
Merge branch 'master' into minor
Browse files Browse the repository at this point in the history
  • Loading branch information
Groxx authored Mar 7, 2024
2 parents 29f952e + 3da6ce9 commit 721c4e2
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 0 deletions.
134 changes: 134 additions & 0 deletions common/domain/replication_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,137 @@ func TestGetDLQAckLevel(t *testing.T) {
})
}
}

func TestRangeDeleteMessagesFromDLQ(t *testing.T) {
tests := []struct {
name string
firstID int64
lastID int64
wantErr bool
setupMock func(q *persistence.MockQueueManager)
}{
{
name: "successful range delete from DLQ",
firstID: 10,
lastID: 20,
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), gomock.Eq(int64(10)), gomock.Eq(int64(20))).Return(nil)
},
},
{
name: "range delete from DLQ fails",
firstID: 10,
lastID: 20,
wantErr: true,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().RangeDeleteMessagesFromDLQ(gomock.Any(), gomock.Eq(int64(10)), gomock.Eq(int64(20))).Return(errors.New("range delete error"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
tt.setupMock(mockQueue)
err := rq.RangeDeleteMessagesFromDLQ(context.Background(), tt.firstID, tt.lastID)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestDeleteMessageFromDLQ(t *testing.T) {
tests := []struct {
name string
messageID int64
wantErr bool
setupMock func(q *persistence.MockQueueManager)
}{
{
name: "successful delete from DLQ",
messageID: 15,
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().DeleteMessageFromDLQ(gomock.Any(), gomock.Eq(int64(15))).Return(nil)
},
},
{
name: "delete from DLQ fails",
messageID: 15,
wantErr: true,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().DeleteMessageFromDLQ(gomock.Any(), gomock.Eq(int64(15))).Return(errors.New("delete error"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
tt.setupMock(mockQueue)
err := rq.DeleteMessageFromDLQ(context.Background(), tt.messageID)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestGetDLQSize(t *testing.T) {
tests := []struct {
name string
wantSize int64
wantErr bool
setupMock func(m *persistence.MockQueueManager)
}{
{
name: "returns correct size for non-empty DLQ",
wantSize: 10,
wantErr: false,
setupMock: func(m *persistence.MockQueueManager) {
m.EXPECT().GetDLQSize(gomock.Any()).Return(int64(10), nil)
},
},
{
name: "returns zero for empty DLQ",
wantSize: 0,
wantErr: false,
setupMock: func(m *persistence.MockQueueManager) {
m.EXPECT().GetDLQSize(gomock.Any()).Return(int64(0), nil)
},
},
{
name: "propagates error from underlying queue",
wantErr: true,
setupMock: func(m *persistence.MockQueueManager) {
m.EXPECT().GetDLQSize(gomock.Any()).Return(int64(0), errors.New("database error"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueueManager := persistence.NewMockQueueManager(ctrl)
tt.setupMock(mockQueueManager)
q := &replicationQueueImpl{queue: mockQueueManager}
size, err := q.GetDLQSize(context.Background())
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.wantSize, size)
}
})
}
}
109 changes: 109 additions & 0 deletions common/persistence/data_manager_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,112 @@ func TestOnlyGetTypeTask(t *testing.T) {
}
}
}

func TestTransferTaskInfo(t *testing.T) {
timeNow := time.Now()
task := &TransferTaskInfo{
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
TargetDomainIDs: map[string]struct{}{"test-target-domain-id": {}},
TaskType: TransferTaskTypeActivityTask,
TaskID: 1,
ScheduleID: 1,
Version: 1,
VisibilityTimestamp: timeNow,
}
expectedString := fmt.Sprintf("%#v", task)

assert.Equal(t, "test-domain-id", task.GetDomainID())
assert.Equal(t, "test-workflow-id", task.GetWorkflowID())
assert.Equal(t, "test-run-id", task.GetRunID())
assert.Equal(t, TransferTaskTypeActivityTask, task.GetTaskType())
assert.Equal(t, int64(1), task.GetTaskID())
assert.Equal(t, int64(1), task.GetVersion())
assert.Equal(t, timeNow, task.GetVisibilityTimestamp())
assert.Equal(t, map[string]struct{}{"test-target-domain-id": {}}, task.GetTargetDomainIDs())
assert.Equal(t, expectedString, task.String())
}

func TestReplicationTaskInfo(t *testing.T) {
task := &ReplicationTaskInfo{
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
TaskType: ReplicationTaskTypeHistory,
TaskID: 1,
Version: 1,
FirstEventID: 1,
NextEventID: 2,
ScheduledID: 3,
}
assert.Equal(t, "test-domain-id", task.GetDomainID())
assert.Equal(t, "test-workflow-id", task.GetWorkflowID())
assert.Equal(t, "test-run-id", task.GetRunID())
assert.Equal(t, ReplicationTaskTypeHistory, task.GetTaskType())
assert.Equal(t, int64(1), task.GetTaskID())
assert.Equal(t, int64(1), task.GetVersion())
assert.Equal(t, time.Time{}, task.GetVisibilityTimestamp())
}

func TestTimeTaskInfo(t *testing.T) {
timeNow := time.Now()
task := &TimerTaskInfo{
VisibilityTimestamp: timeNow,
TaskType: 4,
TaskID: 3,
Version: 2,
RunID: "test-run-id",
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
}
expectedString := fmt.Sprintf(
"{DomainID: %v, WorkflowID: %v, RunID: %v, VisibilityTimestamp: %v, TaskID: %v, TaskType: %v, TimeoutType: %v, EventID: %v, ScheduleAttempt: %v, Version: %v.}",
task.DomainID, task.WorkflowID, task.RunID, task.VisibilityTimestamp, task.TaskID, task.TaskType, task.TimeoutType, task.EventID, task.ScheduleAttempt, task.Version,
)
assert.Equal(t, 4, task.GetTaskType())
assert.Equal(t, int64(3), task.GetTaskID())
assert.Equal(t, int64(2), task.GetVersion())
assert.Equal(t, timeNow, task.GetVisibilityTimestamp())
assert.Equal(t, "test-run-id", task.GetRunID())
assert.Equal(t, "test-domain-id", task.GetDomainID())
assert.Equal(t, "test-workflow-id", task.GetWorkflowID())
assert.Equal(t, expectedString, task.String())
}

func TestShardInfoCopy(t *testing.T) {
info := &ShardInfo{
ShardID: 1,
RangeID: 2,
ClusterTransferAckLevel: map[string]int64{"test-cluster": 3},
ClusterTimerAckLevel: map[string]time.Time{"test-cluster": time.Now()},
ClusterReplicationLevel: map[string]int64{"test-cluster": 4},
ReplicationDLQAckLevel: map[string]int64{"test-cluster": 5},
}

infoCopy := info.Copy()
assert.Equal(t, info, infoCopy)
}

func TestSerializeAndDeserializeClusterConfigs(t *testing.T) {
configs := []*ClusterReplicationConfig{
{
ClusterName: "test-cluster1",
},
{
ClusterName: "test-cluster2",
},
}
serializedResult := SerializeClusterConfigs(configs)
deserializedResult := DeserializeClusterConfigs(serializedResult)

assert.Equal(t, configs, deserializedResult)

}

func TestTimeStampConvertion(t *testing.T) {
timeNow := time.Now()
milisSecond := UnixNanoToDBTimestamp(timeNow.UnixNano())
unixNanoTime := DBTimestampToUnixNano(milisSecond)
assert.Equal(t, timeNow.UnixNano()/(1000*1000), unixNanoTime/(1000*1000)) // unixNano to milisSecond will result in info loss
}

0 comments on commit 721c4e2

Please sign in to comment.