Skip to content

Commit

Permalink
Merge branch 'master' into dataManagerInterfaces-test-4
Browse files Browse the repository at this point in the history
  • Loading branch information
timl3136 authored Mar 6, 2024
2 parents a021757 + 2d2ece0 commit 587a302
Show file tree
Hide file tree
Showing 11 changed files with 2,098 additions and 709 deletions.
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ignore:
- "bench/**"
- "canary/**"
- "common/persistence/persistence-tests/**"
- "common/domain/errors.go"
- "common/log/**"
- "common/metrics/**"
- "common/persistence/nosql/nosqlplugin/dynamodb/**"
Expand Down
117 changes: 111 additions & 6 deletions common/domain/replication_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
package domain

import (
"bytes"
"context"
"encoding/binary"
"errors"
"testing"

Expand All @@ -32,6 +34,10 @@ import (
"github.com/uber/cadence/common/types"
)

const (
preambleVersion0 byte = 0x59
)

func TestReplicationQueueImpl_Publish(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -69,7 +75,6 @@ func TestReplicationQueueImpl_Publish(t *testing.T) {
} else {
assert.NoError(t, err)
}
ctrl.Finish()
})
}
}
Expand Down Expand Up @@ -111,7 +116,6 @@ func TestReplicationQueueImpl_PublishToDLQ(t *testing.T) {
} else {
assert.NoError(t, err)
}
ctrl.Finish()
})
}
}
Expand All @@ -122,15 +126,13 @@ func TestGetReplicationMessages(t *testing.T) {
name string
lastID int64
maxCount int
task *types.ReplicationTask
wantErr bool
setupMock func(q *persistence.MockQueueManager)
}{
{
name: "successful message retrieval",
lastID: 100,
maxCount: 10,
task: &types.ReplicationTask{},
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().ReadMessages(gomock.Any(), gomock.Eq(int64(100)), gomock.Eq(10)).Return(persistence.QueueMessageList{}, nil)
Expand Down Expand Up @@ -160,7 +162,6 @@ func TestGetReplicationMessages(t *testing.T) {
} else {
assert.NoError(t, err)
}
ctrl.Finish()
})
}
}
Expand Down Expand Up @@ -206,7 +207,111 @@ func TestUpdateAckLevel(t *testing.T) {
} else {
assert.NoError(t, err)
}
ctrl.Finish()
})
}
}

func TestReplicationQueueImpl_GetAckLevels(t *testing.T) {
tests := []struct {
name string
want map[string]int64
wantErr bool
setupMock func(q *persistence.MockQueueManager)
}{
{
name: "successful ack levels retrieval",
want: map[string]int64{"testCluster": 100},
wantErr: false,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{"testCluster": 100}, nil)
},
},
{
name: "ack levels retrieval fails",
wantErr: true,
setupMock: func(q *persistence.MockQueueManager) {
q.EXPECT().GetAckLevels(gomock.Any()).Return(nil, errors.New("retrieval error"))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)
tt.setupMock(mockQueue)
got, err := rq.GetAckLevels(context.Background())
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
}
})
}
}

func mockEncodeReplicationTask(sourceTaskID int64) ([]byte, error) {
var buf bytes.Buffer
buf.WriteByte(preambleVersion0)
binary.Write(&buf, binary.BigEndian, sourceTaskID)
return buf.Bytes(), nil
}

func TestGetMessagesFromDLQ(t *testing.T) {
tests := []struct {
name string
firstID int64
lastID int64
pageSize int
pageToken []byte
taskID int64
wantErr bool
}{
{
name: "successful message retrieval",
firstID: 100,
lastID: 200,
pageSize: 10,
pageToken: []byte("token"),
taskID: 12345,
wantErr: false,
},
{
name: "read messages fails",
firstID: 100,
lastID: 200,
pageSize: 10,
pageToken: []byte("token"),
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil)

if !tt.wantErr {
encodedData, _ := mockEncodeReplicationTask(tt.taskID)
messages := []*persistence.QueueMessage{
{ID: 1, Payload: encodedData},
}
mockQueue.EXPECT().ReadMessagesFromDLQ(gomock.Any(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken).Return(messages, []byte("nextToken"), nil)
} else {
mockQueue.EXPECT().ReadMessagesFromDLQ(gomock.Any(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken).Return(nil, nil, errors.New("read error"))
}

replicationTasks, token, err := rq.GetMessagesFromDLQ(context.Background(), tt.firstID, tt.lastID, tt.pageSize, tt.pageToken)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Len(t, replicationTasks, 1, "Expected one replication task to be returned")
assert.Equal(t, []byte("nextToken"), token, "Expected token to match 'nextToken'")
}
})
}
}
130 changes: 130 additions & 0 deletions common/elasticsearch/client/v6/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,136 @@ func TestSearch(t *testing.T) {
}
}

func TestScroll(t *testing.T) {
testCases := []struct {
name string
query string
expected map[string]interface{}
expectErr bool
index string
handler http.HandlerFunc
scrollID string
}{
{
name: "normal case",
query: `{"query":{"bool":{"must":{"match":{"WorkflowID":"test-workflow-id"}}}}}`,
expected: map[string]interface{}{
"WorkflowID": "test-workflow-id",
},
index: "testIndex",
handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/testIndex/_search" {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{
"took": 5,
"timed_out": false,
"hits": {
"total": 1,
"hits": [{
"_source": {
"WorkflowID": "test-workflow-id"
}
}]
},
"aggregations": {
"sample_agg": {
"value": 42
}
}
}`))
} else {
w.WriteHeader(http.StatusNotFound)
}
}),
expectErr: false,
},
{
name: "error case",
query: `{"query":{"bool":{"must":{"match":{"WorkflowID":"test-workflow-id"}}}}}`,
index: "testIndex",
handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}),
expectErr: true,
scrollID: "1",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
elasticV6, testServer := getMockClient(t, tc.handler)
defer testServer.Close()
resp, err := elasticV6.Scroll(context.Background(), tc.index, tc.query, tc.scrollID)
if !tc.expectErr {
assert.NoError(t, err)
var actualSource map[string]interface{}
err := json.Unmarshal(resp.Hits.Hits[0].Source, &actualSource)
assert.NoError(t, err)
assert.Equal(t, tc.expected, actualSource)
} else {
assert.Error(t, err)
assert.Nil(t, resp)
}
})
}
}

func TestClearScroll(t *testing.T) {
var handlerCalled bool
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handlerCalled = true
if r.Method == "DELETE" && r.URL.Path == "/_search/scroll" {
// Simulate a successful clear scroll response
w.WriteHeader(http.StatusOK)
response := `{
"succeeded": true,
"num_freed": 1
}`
w.Write([]byte(response))
} else {
w.WriteHeader(http.StatusNotFound)
}
})
elasticV6, testServer := getMockClient(t, handler)
defer testServer.Close()
err := elasticV6.ClearScroll(context.Background(), "scrollID")
assert.True(t, handlerCalled, "Expected handler to be called")
assert.NoError(t, err)
}

func TestIsNotFoundError(t *testing.T) {
testCases := []struct {
name string
handler http.HandlerFunc
expected bool
}{
{
name: "NotFound error",
handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
}),
expected: true,
},
{
name: "Other error",
handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Bad Request", http.StatusBadRequest)
}),
expected: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
elasticV6, testServer := getMockClient(t, tc.handler)
defer testServer.Close()
err := elasticV6.CreateIndex(context.Background(), "testIndex")
res := elasticV6.IsNotFoundError(err)
assert.Equal(t, tc.expected, res)
})
}
}

func getMockClient(t *testing.T, handler http.HandlerFunc) (ElasticV6, *httptest.Server) {
testServer := httptest.NewTLSServer(handler)
mockClient, err := elastic.NewClient(
Expand Down
Loading

0 comments on commit 587a302

Please sign in to comment.